# instaFunctions.py
# Imports needed by the functions below
import numpy as np
import pandas as pd
from time import time

# Functions
def labelToInt(label):
    '''
    Map the set of labels of the original dataset (`str`) onto the set of natural numbers (`int`)
    for easier manipulation of the rdd.
    Note: `y` is assumed to be the global array of labels of the original dataset.
    '''
    uniqueLabels = list(np.unique(y))
    return uniqueLabels.index(label)
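# Assumed rdd format (inferred from how the functions below use their `datum` argument):
# each element of the rdd is a pair
#
#   (clusterId, {"x": np.ndarray of numerical attributes,
#                "y": original label,
#                "d2": squared distance from the assigned centroid})
#
# where `clusterId` is the integer id of the cluster the point is currently assigned to.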
def deleteBytes(datum):
    '''
    Clean a datum from categorical (bytes) attributes, keeping only the numerical ones.
    Arguments:
        One datum of the rdd.
    Return:
        Updated datum.
    '''
    x = datum[1]["x"]
    # Keep only the attributes that are not raw bytes (i.e. non-categorical)
    mask = [type(i) != bytes for i in x]
    datum[1]["x"] = np.asarray(x[mask])
    return datum
def localPlusPlusInit(points, k):
    '''
    KMeans++ initialization.
    Arguments:
        `points`: array (n, dim) of points to be clustered;
        `k`: desired number of centroids.
    Returns:
        Initial array (k, dim) of centroids, k<=n.
    '''
    # Sample one point uniformly from the points array
    C = points[np.random.choice(points.shape[0])]
    C = C[np.newaxis, :]
    # Draw the remaining k-1 centroids (the first one was sampled above)
    for _ in range(k - 1):
        # Compute the array (n,) of probabilities associated to each point,
        # proportional to the squared distance from the nearest centroid chosen so far
        probs = np.min(np.sum((points[:, :, np.newaxis] - C.T[np.newaxis, :, :])**2, axis=1), axis=1).flatten()
        # Normalize the probability distribution
        probs = probs / np.sum(probs)
        # Draw one new centroid according to the distribution
        nextCentroid = points[np.random.choice(points.shape[0], p=probs)][np.newaxis, :]
        # Add the centroid to the array
        C = np.vstack((C, nextCentroid))
    return C
def weightedAverage(group):
    """
    Compute the weighted average of a group from a pd.DataFrame with point coordinates, weights and clusterId.
    Used in the local (non-distributed) version of Lloyd's algorithm; also needed in K-Means||.
    """
    weight_column = 'weights'
    groupby_column = 'clusterId'
    columns_to_average = group.columns.difference([weight_column, groupby_column])
    weighted_averages = group[columns_to_average].multiply(group[weight_column], axis=0).sum() / group[weight_column].sum()
    return weighted_averages
def localLloyds(points, k, C_init=None, weights=None, n_iterations=100, logDict=None):
    """
    Local (non-distributed) Lloyd's algorithm.
    Arguments:
        `points`: array (n, dim) of points to cluster;
        `k`: number of desired clusters;
        `C_init`: optional, array (k, dim) of initial centroids;
        `weights`: optional, weights for the weighted average in the centroid re-computation;
        `n_iterations`: optional, number of iterations of Lloyd's algorithm;
        `logDict`: optional, dictionary {'CostsKmeans', 'tIterations', 'tTotal'} to store cost and time info.
    Return:
        Array of expected centroids.
    """
    t0 = time()
    # Storing cost and time info
    my_kMeansCosts = []
    tIterations = []
    df = pd.DataFrame(points)
    # If weights are not given, assume uniform weights for the points
    if weights is None:
        weights = np.ones(shape=len(points))
    df['weights'] = weights
    df['clusterId'] = np.zeros(shape=len(points))
    # If no C_init is given, default to K-Means++ initialization
    if C_init is None:
        C = localPlusPlusInit(points, k)
    else:
        C = C_init
    # Initial assignment: each point goes to its nearest centroid
    clusterId = np.argmin(np.sum((points[:, :, np.newaxis] - C.T[np.newaxis, :, :])**2, axis=1), axis=1)
    for iteration in range(n_iterations):
        t1 = time()
        # Compute the centroids given the clusters
        df['clusterId'] = clusterId
        C_df = df.groupby('clusterId')\
                 .apply(weightedAverage)\
                 .reset_index()
        # Compute the clusters given the centroids
        C_array = C_df[C_df.columns.difference(['weights', 'clusterId'])].reset_index(drop=True).to_numpy()
        squared_distances = np.sum((points[:, :, np.newaxis] - C_array.T[np.newaxis, :, :])**2, axis=1)
        clusterId = np.argmin(squared_distances, axis=1)
        my_cost = sum(squared_distances[np.arange(len(squared_distances)), clusterId])
        my_kMeansCosts.append(my_cost)
        t2 = time()
        tIteration = t2 - t1
        tIterations.append(tIteration)
    tEnd = time()
    tTotal = tEnd - t0
    # Store cost and time info
    if logDict is not None:
        logDict["CostsKmeans"] = my_kMeansCosts
        logDict["tIterations"] = tIterations
        logDict["tTotal"] = tTotal
    return C_array
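# A minimal usage sketch of the local pipeline on synthetic data. Everything in this helper
# (its name, the synthetic points, the chosen k) is illustrative and not taken from the module above.
def _demoLocalLloyds():
    """Run localPlusPlusInit + localLloyds on random points in [0,1]^2 (illustrative sketch)."""
    points = np.random.uniform(size=(1000, 2))   # synthetic points, assumed for the demo only
    log = {}
    C_init = localPlusPlusInit(points, k=5)
    centroids = localLloyds(points, k=5, C_init=C_init, n_iterations=50, logDict=log)
    # log["CostsKmeans"] traces the clustering cost at each iteration
    return centroids, log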
def minmaxRescale(datum, minS, maxS):
    """
    Rescale a datum into the [0,1] interval for better clustering.
    Arguments:
        `datum`: see rdd format;
        `minS`: array of the minimum coordinate value among points for each attribute;
        `maxS`: as `minS`, with the maximum.
    Return:
        Updated datum.
    """
    # Constant attributes (minS == maxS) are dropped to avoid division by zero
    mask = np.array(minS < maxS).astype(bool)
    feature = datum[1]["x"]
    feature = (feature[mask] - minS[mask]) / (maxS[mask] - minS[mask])
    return (datum[0], {"x": feature, "y": datum[1]["y"], "d2": datum[1]["d2"]})
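# Sketch of how `minS` and `maxS` could be computed from the rdd before rescaling; the helper
# name and the use of np.minimum/np.maximum as elementwise reducers are assumptions of this sketch.
def _demoMinmaxRescale(Rdd):
    """Rescale every datum of the rdd into [0,1] per attribute (illustrative sketch)."""
    # Elementwise min / max of the attribute vectors over the whole rdd
    minS = Rdd.map(lambda datum: datum[1]["x"]).reduce(np.minimum)
    maxS = Rdd.map(lambda datum: datum[1]["x"]).reduce(np.maximum)
    return Rdd.map(lambda datum: minmaxRescale(datum, minS, maxS))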
def selectCluster(datum, C, updateDistances=True):
    """
    Associate a datum to its centroid and optionally update the squared distance between them.
    Arguments:
        `datum`: see rdd format;
        `C`: array (k, len(datum[1]["x"]));
        `updateDistances`: if True, update `datum[1]["d2"]` with the squared distance between the datum point and the closest centroid in C.
    Return:
        Updated datum.
    """
    distances = np.sum((datum[1]["x"] - C)**2, axis=1)
    clusterId = np.argmin(distances)
    if updateDistances:
        return (clusterId, {'x': datum[1]['x'], 'y': datum[1]['y'], 'd2': distances[clusterId]})
    else:
        return (clusterId, datum[1])
def updateCentroids(Rdd):
    """
    Update the centroids as the spatial average of the points of each cluster.
    Argument:
        `Rdd`: see rdd format.
    Return:
        Updated array of centroids.
    """
    # For each cluster, sum the coordinates and count the points, then take the ratio.
    # Note: the row order of C need not match the cluster ids, which is acceptable because
    # clusters are re-assigned from scratch by selectCluster at the next step.
    C = Rdd.mapValues(lambda xy: (xy['x'], 1))\
           .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\
           .mapValues(lambda a: a[0] / a[1])\
           .values()\
           .collect()
    C = np.array(C)  # TODO: check whether this conversion causes some overhead
    return C
def updateDistances(Rdd, C):
    """
    Update the Rdd with the squared distances from the centroids, given an Rdd with a cluster already assigned to each point.
    Arguments:
        `Rdd`: see rdd format;
        `C`: array of cluster centroids.
    Return:
        Updated rdd.
    """
    def datumUpdate(datum, C):
        '''
        Update a datum of the rdd with the squared distance from its assigned centroid
        '''
        d2 = np.sum((datum[1]['x'] - C[datum[0]])**2)
        return (datum[0], {"x": datum[1]["x"], "y": datum[1]["y"], "d2": d2})
    Rdd = Rdd.map(lambda datum: datumUpdate(datum, C))
    return Rdd
def cost(Rdd):
    """
    Compute the global cost of the clustering, from an Rdd whose distances from the centroids have already been updated
    """
    my_cost = Rdd.map(lambda datum: datum[1]['d2'])\
                 .reduce(lambda a, b: a + b)
    return my_cost
def kMeans(Rdd, C_init, maxIterations, logParallelKmeans=None):
    """
    Distributed (parallel) Lloyd's algorithm.
    Arguments:
        `Rdd`: see rdd format;
        `C_init`: array (k, dim) of initial centroids;
        `maxIterations`: max number of iterations;
        `logParallelKmeans`: optional, dictionary {'CostsKmeans', 'tIterations', 'tTotal'} to store cost and time info.
    Return:
        Array of expected centroids.
    """
    t0 = time()
    # Storing cost and time info
    my_kMeansCosts = []
    tIterations = []
    C = C_init
    for t in range(maxIterations):
        t1 = time()
        # Assign each point to its nearest centroid; cache the result since it is reused below
        RddCached = Rdd.map(lambda datum: selectCluster(datum, C)).persist()
        # Compute the new centroids as the averages of the points belonging to the same cluster
        C = updateCentroids(RddCached)
        my_cost = cost(RddCached)
        my_kMeansCosts.append(my_cost)
        t2 = time()
        tIteration = t2 - t1
        tIterations.append(tIteration)
        #RddCached.unpersist()
        # Break the loop if the cost has converged (improvement below 0.1%)
        if (len(my_kMeansCosts) > 1) and (my_kMeansCosts[-1] > 0.999 * my_kMeansCosts[-2]):
            break
    tEnd = time()
    tTotal = tEnd - t0
    # Store cost and time info in the argument dictionary
    if logParallelKmeans is not None:
        logParallelKmeans["CostsKmeans"] = my_kMeansCosts
        logParallelKmeans["tIterations"] = tIterations
        logParallelKmeans["tTotal"] = tTotal
    return C
def naiveInitFromSet(Rdd, k, logNaiveInit=None, spark_seed=54321):
    """
    Uniform sampling of k points from the Rdd.
    Arguments:
        `Rdd`: see rdd structure;
        `k`: desired number of clusters;
        `spark_seed`: optional, seed for spark random sampling;
        `logNaiveInit`: optional, dictionary {'tTotal'} to store time info.
    Return:
        Initial array (k, dim) of centroids.
    """
    t0 = time()
    # Sampling. Replacement is set to False to avoid coinciding centroids,
    # but there is no guarantee that all points in the original dataset are distinct!
    kSubset = Rdd.takeSample(False, k, seed=spark_seed)
    C_init = np.array([datum[1]['x'] for datum in kSubset])
    tEnd = time()
    if logNaiveInit is not None:
        logNaiveInit["tTotal"] = tEnd - t0
    return C_init
def naiveInitFromSpace(k, dim):
    """
    Uniform drawing of k points from Euclidean space, assuming the Rdd has been rescaled into the [0,1]^dim hypercube.
    Arguments:
        `k`: desired number of clusters;
        `dim`: dimensionality of the points space.
    Return:
        Initial array (k, dim) of centroids.
    """
    C_init = np.random.uniform(size=(k, dim))
    return C_init
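# Illustrative sketch (the helper name and the default values are assumptions) of running the
# distributed Lloyd's algorithm from one of the two naive initializations above.
def _demoNaiveInitKMeans(Rdd, k, dim, maxIterations=20):
    """Naive initialization followed by the distributed kMeans (illustrative sketch)."""
    # naiveInitFromSpace assumes the rdd has been rescaled into [0,1]^dim, e.g. with minmaxRescale;
    # naiveInitFromSet(Rdd, k) could be used instead to sample centroids from the data itself.
    C_init = naiveInitFromSpace(k, dim)
    log = {}
    C = kMeans(Rdd, C_init, maxIterations, logParallelKmeans=log)
    return C, log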
def parallelInit(Rdd, k, l, logParallelInit=None):
    """
    Parallel (K-Means||) initialization.
    Arguments:
        `Rdd`: see rdd structure;
        `k`: desired number of clusters;
        `l`: coefficient to adjust the sampling probability in order to obtain at least k centroids;
        `logParallelInit`: optional, dictionary {'tSamples', 'tCentroids', 'CostInit', 'tTotal'} to store cost and time info.
    Return:
        Initial array (k, dim) of centroids.
    """
    t0 = time()
    # Initialize C as a single point of the dataset
    C = naiveInitFromSet(Rdd, 1)
    # Associate each datum to the only centroid computed above, then compute distances and cost
    Rdd = Rdd.map(lambda datum: (0, datum[1]))
    Rdd = updateDistances(Rdd, C).persist()
    my_cost = cost(Rdd)
    # Number of iterations (log(cost))
    n_iterations = int(np.log(my_cost))
    if n_iterations < 1:
        n_iterations = 1
    tSamples = []
    tCentroids = []
    CostInits = [my_cost]
    # Iterative sampling of the centroids
    for _ in range(n_iterations):
        t1 = time()
        # Sample C' with probability proportional to the squared distance from the current centroids
        C_prime = Rdd.filter(lambda datum: np.random.uniform() < l * datum[1]['d2'] / my_cost)\
                     .map(lambda datum: datum[1]['x'])\
                     .collect()
        C_prime = np.array(C_prime)
        t2 = time()
        # Stack C and C', reassign each point to its nearest centroid and update the cost
        if C_prime.shape[0] > 0:
            C = np.vstack((C, C_prime))
        #Rdd.unpersist()
        Rdd = Rdd.map(lambda datum: selectCluster(datum, C)).persist()
        my_cost = cost(Rdd)
        t3 = time()
        tSample = t2 - t1
        tCentroid = t3 - t2
        tSamples.append(tSample)
        tCentroids.append(tCentroid)
        CostInits.append(my_cost)
    # Erase centroids sampled more than once
    C = C.astype(float)
    C = np.unique(C, axis=0)
    Rdd = Rdd.map(lambda datum: selectCluster(datum, C))
    # Compute the weights of the centroids (size of each cluster) and store them in an array indexed like C
    wx = Rdd.countByKey()
    weights = np.zeros(len(C))
    weights[list(wx.keys())] = list(wx.values())
    # Sub-selection of k centroids from C, using the local Lloyd's algorithm with k-means++ initialization
    if C.shape[0] <= k:
        C_init = C
    else:
        C_init = localLloyds(C, k, weights=weights, n_iterations=100)  # can be set to lloydsMaxIterations for consistency TODO
    tEnd = time()
    if logParallelInit is not None:
        logParallelInit["tSamples"] = tSamples
        logParallelInit["tCentroids"] = tCentroids
        logParallelInit["CostInit"] = CostInits
        logParallelInit["tTotal"] = tEnd - t0
    #Rdd.unpersist()
    return C_init
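# End-to-end sketch of the K-Means|| pipeline: parallel initialization followed by the
# distributed Lloyd's algorithm. The helper name, the default oversampling coefficient l=2.0
# and maxIterations=20 are placeholders assumed for this sketch.
def _demoParallelPipeline(Rdd, k, l=2.0, maxIterations=20):
    """parallelInit + kMeans on an rdd in the format described at the top of the module (sketch)."""
    initLog, kmeansLog = {}, {}
    C_init = parallelInit(Rdd, k, l, logParallelInit=initLog)
    C = kMeans(Rdd, C_init, maxIterations, logParallelKmeans=kmeansLog)
    return C, initLog, kmeansLog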
def predictedCentroidsLabeler(C_expected, C_predicted):
    """
    Associate expected and predicted centroids based on distance.
    Arguments:
        `C_expected`: array (k, dim) of expected centroids;
        `C_predicted`: array (k, dim) of predicted centroids.
    Return:
        List of labels, one for each expected centroid, pointing to its nearest predicted centroid;
        List of the corresponding distances.
    """
    # Compute the matrix of squared distances
    distMatrix = np.sum((C_expected[:, :, np.newaxis] - C_predicted.T[np.newaxis, :, :])**2, axis=1)
    # The i-th entry of the labeler, j, tells that the i-th centroid of C_expected is associated to the j-th element of C_predicted
    labeler = np.argmin(distMatrix, axis=1)
    # Euclidean distance from each element of C_expected to its nearest point in C_predicted
    distances = np.sqrt(np.array(distMatrix[np.arange(len(distMatrix)), labeler]).astype(float))
    return labeler, distances
def nearestCentroidDistances(C):
    """
    Associate each centroid to the distance of its nearest neighbour.
    Arguments:
        `C`: array (k, dim) of centroids.
    Return:
        List of labels, one for each centroid, pointing to its nearest centroid;
        List of the corresponding distances.
    """
    # Compute the matrix of squared distances; the diagonal is set to inf so a centroid is never its own nearest neighbour
    distMatrix = np.sum((C[:, :, np.newaxis] - C.T[np.newaxis, :, :])**2, axis=1)
    distMatrix += np.diag(np.repeat(np.inf, distMatrix.shape[0]))
    # The i-th entry of the labeler, j, tells that the i-th centroid is nearest to the j-th centroid
    labeler = np.argmin(distMatrix, axis=1)
    # Euclidean distance from each centroid to its nearest neighbour
    distances = np.sqrt(np.array(distMatrix[np.arange(distMatrix.shape[0]), labeler]).astype(float))
    return labeler, distances
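# Illustrative sketch (the helper name and the comparison it suggests are assumptions) of how
# the two functions above could be combined to evaluate a clustering result.
def _demoEvaluateCentroids(C_expected, C_predicted):
    """Match expected to predicted centroids and compare against inter-centroid spacing (sketch)."""
    labeler, matchDistances = predictedCentroidsLabeler(C_expected, C_predicted)
    _, neighbourDistances = nearestCentroidDistances(C_predicted)
    # Matching distances much smaller than the typical nearest-neighbour distance suggest
    # that each expected centroid has been recovered by a distinct predicted centroid.
    return labeler, matchDistances, neighbourDistances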