Skip to content

Commit

Permalink
Merge pull request #11 from HSLdevcom/oulu-vp
Browse files Browse the repository at this point in the history
Oulu vp
  • Loading branch information
vesameskanen authored Nov 6, 2019
2 parents 060f115 + c1c528f commit 14c182e
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ COPY requirements.txt /usr/src/app/
RUN pip install -r requirements.txt
COPY gtfs_realtime_pb2.py /usr/src/app/
COPY gtfsrthttp2mqtt.py /usr/src/app/
COPY route_utils.py /usr/src/app/
COPY utils.py /usr/src/app/

CMD ["python", "gtfsrthttp2mqtt.py" ]
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ You need to configure at least the following env variables that are marked as ma
* (mandatory) "FEED_URL" URL for the HTTP(S) GTFS RT data source
* (optional) "USERNAME" username for publishing to a MQTT broker
* (optional) "PASSWORD" password for publishing to a MQTT broker
* (optional) "INTERVAL" how long to wait between fetching new data from HTTP(S) data feed
* (optional, default 5) "INTERVAL" how long to wait in seconds between fetching new data from HTTP(S) data feed
* (optional, default https://dev-api.digitransit.fi/routing/v1/routers/waltti/index/graphql) "OTP_URL" defines where to fetch otp data from
* (optional, default 3600) "OTP_INTERVAL" defines in seconds the wait time between fetching new data from OTP
63 changes: 50 additions & 13 deletions gtfsrthttp2mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from requests.packages.urllib3.util.retry import Retry

import gtfs_realtime_pb2
import route_utils
import utils


## https://stackoverflow.com/questions/22498038/improve-current-implementation-of-a-setinterval-python/22498708#22498708
Expand Down Expand Up @@ -36,6 +36,7 @@ def __init__(self, mqttConnect, mqttCredentials, baseMqttTopic, gtfsrtFeedURL, f
retry = Retry(connect=60, backoff_factor=1.5)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount(gtfsrtFeedURL, adapter)
self.OTPData = None



Expand All @@ -45,10 +46,11 @@ def onMQTTConnected(self, client, userdata, flags, rc):
return False
if self.mqttConnected is True:
print("Reconnecting and restarting poller")
self.cancelPoller.cancel()
self.GTFSRTPoller()
self.mqttConnected = True

self.doOTPPolling() #first round of polling otp data
self.startGTFSRTPolling()
self.startOTPPolling()

def connectMQTT(self):
self.client = mqtt.Client()
Expand All @@ -59,9 +61,9 @@ def connectMQTT(self):
self.client.loop_forever()

def startGTFSRTPolling(self):
print("Starting poller")
print("Starting GTFS RT poller")
polling_interval = int(os.environ.get('INTERVAL', 5))
self.cancelPoller = call_repeatedly(polling_interval, self.doGTFSRTPolling)
self.GTFSRTPoller = call_repeatedly(polling_interval, self.doGTFSRTPolling)

def doGTFSRTPolling(self):
print("doGTFSRTPolling", time.ctime())
Expand All @@ -87,10 +89,10 @@ def doGTFSRTPolling(self):

nent.CopyFrom(entity)

route_id = route_utils.parse_route_id(self.feedName, entity.vehicle.trip.route_id)
direction_id = entity.vehicle.trip.direction_id
trip_headsign = entity.vehicle.vehicle.label
trip_id = entity.vehicle.trip.trip_id
route_id = utils.parse_route_id(self.feedName, entity.vehicle.trip.route_id, trip_id, self.OTPData)
direction_id = utils.parse_direction_id(self.feedName, entity.vehicle.trip.direction_id, trip_id, self.OTPData)
trip_headsign = entity.vehicle.vehicle.label
latitude = "{:.6f}".format(entity.vehicle.position.latitude) # Force coordinates to have 6 numbers
latitude_head = latitude[:2]
longitude = "{:.6f}".format(entity.vehicle.position.longitude)
Expand All @@ -99,22 +101,56 @@ def doGTFSRTPolling(self):
geohash_firstdeg = latitude[3] + "" + longitude[3]
geohash_seconddeg = latitude[4] + "" + longitude[4]
geohash_thirddeg = latitude[5] + "" + longitude[5]
stop_id = entity.vehicle.stop_id
start_time = entity.vehicle.trip.start_time[0:5] # hh:mm
vehicle_id = entity.vehicle.vehicle.id
short_name = utils.parse_short_name(self.feedName, trip_id, route_id, self.OTPData)

# gtfsrt/vp/<feed_name>/<agency_id>/<agency_name>/<mode>/<route_id>/<direction_id>/<trip_headsign>/<trip_id>/<next_stop>/<start_time>/<vehicle_id>/<geohash_head>/<geohash_firstdeg>/<geohash_seconddeg>/<geohash_thirddeg>
# gtfsrt/vp/<feed_name>/<agency_id>/<agency_name>/<mode>/<route_id>/<direction_id>/<trip_headsign>/<trip_id>/<next_stop>/<start_time>/<vehicle_id>/<geohash_head>/<geohash_firstdeg>/<geohash_seconddeg>/<geohash_thirddeg>/<short_name>/
# GTFS RT feed used for testing was missing some information so those are empty
full_topic = '{0}/{1}////{2}/{3}/{4}/{5}//{6}/{7}/{8}/{9}/{10}/{11}/'.format(
full_topic = '{0}/{1}////{2}/{3}/{4}/{5}/{6}/{7}/{8}/{9}/{10}/{11}/{12}/{13}/'.format(
self.baseMqttTopic, self.feedName, route_id, direction_id,
trip_headsign, trip_id, start_time, vehicle_id, geohash_head, geohash_firstdeg,
geohash_seconddeg, geohash_thirddeg)
trip_headsign, trip_id, stop_id, start_time, vehicle_id, geohash_head, geohash_firstdeg,
geohash_seconddeg, geohash_thirddeg, short_name)

sernmesg = nfeedmsg.SerializeToString()
self.client.publish(full_topic, sernmesg)
except:
print(r.content)
raise

def startOTPPolling(self):
print("Starting OTP poller")
polling_interval = int(os.environ.get('OTP_INTERVAL', 60 * 60)) # default 1 hour
self.OTPPoller = call_repeatedly(polling_interval, self.doOTPPolling)

def doOTPPolling(self):
OTP_URL = os.environ.get('OTP_URL', 'https://dev-api.digitransit.fi/routing/v1/routers/waltti/index/graphql')
otp_polling_session = requests.Session()
retry = Retry(
total=30,
read=30,
connect=30,
backoff_factor=10,
method_whitelist=frozenset(['GET', 'OPTIONS', 'POST'])
)
adapter = HTTPAdapter(max_retries=retry)
otp_polling_session.mount(OTP_URL, adapter)
query = utils.get_OTP_query(self.feedName)

try:
response = otp_polling_session.post(OTP_URL, json={'query': query})
except Exception as x:
print('Failed to fetch OTP data :(', x.__class__.__name__)
else:
print('Fetched new OTP data')
data_dictionary = {}
data_type = 'routes' if 'routes' in response.json()['data'] else 'trips'
for element in response.json()['data'][data_type]:
gtfsId = element['gtfsId']
del element['gtfsId']
data_dictionary[gtfsId] = element
self.OTPData = data_dictionary

if __name__ == '__main__':
gh2mt = GTFSRTHTTP2MQTTTransformer(
Expand All @@ -128,4 +164,5 @@ def doGTFSRTPolling(self):
try:
gh2mt.connectMQTT()
finally:
gh2mt.cancelPoller()
gh2mt.OTPPoller()
gh2mt.GTFSRTPoller()
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ protobuf==3.6.1
public==2018.11.20
query-string==2018.11.20
request==2018.11.20
requests==2.21.0
requests==2.22.0
six==1.12.0
urllib3==1.24.1
urllib3==1.25.6
8 changes: 0 additions & 8 deletions route_utils.py

This file was deleted.

61 changes: 61 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
def parse_route_id(feed, route_id, trip_id, otp_data):
if feed == "tampere":
if len(route_id) > 5 and route_id[-5:] == "47374":
return route_id[0:-5]
else:
return route_id[0:-4]
elif feed == "OULU":
feed_scoped_id = "OULU:" + trip_id
if otp_data == None or feed_scoped_id not in otp_data:
return ""
return otp_data[feed_scoped_id]["route"]["gtfsId"].split(':')[1]
return route_id

def parse_direction_id(feed, direction_id, trip_id, otp_data):
if feed == "OULU":
feed_scoped_id = "OULU:" + trip_id
if otp_data == None or feed_scoped_id not in otp_data:
return ""
return str(otp_data[feed_scoped_id]["pattern"]["directionId"])
return direction_id

def parse_short_name(feed, trip_id, route_id, otp_data):
if otp_data == None:
return ""
elif feed == "OULU":
feed_scoped_id = "OULU:" + trip_id
if feed_scoped_id not in otp_data:
return ""
return otp_data[feed_scoped_id]["route"]["shortName"]

feed_scoped_id = feed + ":" + route_id
if feed_scoped_id not in otp_data:
return ""
return otp_data[feed + ":" + route_id]["shortName"]

def get_OTP_query(feed):
if feed == "OULU":
return """
{
trips(feeds: [\"OULU\"]) {
route {
shortName
gtfsId
}
gtfsId
pattern {
directionId
}
}
}
"""
else:
return """
{
routes(feeds: [\"%s\"]) {
gtfsId
shortName
}
}
""" % feed

0 comments on commit 14c182e

Please sign in to comment.