-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimulator.py
333 lines (295 loc) · 9.17 KB
/
simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
#!/usr/bin/env pypy
from commons import isRealistic
import math
import random
# Outside "realistic" mode, pin the RNG seed so every run draws the same
# pseudo-random sequence.
if not isRealistic():
    random.seed(0)
from operator import attrgetter
from node import Node
from job import Job
from schedulerpolicy import SchedulerPolicy
from history import History
from history import HistoryViewer
import sys
from datetime import datetime
'''
Simulator. Author: Inigo, Cheng
'''
class Simulator(SchedulerPolicy):
    '''
    Time-stepped simulator of a set of nodes running map/reduce jobs.

    Every iteration of run() progresses the tasks running on the nodes,
    assigns queued map/reduce attempts to idle nodes, optionally sends
    nodes to sleep or wakes them up, accounts for the power drawn and
    finally advances the clock (self.t) by self.STEP.

    Node status strings used by this class:
      'ON'            the node can run tasks
      'SLEEP'         the node is asleep
      'SLEEPING-<n>'  going to sleep, <n> seconds left
      'WAKING-<n>'    waking up, <n> seconds left

    NOTE(review): self.jobs, self.jobsQueue, self.jobsDone and
    self.schedulingPolicy are used here but never assigned in this class;
    presumably they are provided by SchedulerPolicy -- confirm.
    '''

    def __init__(self, logfile='history.log'):
        '''
        Create a simulation with no nodes and the clock at 0.

        logfile -- filename handed to History; when it is None, run() does
                   not close the history nor generate the viewer output.
        '''
        # Initialize the scheduler
        SchedulerPolicy.__init__(self)  # super()
        self.t = 0
        # Nodes, indexed by node id
        self.nodes = {}
        # History
        self.logfile = logfile
        self.history = History(filename=self.logfile)
        # Job submission: next numeric suffix tried for automatic job ids
        self.lastJobId = 1
        # Simulation time limit (None means no limit, see isTimeLimit)
        self.maxTime = None
        # Id for jobs. Plain directives are used on every platform: they
        # render the same text as the glibc-only width variants ('%4Y%2m...')
        # and are also understood by non-glibc C libraries.
        self.trackerId = datetime.now().strftime('%Y%m%d%H%M')
        # Specify if the nodes are sent to sleep when there's no load
        self.nodeManagement = True
        # Outputs: energy in joules, None until run() is called
        self.energy = None
        # Step length: amount the clock advances per iteration of run()
        self.STEP = 1

    # Internal helpers
    # =====================================================
    def _iterIdleNodes(self, runningAttr, slotsAttr):
        '''Yield, in node id order, the 'ON' nodes with a free slot (len(runningAttr) < slotsAttr).'''
        for nodeId in sorted(self.nodes):
            node = self.nodes[nodeId]
            if node.status == 'ON' and len(getattr(node, runningAttr)) < getattr(node, slotsAttr):
                yield node

    def _submittedJobs(self):
        '''Yield, in queue order, the queued jobs whose submit time has been reached.'''
        for jobId in self.jobsQueue:
            job = self.jobs[jobId]
            if self.t >= job.submit:
                yield job

    def _jitteredDelay(self, seconds):
        '''Return seconds, with Gaussian noise (sigma = 10%) when isRealistic() is true.'''
        if isRealistic():
            return random.gauss(seconds, 0.1 * seconds)  # +/-10%
        return seconds

    def _tickTransition(self, node, prefix, finalStatus):
        '''Advance a '<prefix><n>' countdown by one step; switch to finalStatus (and log) when done.'''
        # The remaining time shrinks by the step length, like the clock does.
        remaining = int(node.status[len(prefix):]) - self.STEP
        if remaining <= 0:
            node.status = finalStatus
            self.history.logNodeStatus(self.t, node)
        else:
            node.status = '%s%d' % (prefix, remaining)

    # Submit a job to run
    def addJob(self, job):
        '''
        Register a job, queue it and return its job id.

        When job.jobId is None an id of the form 'job_<trackerId>_<NNNN>'
        that is not yet present in self.jobs is assigned. The queue is then
        re-sorted with the self.schedulingPolicy comparator.
        '''
        # Local import: cmp_to_key adapts the comparator to sorted(key=...),
        # which works on both Python 2.7 and Python 3 (sorted() has no cmp
        # argument on Python 3).
        from functools import cmp_to_key
        # Assign automatic job id
        if job.jobId is None:
            while 'job_%s_%04d' % (self.trackerId, self.lastJobId) in self.jobs:
                self.lastJobId += 1
            job.jobId = 'job_%s_%04d' % (self.trackerId, self.lastJobId)
        # Initialize tasks
        job.initTasks()
        # Save the information
        self.jobs[job.jobId] = job
        self.jobsQueue.append(job.jobId)
        # Sort the queue according to the scheduling policy comparator
        self.jobsQueue = sorted(self.jobsQueue, key=cmp_to_key(self.schedulingPolicy))
        return job.jobId

    # Check if there is any idle node for maps
    def getIdleNodeMap(self):
        '''Return the first 'ON' node (by node id) with a free map slot, or None.'''
        return next(self._iterIdleNodes('maps', 'numMaps'), None)

    def getIdleNodesMap(self):
        '''Return the list of 'ON' nodes with a free map slot, in node id order.'''
        return list(self._iterIdleNodes('maps', 'numMaps'))

    # Check if there is any idle node for reduces
    def getIdleNodeRed(self):
        '''Return the first 'ON' node (by node id) with a free reduce slot, or None.'''
        return next(self._iterIdleNodes('reds', 'numReds'), None)

    def getIdleNodesRed(self):
        '''Return the list of 'ON' nodes with a free reduce slot, in node id order.'''
        return list(self._iterIdleNodes('reds', 'numReds'))

    def getWakingNodes(self):
        '''Return how many nodes are in a 'WAKING-<n>' status.'''
        return sum(1 for node in self.nodes.values() if node.status.startswith('WAKING-'))

    # Get a queued map
    def getMapTask(self):
        '''Return the first non-None job.getMapTask() among the submitted jobs, or None.'''
        for job in self._submittedJobs():
            mapTask = job.getMapTask()
            if mapTask is not None:
                return mapTask
        return None

    # Get a queued reduce
    def getRedTask(self):
        '''Return the first non-None job.getRedTask() among the submitted jobs, or None.'''
        for job in self._submittedJobs():
            redTask = job.getRedTask()
            if redTask is not None:
                return redTask
        return None

    # Check if there is a map queued
    def mapQueued(self):
        '''Return the sum of job.mapQueued() over the submitted jobs.'''
        return sum(job.mapQueued() for job in self._submittedJobs())

    # Check if the node is required: running job or providing data for a job
    def isNodeRequired(self, nodeId):
        '''
        Tell whether the node must stay on.

        True when the node is flagged as covering, reports isRunning(), or
        appears in getNodes() of a queued job that reports isRunning().
        '''
        node = self.nodes[nodeId]
        # Check if the node is in the covering subset (data) or is running
        if node.covering or node.isRunning():
            return True
        # Check if it has executed tasks from active jobs
        for jobId in self.jobsQueue:
            job = self.jobs[jobId]
            if job.isRunning() and nodeId in job.getNodes():
                return True
        return False

    # Check if there is a reduce queued
    def redQueued(self):
        '''Return the sum of job.redQueued() over the submitted jobs.'''
        return sum(job.redQueued() for job in self._submittedJobs())

    def getNodesUtilization(self):
        '''
        Return the mean map-slot occupancy (running maps / map slots) of the
        'ON' nodes, or 1.0 when no node is 'ON'.
        '''
        utilizations = [1.0 * len(node.maps) / node.numMaps
                        for node in self.nodes.values()
                        if node.status == 'ON']
        if not utilizations:
            return 1.0
        return sum(utilizations) / len(utilizations)

    def getNodesRunning(self):
        '''Return how many nodes are 'ON'.'''
        return sum(1 for node in self.nodes.values() if node.status == 'ON')

    # Energy in Wh
    def getEnergy(self):
        '''
        Return the accumulated energy converted from J to Wh.

        self.energy is None until run() resets it, so calling this before
        run() raises TypeError.
        '''
        # J = Ws -> Wh
        return self.energy / 3600.0

    # Average time to run per job in seconds
    def getPerformance(self):
        '''Return the mean of job.getFinish() over all jobs, or None when there are no jobs.'''
        if not len(self.jobs) > 0:
            return None
        total = 0.0
        for jobId in self.jobs:
            total += self.jobs[jobId].getFinish()
        return total / len(self.jobs)

    # Average quality per job in %
    def getQuality(self):
        '''Return the mean of job.getQuality() over all jobs, or 0.0 when there are no jobs.'''
        qualities = [self.jobs[jobId].getQuality() for jobId in self.jobs]
        if not qualities:
            return 0.0
        return sum(qualities) / len(qualities)

    def isTimeLimit(self):
        '''Return True when maxTime is set and the clock is no longer below it.'''
        return self.maxTime is not None and not (self.t < self.maxTime)

    # Run simulation
    def run(self):
        '''
        Run the simulation until the job queue is empty or the time limit
        (maxTime) is reached.

        Resets self.energy to 0.0 and accumulates the consumed energy in
        joules. At the end, when self.logfile is not None, the history is
        closed and HistoryViewer.generate() is invoked on its file.
        '''
        self.energy = 0.0
        # Log initial node status
        for node in self.nodes.values():
            self.history.logNodeStatus(self.t, node)
        # Iterate every STEP seconds
        while len(self.jobsQueue) > 0 and not self.isTimeLimit():
            # Run running tasks
            # =====================================================
            completedAttempts = []
            for node in self.nodes.values():
                completedAttempts += node.progress(self.STEP)  # progress one step at a time
            # Mark completed attempts
            completedJobs = []
            for attempt in completedAttempts:
                attempt.finish = self.t
                # Check if we finish the jobs
                completedJobs += attempt.getJob().completeAttempt(attempt)
                # Log
                self.history.logAttempt(attempt)
            for job in completedJobs:
                job.finish = self.t
                job.status = Job.Status.SUCCEEDED
                # Update queues
                self.jobsQueue.remove(job.jobId)
                self.jobsDone.append(job.jobId)
                # Log
                self.history.logJob(job)
            # Check which nodes are available to run tasks
            # =====================================================
            # Maps
            while self.mapQueued() > 0:
                idleNode = self.getIdleNodeMap()
                if idleNode is None:
                    break
                # Get a map that needs to be executed and assign it to a node
                # TODO policy to decide when to approximate
                #mapAttempt = self.getMapTask(approx=True if self.getNodesUtilization() > 1.8 else False)
                mapAttempt = self.getMapTask()
                mapAttempt.start = self.t
                if mapAttempt.getJob().isMapDropping():
                    mapAttempt.drop()
                    mapAttempt.finish = self.t
                    mapAttempt.approx = False
                    # NOTE(review): completedJobs has already been processed
                    # above and is rebuilt on the next iteration, so jobs
                    # returned here are not finalised by this loop -- confirm
                    # whether that is intended.
                    completedJobs += mapAttempt.getJob().dropAttempt(mapAttempt)
                    # Log
                    self.history.logAttempt(mapAttempt)
                else:
                    # Start running in a node
                    idleNode.assignMap(mapAttempt)
            # Reduces
            while self.redQueued() > 0:
                idleNode = self.getIdleNodeRed()
                if idleNode is None:
                    break
                # Get a reduce that needs to be executed and assign it to a node
                redAttempt = self.getRedTask()
                redAttempt.start = self.t
                if redAttempt.getJob().isRedDropping():
                    redAttempt.drop()
                    redAttempt.finish = self.t
                    # Log
                    self.history.logAttempt(redAttempt)
                else:
                    idleNode.assignRed(redAttempt)
            # Node management
            # =====================================================
            if self.nodeManagement:
                # Check if we need less nodes. Idle nodes.
                lessNodes = min(len(self.getIdleNodesMap()), len(self.getIdleNodesRed()))
                # Check if we need more nodes. Size of the queues.
                moreNodes = 0
                if lessNodes == 0:
                    wakingNodes = self.getWakingNodes()
                    moreNodesMaps = math.ceil(1.0 * self.mapQueued() / 3) - wakingNodes
                    moreNodesReds = math.ceil(self.redQueued() / 1) - wakingNodes
                    moreNodes = max(moreNodesMaps, moreNodesReds, 0)
                # Change node status
                for node in self.nodes.values():
                    if node.status == 'ON' and not self.isNodeRequired(node.nodeId) and lessNodes > 0:
                        lessNodes -= 1
                        node.status = 'SLEEPING-%d' % self._jitteredDelay(node.timeSleep)
                        self.history.logNodeStatus(self.t, node)
                    elif node.status == 'SLEEP' and moreNodes > 0:
                        moreNodes -= 1
                        node.status = 'WAKING-%d' % self._jitteredDelay(node.timeWake)
                        self.history.logNodeStatus(self.t, node)
                    # Transition status
                    elif node.status.startswith('SLEEPING-'):
                        self._tickTransition(node, 'SLEEPING-', 'SLEEP')
                    elif node.status.startswith('WAKING-'):
                        self._tickTransition(node, 'WAKING-', 'ON')
            # Account for power
            power = 0.0
            for node in self.nodes.values():
                power += node.getPower()
            self.history.logPower(self.t, power)
            # s x W = J: the power is drawn for a whole step
            self.energy += power * self.STEP
            # Progress to next period
            self.t += self.STEP
        # Log final output
        if self.logfile is not None:
            self.history.close()
            viewer = HistoryViewer(self.history.getFilename())
            viewer.generate()