import numpy as np
import copy
from models import Transforms
# comb moved from scipy.misc to scipy.special in modern SciPy releases
from scipy.special import comb
from rh_logger.api import logger
import logging
import math


def array_to_string(arr):
    # tostring() is deprecated in NumPy; tobytes() returns the same raw bytes
    return arr.tobytes()
    #return '_'.join(map(str, arr))

def tri_area(p1, p2, p3):
    scalar = p1.ndim == 1
    p1 = np.atleast_2d(p1)
    p2 = np.atleast_2d(p2)
    p3 = np.atleast_2d(p3)
    area = (p1[:, 0]*(p2[:, 1] - p3[:, 1]) +
            p2[:, 0]*(p3[:, 1] - p1[:, 1]) +
            p3[:, 0]*(p1[:, 1] - p2[:, 1])) / 2.0
    # area might be negative (it is a signed area)
    return area[0] if scalar else area

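# Illustrative (commented-out) example: tri_area returns a signed area, so the unit
# right triangle below gives 0.5 and reversing the vertex order flips the sign.
#
#   a, b, c = np.array([0., 0.]), np.array([1., 0.]), np.array([0., 1.])
#   tri_area(a, b, c)    # -> 0.5
#   tri_area(a, c, b)    # -> -0.5
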
def choose_forward(n, k, n_draws):
    '''Choose k without replacement from among N

    :param n: number of samples to choose from
    :param k: number of samples to choose
    :param n_draws: number of tuples to return

    returns an n_draws by k array of k-tuples
    '''
    if n == 0:
        return np.zeros((0, k), int)
    max_combinations = comb(n, k)
    if max_combinations / 3 < n_draws:
        return choose_forward_dense(n, k, n_draws)
    else:
        return choose_forward_sparse(n, k, n_draws)

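# Illustrative (commented-out) usage: draw index triples into a 10-element match set.
# The dense sampler is used when the requested number of draws is a sizable fraction
# (more than a third) of all combinations; otherwise the cheaper rejection-sampling
# path is taken.
#
#   triples = choose_forward(10, 3, 5)
#   triples.shape[1]     # -> 3 (the sparse path may return a few extra rows)
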
def enumerate_choices(n, k):
    '''Enumerate all the ways to choose k from n

    returns choices sorted lexicographically, e.g.
    0, 1
    0, 2
    1, 2
    '''
    if k == 1:
        return np.arange(n).reshape(n, 1)
    #
    # Enumerate ways to choose k-1 from n-1 (there are no ways
    # to choose the last, so n-1)
    #
    last = enumerate_choices(n-1, k-1)
    #
    # number of possible choices for each of the previous
    # is from among the remaining.
    #
    n_choices = n - 1 - last[:, -1]
    index = np.hstack([[0], np.cumsum(n_choices)])
    #
    # allocate memory for the result
    #
    result = np.zeros((index[-1], k), int)
    #
    # Create a back pointer into "last" for each element of the new array
    #
    back_ptr = np.zeros(result.shape[0], int)
    back_ptr[index[:-1]] = 1
    back_ptr = np.cumsum(back_ptr) - 1
    #
    # Broadcast the elements of the old array into the new one
    # using the back pointer
    #
    result[:, :-1] = last[back_ptr, :]
    #
    # pull a cumsum trick: fill the last column with "1" everywhere except
    # at the start of each block, where the value is chosen so that the
    # running sum resets to zero
    #
    result[1:, -1] = 1
    #
    # subtract the size of the previous block to get the running sum back to zero
    #
    result[index[1:-1], -1] = -n_choices[:-1]+1
    #
    # The last column has to start at the next-to-last column's value + 1
    # 0, 1 <-
    # 0, 2
    # 1, 2 <-
    #
    result[:, -1] = np.cumsum(result[:, -1]) + result[:, -2] + 1
    return result

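# Illustrative (commented-out) example of the lexicographic enumeration for n=4, k=2:
#
#   enumerate_choices(4, 2)
#   # -> array([[0, 1],
#   #           [0, 2],
#   #           [0, 3],
#   #           [1, 2],
#   #           [1, 3],
#   #           [2, 3]])
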
def choose_forward_dense(n, k, n_draws):
    '''Choose k without replacement from among N where n_draws ~ # of combos

    :param n: number of samples to choose from
    :param k: number of samples to choose
    :param n_draws: number of tuples to return

    returns an n_draws by k array of k-tuples
    '''
    all_possible = enumerate_choices(n, k)
    choices = np.random.choice(np.arange(all_possible.shape[0]), n_draws,
                               replace=False)
    return all_possible[choices]

def choose_forward_sparse(n, k, n_draws):
    '''Choose k without replacement from among N where n_draws << combos

    :param n: number of samples to choose from
    :param k: number of samples to choose
    :param n_draws: number of tuples to return

    returns an n_draws by k array of k-tuples
    '''
    #
    # We assume that there is very little chance of collisions
    # and we choose a few more than asked
    #
    extra = int(np.sqrt(n_draws)) + 1
    n1_draws = n_draws + extra
    choices = np.random.randint(0, n, (n1_draws, k))
    while True:
        #
        # We sort in the k direction to get indices from low to high per draw
        #
        choices.sort(axis=1)
        #
        # We then argsort and duplicates should be adjacent in argsortland
        #
        order = np.lexsort([choices[:, k_] for k_ in range(k-1, -1, -1)])
        to_remove = np.where(
            np.all(choices[order[:-1]] == choices[order[1:]], axis=1))
        result = np.delete(choices, order[to_remove], axis=0)
        if len(result) >= n_draws:
            return result
        #
        # Add some more choices if we didn't get enough
        #
        choices = np.vstack((result, np.random.randint(0, n, (extra, k))))

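# Illustrative (commented-out) note: with n=1000 and k=3 there are roughly 1.7e8
# possible triples, so a request for 100 draws almost never collides and the loop
# above typically finishes in a single pass:
#
#   draws = choose_forward_sparse(1000, 3, 100)
#   len(draws) >= 100    # -> True (a few surplus rows may be returned)
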
def check_model_stretch(model_matrix, max_stretch=0.25):
    # Use the eigenvalues to validate the stretch
    assert(max_stretch >= 0.0 and max_stretch <= 1.0)
    eig_vals, _ = np.linalg.eig(model_matrix)
    # Note that this also treats flipping as an incorrect transformation
    valid_eig_vals = [eig_val for eig_val in eig_vals
                      if eig_val >= 1.0 - max_stretch and eig_val <= 1.0 + max_stretch]
    return len(valid_eig_vals) == 2

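# Illustrative (commented-out) examples with the default 25% tolerance: both
# eigenvalues must stay within [0.75, 1.25], so a mild anisotropic scale passes
# while a strong stretch or a flip (eigenvalue -1) does not.
#
#   check_model_stretch(np.eye(2))                           # -> True
#   check_model_stretch(np.array([[1.1, 0.], [0., 0.9]]))    # -> True
#   check_model_stretch(np.array([[1.5, 0.], [0., 1.0]]))    # -> False
#   check_model_stretch(np.array([[1.0, 0.], [0., -1.0]]))   # -> False
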
def filter_triangles(m0, m1, choices,
                     max_stretch=0.25,
                     max_area=.3):
    '''Filter a set of match choices

    :param m0: set of points in one domain
    :param m1: set of matching points to m0 in another domain
    :param choices: an N x 3 array of triangles (point indices)
    :param max_stretch: filter out a choice if it shrinks by 1-max_stretch
                        or stretches by 1+max_stretch
    :param max_area: filter out a choice if the ratio of the m0 triangle's area
                     to the m1 triangle's area is less than 1-max_area or more
                     than 1+max_area

    If a triangle in m0 has a very different area than its counterpart in m1,
    the choice is excluded. The eigenvalue-based stretch check implied by
    max_stretch is performed separately (see check_model_stretch) and is not
    applied by this function.
    '''
    pt1a, pt2a, pt3a = [m0[choices][:, _, :] for _ in range(3)]
    pt1b, pt2b, pt3b = [m1[choices][:, _, :] for _ in range(3)]
    areas_a = tri_area(pt1a, pt2a, pt3a)
    areas_b = tri_area(pt1b, pt2b, pt3b)
    area_ratio = areas_a / (areas_b + np.finfo(areas_b.dtype).eps)
    mask = (area_ratio <= 1+max_area) & (area_ratio >= 1-max_area)
    return choices[mask]

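# Illustrative (commented-out) usage: m1 below is m0 shifted by a constant offset, so
# every triangle keeps its area and all candidate triples pass the area-ratio test.
#
#   m0 = np.random.rand(20, 2) * 100
#   m1 = m0 + np.array([5.0, -3.0])
#   triples = enumerate_choices(20, 3)[:10]      # ten distinct index triples
#   kept = filter_triangles(m0, m1, triples)
#   len(kept) == len(triples)    # -> True
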
def ransac(sample_matches, test_matches, target_model_type, iterations, epsilon, min_inlier_ratio, min_num_inlier, det_delta=0.55, max_stretch=None, max_rot_deg=None, tri_angles_comparator=None):
    # model = Model.create_model(target_model_type)
    assert(len(sample_matches[0]) == len(sample_matches[1]))

    best_model = None
    best_model_score = 0  # The higher the better
    best_inlier_mask = None
    best_model_mean_dists = 0
    proposed_model = Transforms.create(target_model_type)

    max_rot_deg_cos = None
    if max_rot_deg is not None:
        max_rot_deg_cos = math.cos(max_rot_deg * math.pi / 180.0)
        #print("max_rot_deg: {}, max_rot_deg_cos: {}, {}".format(max_rot_deg, max_rot_deg_cos, max_rot_deg * math.pi / 180.0))

    if proposed_model.MIN_MATCHES_NUM > sample_matches[0].shape[0]:
        logger.report_event("RANSAC cannot find a good model because the number of initial matches ({}) is too small.".format(sample_matches[0].shape[0]), log_level=logging.WARN)
        return None, None, None

    # Avoid repeated index permutations; limit the number of candidate draws
    # by the number of possible n-choose-k combinations
    max_combinations = int(comb(len(sample_matches[0]), proposed_model.MIN_MATCHES_NUM))
    max_iterations = min(iterations, max_combinations)
    choices = choose_forward(len(sample_matches[0]),
                             proposed_model.MIN_MATCHES_NUM,
                             max_iterations)
    if proposed_model.MIN_MATCHES_NUM == 3:
        choices = filter_triangles(sample_matches[0], sample_matches[1], choices)

    for min_matches_idxs in choices:
        # Try to fit them to the model
        if proposed_model.fit(sample_matches[0][min_matches_idxs], sample_matches[1][min_matches_idxs]) == False:
            continue
        model_matrix = proposed_model.get_matrix()[:2, :2]
        if abs(model_matrix[0, 0]) < 0.01:
            continue
        if max_rot_deg_cos is not None and proposed_model.MIN_MATCHES_NUM == 2:
            # restrict the rotation (for rigid-like models, matrix[0, 0] is cos(angle))
            if model_matrix[0, 0] < max_rot_deg_cos:
                continue
        if proposed_model.MIN_MATCHES_NUM == 3:
            # check the stretch of the new transformation
            if max_stretch is not None and not check_model_stretch(model_matrix, max_stretch):
                continue
            # if the proposed model distorts the image too much, skip the model
            det = np.linalg.det(model_matrix)
            if det < 1.0 - det_delta or det > 1.0 + det_delta:
                continue
            # Compare the triangle angles
            if tri_angles_comparator is not None:
                if tri_angles_comparator(proposed_model) is False:
                    continue
        # print "proposed_model", proposed_model.to_str()
        # Verify the new model
        proposed_model_score, inlier_mask, proposed_model_mean = proposed_model.score(test_matches[0], test_matches[1], epsilon, min_inlier_ratio, min_num_inlier)
        # print "proposed_model_score", proposed_model_score
        if proposed_model_score > best_model_score:
            best_model = copy.deepcopy(proposed_model)
            best_model_score = proposed_model_score
            best_inlier_mask = inlier_mask
            best_model_mean_dists = proposed_model_mean
    '''
    if best_model is None:
        print "Cannot find a good model during ransac. best_model_score {}".format(best_model_score)
    else:
        print "RANSAC result: best_model_score", best_model_score, "best_model:", best_model.to_str(), "best_model_mean_dists:", best_model_mean_dists
    '''
    return best_inlier_mask, best_model, best_model_mean_dists

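# Illustrative (commented-out) usage sketch. The point arrays and the model-type id
# below are hypothetical placeholders; target_model_type must be whatever identifier
# this repo's Transforms.create() accepts (e.g. its rigid or affine model id).
#
#   sample_matches = [pts_sec1, pts_sec2]    # two (N, 2) arrays of matched points
#   test_matches = sample_matches            # often the same matches are scored
#   inlier_mask, model, mean_dists = ransac(
#       sample_matches, test_matches, target_model_type=model_type_id,
#       iterations=1000, epsilon=5.0, min_inlier_ratio=0.1, min_num_inlier=7)
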
def filter_after_ransac(candidates, model, max_trust, min_num_inliers):
    """
    Estimate the AbstractModel and filter potential outliers by robust iterative regression.
    This method performs well on data sets with a low number of outliers (or after RANSAC).
    """
    # copy the model
    new_model = copy.deepcopy(model)

    dists = []

    # Iteratively refit the model to the candidates, removing those that are farther
    # than max_trust * median-distance, until the set of remaining candidates
    # no longer shrinks.
    # For the initial iteration, use a value that is higher than the number of candidates.
    prev_iteration_num_inliers = candidates.shape[1] + 1

    # keep a copy of the candidates that will be changed due to fitting and error
    inliers = copy.copy(candidates[0])

    # keep track of the candidates using a mask
    candidates_mask = np.ones((candidates.shape[1]), dtype=bool)

    while prev_iteration_num_inliers > np.sum(candidates_mask):
        prev_iteration_num_inliers = np.sum(candidates_mask)
        # Get the inliers and their corresponding matches
        inliers = candidates[0][candidates_mask]
        to_image_candidates = candidates[1][candidates_mask]

        # try to fit the model
        if new_model.fit(inliers, to_image_candidates) == False:
            break

        # get the median error (after transforming the points)
        pts_after_transform = new_model.apply(inliers)
        dists = np.sqrt(np.sum((pts_after_transform - to_image_candidates) ** 2, axis=1))
        median = np.median(dists)
        # print "dists mean", np.mean(dists)
        # print "median", median
        # print dists <= (median * max_trust)
        inliers_mask = dists <= (median * max_trust)
        candidates_mask[candidates_mask == True] = inliers_mask

    if np.sum(candidates_mask) < min_num_inliers:
        return None, None, -1

    return new_model, candidates_mask, np.mean(dists)

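# Illustrative (commented-out) numeric note on the trust threshold above: if one
# iteration yields residual distances [1.0, 1.2, 0.8, 9.0] and max_trust is 3, the
# median is 1.1 and the cutoff 3.3, so only the 9.0 candidate is dropped before the
# next refit.
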
def filter_matches(sample_matches, test_matches, target_model_type, iterations, epsilon, min_inlier_ratio, min_num_inlier, max_trust, det_delta=0.35, max_stretch=None, max_rot_deg=None, robust_filter=True, tri_angles_comparator=None):
    """Perform a RANSAC filtering given all the matches"""
    new_model = None
    filtered_matches = None
    meandists = -1

    # Apply RANSAC
    # print "Filtering {} matches".format(matches.shape[1])
    logger.report_event("pre-ransac matches count: sample={}, test={}".format(len(sample_matches[0]), len(test_matches[0])), log_level=logging.DEBUG)
    inliers_mask, model, _ = ransac(sample_matches, test_matches, target_model_type, iterations, epsilon, min_inlier_ratio, min_num_inlier, det_delta, max_stretch, max_rot_deg, tri_angles_comparator=tri_angles_comparator)
    if inliers_mask is None:
        logger.report_event("post-ransac matches count: 0", log_level=logging.DEBUG)
    else:
        logger.report_event("post-ransac matches count: {}".format(np.sum(inliers_mask)), log_level=logging.DEBUG)

    if not robust_filter:
        inliers = np.array([test_matches[0][inliers_mask], test_matches[1][inliers_mask]])
        return model, inliers

    # Apply further filtering
    if inliers_mask is not None:
        inliers = np.array([test_matches[0][inliers_mask], test_matches[1][inliers_mask]])
        # print "Found {} good matches out of {} matches after RANSAC".format(inliers.shape[1], matches.shape[1])
        new_model, filtered_inliers_mask, meandists = filter_after_ransac(inliers, model, max_trust, min_num_inlier)
        filtered_matches = np.array([inliers[0][filtered_inliers_mask], inliers[1][filtered_inliers_mask]])
    '''
    if new_model is None:
        print "No model found after RANSAC"
    else:
        # _, filtered_matches_mask, mean_val = new_model.score(matches[0], matches[1], epsilon, min_inlier_ratio, min_num_inlier)
        # filtered_matches = np.array([matches[0][filtered_matches], matches[1][filtered_matches]])
        print "Model found after robust regression: {}, applies to {} out of {} matches.".format(new_model.to_str(), filtered_matches.shape[1], matches.shape[1])
    '''
    if filtered_matches is None:
        logger.report_event("post-ransac-filter matches count: 0", log_level=logging.DEBUG)
    else:
        logger.report_event("post-ransac-filter matches count: {}".format(filtered_matches.shape[1]), log_level=logging.DEBUG)

    return new_model, filtered_matches

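# Illustrative (commented-out) end-to-end sketch, reusing the hypothetical placeholder
# inputs from the ransac() example above:
#
#   model, matches = filter_matches(
#       sample_matches, test_matches, target_model_type=model_type_id,
#       iterations=1000, epsilon=5.0, min_inlier_ratio=0.1, min_num_inlier=7,
#       max_trust=3)
#   # `matches` is a 2 x M x 2 array of the surviving point correspondences,
#   # or None if RANSAC could not find a model.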