This repository has been archived by the owner on Dec 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
145 lines (112 loc) · 3.69 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os
import pickle
from xmlrpc.client import ServerProxy
from collections import namedtuple
from enum import Enum
import utils
# Info about chunk:
# :param chunk_position: number position in file
# :param chunk_name: chunk name
# :param main_server_id: id of server where main copy stored
# :param replica_server_id: id of server where replica stored
ChunkInfo = namedtuple('ChunkInfo', ['chunk_position', 'chunk_name', 'main_server_id', 'replica_server_id'])
def get_chuck_info(dict):
return namedtuple('ChunkInfo', dict.keys())(**dict)
# def get_master_address():
# return "localhost:8000"
#
#
# def get_slaves_info():
# return [
# (1, "localhost:8001"),
# (2, "localhost:8002"),
# (3, "localhost:8003"),
# (4, "localhost:8004")
# ]
def get_configuration():
return {"mappers_num": 2, "reducers_num": 2}
def get_master_address():
return "localhost", 8000
# return "server", 8000
def get_slaves_info():
return [
(1, ('localhost', 8001)),
(2, ('localhost', 8002)),
(3, ('localhost', 8003)),
(4, ('localhost', 8004))
]
# return [
# (1, ('node1', 8001)),
# (2, ('node2', 8002)),
# (3, ('node3', 8003)),
# (4, ('node4', 8004))
# ]
def write_content(self, path, content):
full_path = self.root_directory + '/' + path
with open(full_path, mode='x') as file:
file.write(content)
print('The result is written to ' + full_path)
def get_file_content(path):
if not os.path.exists(path):
raise FileNotFoundError()
with open(path, mode='r') as file:
return file.read()
MAPPER_CONTENT_PATH = os.path.abspath(os.path.dirname(__file__))
REDUCER_CONTENT_PATH = os.path.abspath(os.path.dirname(__file__))
def get_map_code(map_fun_path):
return utils.get_file_content(MAPPER_CONTENT_PATH + '/' + map_fun_path)
def get_reducer_code(reduce_fun_path):
return utils.get_file_content(REDUCER_CONTENT_PATH + '/' + reduce_fun_path)
class DirFileEnum:
"""
Enum for provide info about path, is it directory, file, or neither
"""
Error = 0
Directory = 1
File = 2
class FileInfo(object):
"""
Helper for creating stubs of real DFS files at NamingServer's file system
"""
def __init__(self, path, size, chunks_info_list):
"""
:param path: path to file
:param size: file size
:param chunks_info_list: list of ChunkInfo objects
"""
self.size = size
self.path = path
self.chunks = chunks_info_list
def save_file(self):
"""Serializes and saves object to real file system, using path of DFS file"""
with open(self.path, mode='wb') as file:
pickle.dump(self, file)
@staticmethod
def get_file_info(path):
"""
Method to create FIleInfo object using serialized FileInfo object on disk
:return: FileInfo object
"""
with open(path, mode='rb') as file:
return pickle.load(file)
class SlaveStatus(Enum):
Down = 0
Up = 1
class StorageServerInfo:
"""
Helper object that represents particular StorageServer at NamingServer
"""
def __init__(self, server_id, address):
"""
:param server_id:
:param address: RPC network address in format domain_name:port or ip:port
"""
self.id = server_id
self.address = address
self.proxy = self.init_proxy()
self.status = SlaveStatus.Up
# Helping structure to know which files stored on that server
# contains only paths
self.files = set()
def init_proxy(self):
return ServerProxy('http://' + self.address[0] + ':' + str(self.address[1]))