-
Notifications
You must be signed in to change notification settings - Fork 0
/
utility_functions.py
148 lines (126 loc) · 3.65 KB
/
utility_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import collections
import logging
import os
import errno
from construct import Container
from twisted.python.filepath import FilePath
import packets
# Directory containing this module, wrapped in a Twisted FilePath.
path = FilePath(os.path.dirname(os.path.abspath(__file__)))
# Module-level logger used by the helpers in this file.
logger = logging.getLogger('starrypy.utility_functions')
class Singleton(type):
    """
    Metaclass that hands back one shared instance per class.

    The first call to a class built with this metaclass constructs the
    instance with the supplied arguments; every later call returns that
    same object and ignores its arguments.
    """
    # Maps each class to the single instance created for it.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        # Fast path: the instance was already built by an earlier call.
        if cls in cls._instances:
            return cls._instances[cls]
        instance = super(Singleton, cls).__call__(*args, **kwargs)
        cls._instances[cls] = instance
        return instance
def give_item_to_player(player_protocol, item, count=1):
    """
    Send GIVE_ITEM packets handing ``count`` of ``item`` to a player.

    The request is capped at 90000 items and written to the player's
    transport in batches of at most 1000 items per packet.

    :param player_protocol: Protocol object of the receiving player; its
        ``player.name`` is logged and its ``transport`` receives the packets.
    :param item: Item identifier passed through to
        ``packets.give_item_write``.
    :param count: Number of items to give; anything ``int()`` accepts.
    :return: Number of items given after capping (0 when ``count`` is not
        positive, because no packet is sent in that case).
    :rtype : int
    """
    logger.debug(
        'Attempting to give item %s (count: %s) to %s',
        item,
        count,
        player_protocol.player.name
    )
    item_count = int(count)
    hard_max = 90000
    if item_count > hard_max:
        # logger.warn is a deprecated alias; logger.warning is the
        # supported spelling on both Python 2 and Python 3.
        logger.warning(
            'Attempted to give more items than the max allowed (%s). '
            'Capping amount.',
            hard_max
        )
        item_count = hard_max
    maximum = 1000
    given = 0
    while item_count > 0:
        # Never put more than ``maximum`` items into a single packet.
        x = min(item_count, maximum)
        # NOTE(review): the count written is one more than the batch size;
        # presumably this compensates for an off-by-one in the protocol --
        # confirm before changing.
        item_packet = build_packet(
            packets.Packets.GIVE_ITEM, packets.give_item_write(item, x + 1)
        )
        player_protocol.transport.write(item_packet)
        item_count -= x
        given += x
    return given
def recursive_dictionary_update(d, u):
    """
    Recursively merge mapping ``u`` into dictionary ``d`` in place.

    Values in ``u`` that are mappings are merged key by key into the
    corresponding value in ``d``; every other value simply overwrites the
    entry in ``d``. When ``d`` has no mapping under a key that ``u`` maps to
    a mapping (the key is missing or holds a non-mapping value), a fresh
    dictionary is created for it.

    :param d: Dictionary to update; it is modified and returned.
    :param u: Mapping holding the new values.
    :return: ``d`` itself, after the merge.
    :rtype : dict
    """
    # Local import keeps this working on both Python 2 and Python 3:
    # ``collections.Mapping`` was removed in Python 3.10.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    # ``items()`` exists on both Python 2 and 3, unlike ``iteritems()``.
    for k, v in u.items():
        if isinstance(v, Mapping):
            existing = d.get(k)
            # A scalar (or missing) entry cannot be merged into; replace it
            # with a new dictionary instead of failing on ``existing.get``.
            if not isinstance(existing, Mapping):
                existing = {}
            d[k] = recursive_dictionary_update(existing, v)
        else:
            d[k] = v
    return d
def build_packet(packet_type, data):
    """
    Convenience method to assemble a packet ready for sending.

    :param packet_type: Packet id, stored in the ``id`` field (documented
        by the original author as an integer 1 <= packet_type <= 53).
    :param data: Payload to send; its length fills ``payload_size``.
    :return: The built packet, as produced by ``packets.packet().build``.
    """
    payload_size = len(data)
    serialize = packets.packet().build
    fields = Container(id=packet_type, payload_size=payload_size, data=data)
    return serialize(fields)
class Planet(object):
    """
    Plain holder for the five values that identify a planet location.

    ``str()`` of an instance joins the values with colons in the order
    x, y, z, planet, satellite.
    """

    def __init__(self, x, y, z, planet, satellite):
        self.x, self.y, self.z = x, y, z
        self.planet, self.satellite = planet, satellite

    def __str__(self):
        parts = (self.x, self.y, self.z, self.planet, self.satellite)
        return '{}:{}:{}:{}:{}'.format(*parts)
def move_ship_to_coords(protocol, x, y, z, planet, satellite):
    """
    Send a FLY_SHIP packet that moves a player's ship to the coordinates.

    The coordinates are logged as given, converted to integers, and the
    resulting packet is written to ``protocol.client_protocol.transport``.

    :param protocol: Protocol object of the player whose ship is moved.
    :param x: X coordinate; anything ``int()`` accepts.
    :param y: Y coordinate; anything ``int()`` accepts.
    :param z: Z coordinate; anything ``int()`` accepts.
    :param planet: Planet value; anything ``int()`` accepts.
    :param satellite: Satellite value; anything ``int()`` accepts.
    """
    requested = (x, y, z, planet, satellite)
    logger.info(
        'Moving %s\'s ship to coordinates: %s',
        protocol.player.name,
        ':'.join(str(part) for part in requested)
    )
    # Convert only after logging so the log shows the values as received.
    x, y, z, planet, satellite = [int(part) for part in requested]
    packet_id = packets.Packets.FLY_SHIP
    payload = packets.fly_ship_write(
        x=x, y=y, z=z, planet=planet, satellite=satellite
    )
    warp_packet = build_packet(packet_id, payload)
    protocol.client_protocol.transport.write(warp_packet)
def extract_name(l):
    """
    Split a list of tokens into a leading name and the remaining tokens.

    When the first token does not start with a quote character it is the
    name and everything after it is the remainder. When it starts with
    ``'`` or ``"``, tokens are consumed up to and including the first one
    that ends with the same quote character, and the name is those tokens
    joined by single spaces with the surrounding quotes removed.

    :param l: Sequence of string tokens; must contain at least one token.
    :return: ``(name, rest)``. For an unquoted name ``rest`` is ``l[1:]``
        (possibly empty); for a quoted name ``rest`` is the tokens after
        the closing quote, or ``None`` when there are none.
    :raises ValueError: If a quoted name is never closed.
    :raises IndexError: If ``l`` is empty.
    """
    first = l[0]
    # Slicing instead of indexing so an empty first token is treated as an
    # unquoted (empty) name rather than raising IndexError.
    terminator = first[:1]
    if terminator not in ("'", '"'):
        return first, l[1:]

    # The whole quoted name may sit in the first token, e.g. '"bob"'.
    # A lone quote character only opens the name; it does not close it.
    if len(first) > 1 and first.endswith(terminator):
        rest = l[1:]
        return first[1:-1], (rest if rest else None)

    name = [first[1:]]
    for idx, s in enumerate(l[1:], 1):
        # endswith() is safe for empty tokens, unlike s[-1].
        if s.endswith(terminator):
            name.append(s[:-1])
            rest = l[idx + 1:]
            return ' '.join(name), (rest if rest else None)
        name.append(s)
    # The placeholder is %-style, so it must be filled with %, not .format().
    raise ValueError(
        'Final terminator character of <%s> not found' % terminator
    )
def verify_path(path):
    """
    Helper function to make sure a directory exists at ``path``, creating
    it (and any missing parent directories) if it doesn't.

    :param path: Filesystem path of the directory to ensure.
    :raises OSError: If the directory cannot be created, including when
        ``path`` already exists but is not a directory.
    """
    try:
        os.makedirs(path)
    except OSError as exception:
        # EEXIST is only harmless when the existing entry really is a
        # directory; a regular file at ``path`` must not be accepted.
        if exception.errno != errno.EEXIST or not os.path.isdir(path):
            raise