-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
230 lines (200 loc) · 8.96 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import asyncio
import random
import subprocess
import sys
import time
from typing import List, Optional
import code128
from const import BAR_CODE_FILE_PATH, RECEIVER_LIST
from deployment import start_raiden_nodes
from raiden import RaidenNode, RaidenNodeMock
from server import Server
from track_control import (
TrackControl,
ArduinoSerial,
ArduinoTrackControl,
MockArduinoTrackControl,
BarrierEventTaskFactory,
BarrierLoopTaskRunner
)
from network import NetworkTopology
import logging
log = logging.getLogger()
ADDRESS_MAP = {address: index for index, address in enumerate(RECEIVER_LIST)}
class BarcodeHandler():
_address_map = ADDRESS_MAP
def save_barcode(self, address, nonce):
address, nonce = self._process_args(address, nonce)
self._save_barcode(address, nonce)
def _process_args(self, address, nonce):
address = self._address_map[address]
return address, nonce
def _save_barcode(self, address, nonce):
barcode = code128.image(str(address) + "," + str(nonce))
factor = 4
barcode = barcode.resize((int(barcode.width * factor), int(barcode.height * factor)))
barcode.save(str(BAR_CODE_FILE_PATH))
class TrainApp:
def __init__(self, track_control: TrackControl, raiden_nodes: List[RaidenNode],
network_topology: NetworkTopology,
barcode_handler: Optional[BarcodeHandler] = None):
self.track_control = track_control
self.raiden_nodes = raiden_nodes
self.network_topology = network_topology
self.barcode_handler = barcode_handler
self._track_loop = None
self._current_provider = None
self._provider_nonces = {provider.address: 0 for provider in self.raiden_nodes}
self._barrier_ltr = None
self._barrier_etf = None
self._possible_providers = None
def start(self):
"""
NOTE: it's necessary that the asyncio related instantiations are done at runtime,
because we need a running loop!
:return:
"""
self._barrier_ltr = BarrierLoopTaskRunner(self.track_control)
self._barrier_etf = BarrierEventTaskFactory(self.track_control)
self._barrier_ltr.start()
self._track_loop = asyncio.create_task(self.run())
# FIXME make awaitable so that errors can raise
# FIXME handle gracefully
def stop(self):
try:
self._track_loop.cancel()
self._barrier_ltr.stop()
except asyncio.CancelledError:
pass
async def run(self):
# TODO make sure that every neccessary task is running:
# (barrier_etf, barrier_ltr instantiated, etc)
log.info("Track loop started")
# Starting frontend
subprocess.Popen(
"DISPLAY=:0.0 "
"/home/train/processing-3.4/processing-java "
"--sketch=/home/train/demo-train/processing/sketchbook/railTrack --force --run",
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
log.info("Started subprocess for Frontend")
time.sleep(5)
server = Server()
time.sleep(2)
server.start()
time.sleep(2)
self.track_control.power_on()
while True:
# Pick a random receiver
self._set_next_provider()
provider = self._current_provider
server.new_receiver(ADDRESS_MAP[self.current_provider_address])
current_nonce = self.current_nonce
# Sleeping 4 s to make sure that the train completely passed the light barrier
time.sleep(4)
payment_received_task = asyncio.create_task(
provider.ensure_payment_received(
sender_address=self.network_topology.sender_address,
token_address=self.network_topology.token_address,
nonce=current_nonce, poll_interval=0.05)
)
barrier_event_task = self._barrier_etf.create_await_event_task()
log.info('Waiting for payment to provider={}, nonce={}'.format(provider.address,
current_nonce))
# await both awaitables but return when one of them is finished first
done, pending = await asyncio.wait([payment_received_task, barrier_event_task],
return_when=asyncio.FIRST_COMPLETED)
payment_successful = False
if payment_received_task in done:
if payment_received_task.result() is True:
payment_successful = True
server.payment_received()
else:
assert barrier_event_task in done
assert payment_received_task in pending
# cancel the payment received task
for task in pending:
task.cancel()
if payment_successful is True:
log.info("Payment received")
assert barrier_event_task in pending
await barrier_event_task
server.barrier_triggered()
# increment the nonce after the barrier was triggered
self._increment_nonce_for_current_provider()
else:
log.info("Payment not received before next barrier trigger")
self.track_control.power_off()
server.payment_missing()
payment_received_task = asyncio.create_task(
provider.ensure_payment_received(
sender_address=self.network_topology.sender_address,
token_address=self.network_topology.token_address,
nonce=self.current_nonce,
poll_interval=0.05)
)
await payment_received_task
if payment_received_task.result() is True:
self._increment_nonce_for_current_provider()
self.track_control.power_on()
server.payment_received()
log.info("Payment received, turning track power on again")
else:
# this shouldn't happen
# FIXME remove assert in production code
assert False
def _on_new_provider(self):
if self.barcode_handler is not None:
self.barcode_handler.save_barcode(self.current_provider_address, self.current_nonce)
def set_possible_providers(self, nodes):
if not set(nodes).issubset(set(self.raiden_nodes)):
raise ValueError("Possible providers have to be known raiden nodes.")
self._possible_providers = nodes
def _set_next_provider(self):
nodes_to_choose_from = self._possible_providers or self.raiden_nodes
self._current_provider = random.choice(nodes_to_choose_from)
self._on_new_provider()
@property
def current_provider_address(self):
return self._current_provider.address
@property
def current_nonce(self):
return self._provider_nonces[self.current_provider_address]
def _increment_nonce_for_current_provider(self):
self._provider_nonces[self.current_provider_address] += 1
@classmethod
def build_app(cls, network: NetworkTopology, mock_arduino=False, mock_raiden=False,
config_file=None, possible_receiver_addresses=None):
raiden_node_cls = RaidenNode
if mock_arduino:
log.debug('Mocking Arduino serial')
arduino_track_control = MockArduinoTrackControl()
else:
arduino_serial = ArduinoSerial(port='/dev/ttyACM0', baudrate=9600, timeout=.1)
# arduino_serial = ArduinoSerial(port='/dev/cu.usbmodem1421', baudrate=9600, timeout=.1)
arduino_track_control = ArduinoTrackControl(arduino_serial)
arduino_track_control.connect()
if mock_raiden:
raiden_node_cls = RaidenNodeMock
log.debug('Mocking RaidenNode')
# TODO for mock nodes, we should skip the deployment script
# FIXME asyncio.run() is not the correct method
try:
raiden_nodes_dict = asyncio.run(
start_raiden_nodes(raiden_node_cls, receivers=network.receivers,
config_file=config_file))
except (asyncio.TimeoutError, TimeoutError):
log.info(
'Not all raiden nodes could get started, check the log files for more info. Shutting down')
sys.exit()
raiden_nodes = list(raiden_nodes_dict.values())
# TODO
track_control = TrackControl(arduino_track_control)
barcode_handler = BarcodeHandler()
obj = cls(track_control, raiden_nodes, network, barcode_handler)
if possible_receiver_addresses is not None:
possible_receiver_nodes = [raiden_nodes_dict[addr] for addr in possible_receiver_addresses]
obj.set_possible_providers(possible_receiver_nodes)
return obj