-
Notifications
You must be signed in to change notification settings - Fork 2
/
test.py
212 lines (175 loc) · 7.58 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
"""
Generates solution .png files by loading pre-trained models and predicting.
"""
import random
import skimage
import os
import tqdm
import settings as S
import numpy as np
import train
import flow
import infer
import logging
import damage
import plac
import tensorflow as tf
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def write_solution(path, names:list, images:list):
    """
    Save a batch of images as .png files inside `path`.

    `names` and `images` are parallel sequences: the k-th image is written
    under the k-th name. Every image is cast to uint8 before it is saved.
    """
    assert len(names) == len(images)
    for name, image in zip(names, images):
        target = os.path.join(path, name)
        pixels = image.astype(np.uint8)
        skimage.io.imsave(target, pixels, check_contrast=False)
def randomize_damage(img):
    """
    Overwrite every nonzero pixel of `img` with a randomly drawn damage class.

    Classes 1-4 (no damage, minor damage, major damage, destroyed) are drawn
    with likelihood proportional to the number of slots reserved for each
    class below. `img` is modified in place and also returned.
    """
    assert list(img.shape) == S.SAMPLESHAPE[:2], f"expected shape (1024,1024), got {img.shape}"

    # (pixel value, number of slots) in the order: no damage, minor damage,
    # major damage, destroyed
    slot_counts = ((1, 80000), (2, 11000), (3, 8000), (4, 8000))

    # lay the classes out as consecutive, non-overlapping integer ranges
    buckets = []
    total = 0
    for value, count in slot_counts:
        buckets.append((range(total, total + count), value))
        total += count

    for row, col in zip(*np.nonzero(img)):
        draw = random.randint(0, total - 1)
        for span, value in buckets:
            if draw in span:
                img[row][col] = value
                break
        else:
            raise Exception("result %d is outside known values" % draw)
    return img
def damage_by_building_classification(path):
    """
    Generate solution .png files, classifying damage using contiguous
    regions in the segmentation model's predicted masks in order to extract
    individual building polygons from pre-disaster and post-disaster images.

    For every test sample two files are written to `path`: the localization
    mask, and a damage mask in which the rectangle of each extracted building
    is overwritten with its predicted class (argmax of the label + 1).
    """
    # load the localization (segmentation) model
    S.BATCH_SIZE = 1
    # NOTE(review): train=True is passed although the model is only used for
    # prediction in this function -- confirm that this is intended.
    model = train.build_model(architecture=S.ARCHITECTURE, train=True)
    model = train.load_weights(model, S.MODELSTRING_BEST)

    # load the damage classification model
    dmg_model = damage.build_model()
    dmg_model = damage.load_weights(dmg_model, S.DMG_MODELSTRING_BEST)

    # get a dataflow for the test files
    df = flow.Dataflow(files=flow.get_test_files(), transform=False,
                       shuffle=False, buildings_only=False, batch_size=1,
                       return_stacked=True)

    # the context manager closes the progress bar even if a sample fails
    with tqdm.tqdm(total=len(df)) as pbar:
        for stacked, filename in df:
            filename = os.path.basename(filename)
            filename = filename.replace("pre", "localization").replace(".png", "_prediction.png")

            # localization (segmentation)
            pred = model.predict(stacked)
            mask = infer.convert_prediction(pred)
            write_solution(names=[filename], images=[mask], path=path)

            # damage classification: the first three channels of the stacked
            # input are used as the pre image, the remaining ones as the post
            filename = filename.replace("localization", "damage")
            pre, post = stacked[..., :3], stacked[..., 3:]
            boxes, coords = flow.Building.get_all_in(pre, post, mask)
            if len(boxes) > 0:
                labels = dmg_model.predict(boxes)
                for k, (x, y, w, h) in enumerate(coords):
                    # damage classes are 1-based in the output mask
                    mask[y:y+h, x:x+w] = np.argmax(labels[k]) + 1
            write_solution(names=[filename], images=[mask], path=path)
            pbar.update(1)
def damage_by_segmentation(path):
    """
    Generate solution .png files, using a single multiclass segmentation
    model to do so.

    The mask predicted for each test sample is written to `path` twice: once
    under the localization file name and once under the damage file name.
    """
    model = train.build_model(classes=6, damage=True)
    model = train.load_weights(model, "damage-motokimura-mobilenetv2-best.hdf5")
    df = flow.Dataflow(files=flow.get_test_files(), transform=False,
                       batch_size=1, buildings_only=False, shuffle=False,
                       return_postmask=False, return_stacked=True,
                       return_average=False)

    # the context manager closes the progress bar even if a sample fails
    with tqdm.tqdm(total=len(df)) as pbar:
        for image, filename in df:
            filename = os.path.basename(filename)
            filename = filename.replace("pre", "localization").replace(".png", "_prediction.png")

            # one prediction serves both outputs
            pred = model.predict([image])
            mask = infer.convert_prediction(pred)
            write_solution(names=[filename], images=[mask], path=path)

            filename = filename.replace("localization", "damage")
            write_solution(names=[filename], images=[mask], path=path)
            pbar.update(1)
def damage_random(path):
    """
    Generate solution .png files using random damage.

    The segmentation model's mask is written as the localization output;
    the same mask, with its nonzero pixels replaced by randomly drawn damage
    classes, is written as the damage output.
    """
    model = train.build_model(train=False)
    model = train.load_weights(model, S.MODELSTRING_BEST)
    df = flow.Dataflow(files=flow.get_test_files(), transform=False,
                       batch_size=1, buildings_only=False, shuffle=False,
                       return_postmask=False, return_stacked=True,
                       return_average=False)

    # the context manager closes the progress bar even if a sample fails
    with tqdm.tqdm(total=len(df)) as pbar:
        for image, filename in df:
            filename = os.path.basename(filename)
            filename = filename.replace("pre", "localization").replace(".png", "_prediction.png")

            # localization (segmentation)
            pred = model.predict([image])
            mask = infer.convert_prediction(pred)
            write_solution(names=[filename], images=[mask], path=path)

            # randomize_damage modifies the mask in place, so the localization
            # file has to be written before this call
            mask = randomize_damage(mask)
            filename = filename.replace("localization", "damage")
            write_solution(names=[filename], images=[mask], path=path)
            pbar.update(1)
def from_logits(path):
    """
    Generate solution .png files from logits previously stored on disk.

    For each test pair the files "logits/1-<name>" and "logits/2-<name>" are
    read, averaged, passed through a softmax, converted to a mask and written
    to `path` under both the localization and the damage file name.
    """
    # Integer division: `/` yields a float in Python 3 and a layer shape
    # needs integer dimensions.
    inp = tf.keras.layers.Input((3145728 // 6, 6))
    x = tf.keras.layers.Activation('softmax')(inp)
    m = tf.keras.models.Model(inputs=[inp], outputs=[x])

    # NOTE(review): 933 is hard-coded; presumably the number of test pairs --
    # confirm against flow.get_test_files().
    with tqdm.tqdm(total=933) as pbar:
        for pre, _ in flow.get_test_files():
            filename = os.path.basename(pre)
            # NOTE(review): np.fromfile reads with its default dtype and the
            # values are then cast to int32 -- confirm this matches how the
            # logits were written.
            logits1 = np.fromfile("logits/1-{}".format(filename)).astype(np.int32).reshape((-1,6))
            logits2 = np.fromfile("logits/2-{}".format(filename)).astype(np.int32).reshape((-1,6))
            logits = (logits1 + logits2) / 2

            pred = m.predict(np.expand_dims(logits, axis=0))
            mask = infer.convert_prediction(pred).astype(np.uint8)

            filename = filename.replace("pre", "localization").replace(".png", "_prediction.png")
            write_solution(names=[filename], images=[mask], path=path)
            filename = filename.replace("localization", "damage")
            write_solution(names=[filename], images=[mask], path=path)
            pbar.update(1)
def cli(outdir: "The path to write solutions to",
        segmentation: ("Use a multiclass segmentation model", "flag", "s"),
        building: ("Use a binary segmentation model and individual building classifier", "flag", "b"),
        random: ("Randomize damage", "flag", "r")):
    """
    Generate solution .png files in `outdir` with the selected method.

    The flags are checked in the order segmentation, building, random and the
    first one that is set wins. If none is set, an error is logged and no
    files are generated.
    """
    # The `random` parameter shadows the `random` module inside this function
    # only; the name is kept because it is part of the signature.
    if segmentation:
        damage_by_segmentation(outdir)
    elif building:
        damage_by_building_classification(outdir)
    elif random:
        damage_random(outdir)
    else:
        # Logger.error requires a message argument; a bare call raises
        # TypeError instead of reporting the problem.
        logger.error("no method selected: pass one of -s (segmentation), "
                     "-b (building) or -r (random)")


# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    plac.call(cli)