-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmbta.py
94 lines (77 loc) · 2.79 KB
/
mbta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from mbtastate import MBTAState, PREDICTIONS_TO_WATCH
import requests
from json import loads
import time
import sys
from redis import Redis
API_KEY = "9dd6a96deb434cca901b48f3a09b9479"
API_BASE = "https://api-v3.mbta.com"
headers = {"x-api-key": API_KEY, "accept": "text/event-stream"}
DEBUG = False
# MBTA Stuff
def raise_error_event():
print("Tried to apply an empty event - u suck bro")
class MBTAEvent:
def set_type(self, type_str):
if type_str == "reset":
self._type = lambda s, e: s.apply_reset(e)
elif type_str == "add":
self._type = lambda s, e: s.apply_add(e)
elif type_str == "update":
self._type = lambda s, e: s.apply_update(e)
elif type_str == "remove":
self._type = lambda s, e: s.apply_remove(e)
else:
raise ValueError("Unknown event type: {}".format(type_str))
def set_data(self, line):
self._data = line
def clear(self):
self._type = lambda s, e: raise_error_event()
self._data = ""
def get_type(self):
return self._type
def get_data(self):
if self._data:
return loads(self._data[self._data.index(" ") + 1 :])
return None
stop_ids = list(map(lambda a: a[0], PREDICTIONS_TO_WATCH))
direction_ids = list(map(lambda a: a[1], PREDICTIONS_TO_WATCH))
state = MBTAState(
Redis(host="127.0.0.1", port="6379", charset="utf-8", decode_responses=True)
)
pred_url = API_BASE + "/predictions?filter[stop]={}&filter[direction_id]={}&include=trip".format(
",".join(stop_ids), ",".join(direction_ids)
)
print("Connecting to {}".format(pred_url))
pred_stream_r = requests.get(pred_url, headers=headers, stream=True)
event_type_line = True
current_event = MBTAEvent()
stream_iterator = pred_stream_r.iter_lines(decode_unicode=True)
while True:
try:
line = next(stream_iterator)
except:
print("Connection broken, recreating")
pred_stream_r = requests.get(pred_url, headers=headers, stream=True)
stream_iterator = pred_stream_r.iter_lines(decode_unicode=True)
continue
# filter out keep-alive new lines
if line:
if DEBUG:
print("Event line: {}".format(line))
if event_type_line:
type_str = line.split(" ")[-1]
# filter out keep-alive events
if type_str == "keep-alive":
continue
# handle event-type line
current_event.set_type(type_str)
event_type_line = False
else:
# handle event-data line
current_event.set_data(line)
with state.acquire_lock():
current_event.get_type()(state, current_event) # Apply event
# state_update_event.set()
current_event.clear()
event_type_line = True