-
Notifications
You must be signed in to change notification settings - Fork 14
/
basic_test.py
executable file
·108 lines (89 loc) · 3.8 KB
/
basic_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""
Temporary development test for quick-hacking.
Probably should be deleted on 'final release'.
It's pretty illustrative, however.
For bettter examples on C API wrappers look at test/test_capi.py.
"""
from __future__ import print_function
import random
import logging
import multiprocessing
import networkx
from pysimgrid import simdag
import pysimgrid.simdag.algorithms as algorithms
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
_LOG_FORMAT = "[%(name)s] [%(levelname)5s] [%(asctime)s] %(message)s"
logging.basicConfig(level=logging.DEBUG, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)
class RandomSchedule(simdag.StaticScheduler):
def get_schedule(self, simulation):
schedule = {host: [] for host in simulation.hosts}
graph = simulation.get_task_graph()
for task in networkx.topological_sort(graph):
schedule[random.choice(simulation.hosts)].append(task)
return schedule
class SimpleDynamic(simdag.DynamicScheduler):
def prepare(self, simulation):
for h in simulation.hosts:
h.data = {}
def schedule(self, simulation, changed):
for h in simulation.hosts:
h.data["free"] = True
for task in simulation.tasks[simdag.TaskState.TASK_STATE_RUNNING, simdag.TaskState.TASK_STATE_SCHEDULED]:
task.hosts[0].data["free"] = False
for t in simulation.tasks[simdag.TaskState.TASK_STATE_SCHEDULABLE]:
free_hosts = simulation.hosts.by_data("free", True).sorted(lambda h: t.get_eet(h))
if free_hosts:
t.schedule(free_hosts[0])
free_hosts[0].data["free"] = False
else:
break
_SCHEDULERS = {
"MinMin": algorithms.BatchMin,
"MaxMin": algorithms.BatchMax,
"Sufferage": algorithms.BatchSufferage,
"DLS": algorithms.DLS,
"RandomSchedule": algorithms.RandomStatic,
"SimpleDynamic": SimpleDynamic,
"MCT": algorithms.MCT,
"OLB": algorithms.OLB,
"HCPT": algorithms.HCPT,
"HEFT": algorithms.HEFT,
"Lookahead": algorithms.Lookahead,
"PEFT": algorithms.PEFT
}
def run_simulation(scheduler):
scheduler_class = _SCHEDULERS[scheduler]
with simdag.Simulation("test/data/pl_4hosts_master.xml", "dag/tasks_exp2/testg0.6.dot") as simulation:
print("Scheduler:", scheduler, scheduler_class)
scheduler = scheduler_class(simulation)
scheduler.run()
print("Scheduler time:", scheduler.scheduler_time)
def main():
# single run in current process mode, used for profiling
if True:
#with simdag.Simulation("test/data/pl_4hosts.xml", "test/data/basic_graph.dot") as simulation:
with simdag.Simulation("dag/plat_exp1/cluster_5_1-4_100_100_0.xml", "dag/tasks_exp1/Inspiral_100.xml") as simulation:
#with simdag.Simulation("dag/plat_exp1/cluster_20_1-4_100_100_0.xml", "dag/tasks_exp2/testg0.6.dot") as simulation:
#graph = simulation.get_task_graph()
#scheduler = algorithms.HEFT(simulation)
#scheduler = algorithms.DLS(simulation)
#scheduler = algorithms.HCPT(simulation)
scheduler = algorithms.Lookahead(simulation)
#scheduler = algorithms.OLB(simulation)
#scheduler = algorithms.BatchMin(simulation)
#scheduler = algorithms.PEFT(simulation)
#scheduler = algorithms.SimHEFT(simulation)
scheduler.run()
for t in simulation.tasks.sorted(lambda t: t.start_time):
print(t.name, t.start_time, t.finish_time, t.hosts[0].name)
print(scheduler.scheduler_time, scheduler.total_time)
print("EXEC", sum([(t.finish_time - t.start_time) for t in simulation.tasks]))
print("COMM", sum([(t.finish_time - t.start_time) for t in simulation.connections]))
return
# example: how to run multiple simulations in a single script (circumventing SimGrid limitation of 'non-restartable' simulator state)
for scheduler in _SCHEDULERS.keys():
p = multiprocessing.Process(target=run_simulation, args=(scheduler,))
p.start()
p.join()
if __name__ == '__main__':
main()