This repository has been archived by the owner on Feb 1, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
matcher.py
157 lines (141 loc) · 5.88 KB
/
matcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from worker import Worker
from math import sqrt
import os
import pickle
import numpy as np
import scipy
import scipy.spatial
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
class Matcher(QObject):
    """Match a query feature vector against a pickled set of image vectors.

    The stored vectors are loaded by ``precalculateVector``; ``match`` then
    ranks them against a query vector by Euclidean distance (``"euDist"``)
    or cosine distance (``"cosSim"``). ``matchThreader`` hands ``match`` to
    a ``Worker`` on a ``QThreadPool`` and relays the worker's signals.
    """

    # PyQt signals
    sgnSrcProgress = pyqtSignal(int)      # count of stored vectors compared so far
    sgnSrcTotalImg = pyqtSignal(int)      # total number of stored vectors to compare
    sgnSrcException = pyqtSignal(object)  # relayed from the worker's exception signal
    sgnSrcResult = pyqtSignal(object)     # relayed from the worker's result signal
    sgnSrcDone = pyqtSignal()             # relayed from the worker's done signal

    def __init__(self, pckPath="imgData.pck"):
        """Create the matcher.

        Args:
            pckPath: File name of the pickle file, resolved inside the
                ``pck`` directory.
        """
        # QObject init
        super().__init__()
        # Thread pool used by matchThreader
        self.threadPool = QThreadPool()
        # Location of the pickled {name: vector} data
        self.pckPath = os.path.join("pck", pckPath)

    def precalculateVector(self, fastAlgorithm=False):
        """Load the pickle file and precompute the length of every vector.

        Populates ``self.data`` (the raw loaded mapping) and the parallel
        numpy arrays ``self.name``, ``self.vector`` and ``self.vectorLen``.
        Entries whose length cannot be computed are skipped.

        Args:
            fastAlgorithm: Forwarded to ``vectorDistance``.
        """
        # NOTE(security): pickle.load can execute arbitrary code; only load
        # pickle files that come from a trusted source.
        with open(self.pckPath, "rb") as f:
            self.data = pickle.load(f)

        names = []
        vectors = []
        lengths = []
        # An empty mapping yields empty arrays instead of StopIteration.
        if self.data:
            # The first entry decides the dimensionality of the zero vector.
            sampleLen = len(next(iter(self.data.values())))
            zeroVector = np.zeros(sampleLen)
            for name, vector in self.data.items():
                # A vector's length is its distance from the origin.
                try:
                    length = self.vectorDistance(
                        vector, zeroVector, fastAlgorithm=fastAlgorithm
                    )
                except Exception:
                    # Deliberate best effort: drop entries that cannot be
                    # measured (e.g. wrong shape) rather than abort loading.
                    continue
                names.append(name)
                vectors.append(vector)
                lengths.append(length)

        # Create numpy arrays
        self.name = np.array(names)
        self.vector = np.array(vectors)
        self.vectorLen = np.array(lengths)

    def vectorDistance(self, vector1, vector2, fastAlgorithm=False):
        """Return the Euclidean distance between two vectors.

        Args:
            vector1: First vector (any array-like of numbers).
            vector2: Second vector, same shape as ``vector1``.
            fastAlgorithm: When true, use ``np.linalg.norm`` instead of the
                manual summation loop.

        Returns:
            The distance as a float (numpy float on the fast path).
        """
        # Convert both operands so plain lists work on either path.
        vectorDiff = np.asarray(vector1) - np.asarray(vector2)
        if fastAlgorithm:
            # using numpy norm function, for GUI testing only if needed
            return np.linalg.norm(vectorDiff)
        # default algo: accumulate the squared differences by hand
        total = 0
        for el in vectorDiff:
            total += el * el
        return sqrt(total)

    def euDist(self, vector, fastAlgorithm=False):
        """Return the Euclidean distance from ``vector`` to every stored vector.

        Emits ``sgnSrcTotalImg`` once with the number of stored vectors and
        ``sgnSrcProgress`` after each comparison.

        Args:
            vector: Query vector.
            fastAlgorithm: Forwarded to ``vectorDistance``.

        Returns:
            Numpy array of distances, in the same order as ``self.vector``.
        """
        # Tell listeners how many comparisons to expect
        self.sgnSrcTotalImg.emit(len(self.vector))
        imgSimilarity = []
        for counter, stored in enumerate(self.vector, start=1):
            imgSimilarity.append(
                self.vectorDistance(vector, stored, fastAlgorithm=fastAlgorithm)
            )
            # Update progress
            self.sgnSrcProgress.emit(counter)
        return np.array(imgSimilarity)

    def vectorDot(self, vector1, vector2):
        """Return the dot product of two vectors using a manual sum."""
        num = 0
        # Element-wise products vector1[i] * vector2[i]
        vectorMult = np.asarray(vector1) * np.asarray(vector2)
        for el in vectorMult:
            num += el
        return num

    def cosSim(self, vector, fastAlgorithm=False):
        """Return the cosine distance from ``vector`` to every stored vector.

        The value is ``1 - cos(angle)``, so a smaller value means more
        similar. On the default path, a pair whose denominator is zero gets
        the value 1, and progress signals are emitted as in ``euDist``.
        The fast path delegates to scipy and emits no progress signals.

        Args:
            vector: Query vector.
            fastAlgorithm: When true, use ``scipy.spatial.distance.cdist``.

        Returns:
            Numpy array of cosine distances, ordered like ``self.vector``.
        """
        if fastAlgorithm:
            # using scipy, for GUI testing only if needed
            query = np.asarray(vector).reshape(1, -1)
            return scipy.spatial.distance.cdist(
                self.vector, query, 'cosine'
            ).reshape(-1)

        # default algo
        # Tell listeners how many comparisons to expect
        self.sgnSrcTotalImg.emit(len(self.vector))
        imgSimilarity = []
        # The query length does not change between iterations; compute once.
        queryLen = self.vectorDistance(vector, np.zeros(len(vector)))
        for i, stored in enumerate(self.vector):
            denom = queryLen * self.vectorLen[i]
            if denom == 0:
                # Explicit check instead of assert (asserts vanish under -O).
                # A value of 1 keeps the failing image out of the top matches.
                imgSimilarity.append(1)
            else:
                num = self.vectorDot(vector, stored)
                imgSimilarity.append(1 - num / denom)
            # Update progress
            self.sgnSrcProgress.emit(i + 1)
        return np.array(imgSimilarity)

    def match(self, vector, op, top, fastAlgorithm):
        """Return the ``top`` stored images closest to ``vector``.

        Args:
            vector: Query vector.
            op: ``"euDist"`` or ``"cosSim"``.
            top: Number of best matches to return.
            fastAlgorithm: Forwarded to the selected metric.

        Returns:
            Tuple ``(names, distances)`` of two lists sorted by ascending
            distance.

        Raises:
            ValueError: If ``op`` is not a supported option.
        """
        if op == "euDist":
            imgSimilarity = self.euDist(vector, fastAlgorithm=fastAlgorithm)
        elif op == "cosSim":
            imgSimilarity = self.cosSim(vector, fastAlgorithm=fastAlgorithm)
        else:
            # Fail here instead of continuing with an unbound result.
            raise ValueError("Invalid option: {!r}".format(op))

        # Sort imgSimilarity, smaller value: more similar
        idxSort = np.argsort(imgSimilarity)
        nearestImgPath = self.name[idxSort][:top]
        nearestImgDist = imgSimilarity[idxSort][:top]
        return (nearestImgPath.tolist(), nearestImgDist.tolist())

    # Multithreader
    def matchThreader(self, vector, op, top=3, fastAlgorithm=False):
        """Start ``match`` through a ``Worker`` on the thread pool.

        The worker's ``exception``, ``result`` and ``done`` signals are
        connected to the relay slots of this object.
        """
        # Create worker instance
        worker = Worker(self.match, vector, op, top, fastAlgorithm)
        # Connect signals
        worker.signals.exception.connect(self.matchThreadException)
        worker.signals.result.connect(self.matchThreadResult)
        worker.signals.done.connect(self.matchThreadDone)
        # Run thread
        self.threadPool.start(worker)

    def matchThreadException(self, exception):
        """Relay a worker exception through ``sgnSrcException``."""
        self.sgnSrcException.emit(exception)

    def matchThreadResult(self, res):
        """Relay a worker result through ``sgnSrcResult``."""
        self.sgnSrcResult.emit(res)

    def matchThreadDone(self):
        """Relay worker completion through ``sgnSrcDone``."""
        self.sgnSrcDone.emit()