# job_tracker.py
# TODO create named tuple JobTrackerOptions for params
import os
import shutil

import utils
import mapreduce


class JobTracker:
    def __init__(self):
        self.results = []
        slaves_info = utils.get_slaves_info()
        self.slaves = [utils.StorageServerInfo(server[0], server[1]) for server in slaves_info]
        config = utils.get_configuration()
        self.mappers_num = config['mappers_num']
        self.reducers_num = config['reducers_num']
        self.is_job_finished = False

    # ==========================================
    # API
    # ==========================================
    def init_naming_server(self, naming_server):
        self.naming_server = naming_server

    # ==========================================
    # RPC API
    # ==========================================
    def startup(self):
        pass

    def shutdown(self):
        pass

    def start_job(self, data_path, map_function, reduce_function):
        """
        Starts a job.
        :param data_path: path to the file with data to process by map/reduce
        :param map_function: mapper function source as a string
        :param reduce_function: reducer function source as a string
        """
        if os.path.exists("files/filesystem/results"):
            shutil.rmtree("files/filesystem/results")
        os.mkdir("files/filesystem/results")
        self.is_job_finished = False
        self.results.clear()
        self.reduce_fun = reduce_function
        mappers = self._get_mapper_servers()
        self.mappers_status = {mapper.id: False for mapper in mappers}
        chunks_info = self.naming_server.read(data_path)
        chunks_count = len(chunks_info)
        # Deal chunks to mappers round-robin: chunk i goes to mapper
        # i % mappers_num (a worked example follows this method).
        chunks_for_mappers = [[] for _ in range(self.mappers_num)]
        for i in range(chunks_count):
            chunks_for_mappers[i % self.mappers_num].append(chunks_info[i])
        reducers_ids = sorted(x.id for x in self._get_reducer_servers())
        for i in range(self.mappers_num):
            mappers[i].proxy.init_mapper(chunks_for_mappers[i], map_function, reducers_ids)
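
    # Worked example of the round-robin split above (chunk and mapper counts
    # here are illustrative, not from the config): 7 chunks over 3 mappers
    # gives
    #   mapper 0 -> chunks 0, 3, 6
    #   mapper 1 -> chunks 1, 4
    #   mapper 2 -> chunks 2, 5
    # so no mapper ever holds more than one chunk above its fair share.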

    def stop_job(self):
        pass

    def man_up(self, server_id):
        """
        Notification that a server is up after being down.
        :param server_id: server that turned on
        """
        pass

    def man_down(self, server_id):
        """
        Notification that a server went down.
        :param server_id: server that went down
        """
        pass

    def map_finished(self, server_id):
        """
        Notification that the map phase finished on a particular server.
        Runs the reducers once all mappers have finished their work.
        :param server_id: mapper that finished its work
        """
        self.mappers_status[server_id] = True
        if all(self.mappers_status.values()):
            self._start_reducers()

    def reduce_finished(self, server_id, reduced_file_path):
        """
        Notification from a reducer that it has finished its work.
        :param server_id: server id of the reducer
        :param reduced_file_path: result file path in the DFS
        """
        self.results.append(reduced_file_path)
        self.reducers_status[server_id] = True
        if all(self.reducers_status.values()):
            self._process_results()

    # ==========================================
    # Private
    # ==========================================
    def _start_reducers(self):
        """
        Starts the reducers, telling each one which mappers produced output.
        """
        reducers = self._get_reducer_servers()
        self.reducers_status = {reducer.id: False for reducer in reducers}
        for reducer in reducers:
            reducer.proxy.init_reducer(list(self.mappers_status.keys()), self.reduce_fun)

    def _get_mapper_servers(self):
        """
        :return: list of servers that should act as mappers
        """
        return self.slaves[:self.mappers_num]

    def _get_reducer_servers(self):
        """
        :return: list of servers that should act as reducers
        """
        return self.slaves[self.mappers_num:self.mappers_num + self.reducers_num]
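
    # Role assignment, illustrated (the numbers are made up for the example):
    # with 5 slaves, mappers_num=3 and reducers_num=2, the mappers are
    # slaves[0:3] and the reducers are slaves[3:5]. The two slices never
    # overlap as long as the cluster has at least
    # mappers_num + reducers_num slaves.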

    def _process_results(self):
        """
        Merges the reducers' result files, sorts the combined (key, value)
        pairs by value and prints them, then marks the job as finished.
        """
        result = []
        for path in self.results:
            with open(path, 'r') as file:
                content = file.read()
            result.extend(mapreduce.Jobber.split_to_list(content))
        result.sort(key=lambda pair: pair[1])
        for pair in result:
            print(str(pair))
        self.is_job_finished = True

    def check_job_status(self):
        """Returns the status of the job."""
        return self.is_job_finished

    def get_results(self):
        """Returns the list of result file paths in the DFS, or None while the job is still running."""
        if self.is_job_finished:
            return self.results
        return None
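

# ==========================================
# Usage sketch
# ==========================================
# A hypothetical walk-through of the RPC flow, assuming a naming server
# whose read(data_path) returns the list of chunks for a file; the variable
# names and the input path below are illustrative, not fixed by this module:
#
#   tracker = JobTracker()
#   tracker.init_naming_server(naming_server)
#   tracker.start_job("files/input.txt", map_src, reduce_src)
#
#   # Each mapper eventually calls back over RPC:
#   #   tracker.map_finished(mapper_id)
#   # Once every mapper has reported, the reducers are started; each one
#   # then calls back:
#   #   tracker.reduce_finished(reducer_id, result_path)
#
#   if tracker.check_job_status():
#       print(tracker.get_results())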