From d3c77e59444b917d9ea71cddf0bb78fb893bd058 Mon Sep 17 00:00:00 2001 From: Kyler Chin <7539174+kylerchin@users.noreply.github.com> Date: Wed, 24 Apr 2024 14:24:54 -0700 Subject: [PATCH] Save data before appending to job queue --- src/aspen/import_alpenrose.rs | 6 +-- src/aspen/lib.rs | 3 -- src/aspen/main.rs | 73 +++++++++++++++++++++++++++++++++-- 3 files changed, 73 insertions(+), 9 deletions(-) diff --git a/src/aspen/import_alpenrose.rs b/src/aspen/import_alpenrose.rs index 1014097c..d6025c38 100644 --- a/src/aspen/import_alpenrose.rs +++ b/src/aspen/import_alpenrose.rs @@ -61,7 +61,7 @@ pub async fn new_rt_data( let vehicles_gtfs_rt = match vehicles_response_code { Some(200) => match vehicles { - Some(v) => match parse_gtfs_rt_message(&v.as_slice()) { + Some(v) => match parse_gtfs_rt_message(v.as_slice()) { Ok(v) => Some(v), Err(e) => { println!("Error decoding vehicles: {}", e); @@ -75,7 +75,7 @@ pub async fn new_rt_data( let trips_gtfs_rt = match trips_response_code { Some(200) => match trips { - Some(t) => match parse_gtfs_rt_message(&t.as_slice()) { + Some(t) => match parse_gtfs_rt_message(t.as_slice()) { Ok(t) => Some(t), Err(e) => { println!("Error decoding trips: {}", e); @@ -89,7 +89,7 @@ pub async fn new_rt_data( let alerts_gtfs_rt = match alerts_response_code { Some(200) => match alerts { - Some(a) => match parse_gtfs_rt_message(&a.as_slice()) { + Some(a) => match parse_gtfs_rt_message(a.as_slice()) { Ok(a) => Some(a), Err(e) => { println!("Error decoding alerts: {}", e); diff --git a/src/aspen/lib.rs b/src/aspen/lib.rs index 49d861b8..ec80b31e 100644 --- a/src/aspen/lib.rs +++ b/src/aspen/lib.rs @@ -69,9 +69,6 @@ pub struct ChateausLeaderHashMap { pub struct ProcessAlpenroseData { pub chateau_id: String, pub realtime_feed_id: String, - pub vehicles: Option>, - pub trips: Option>, - pub alerts: Option>, pub has_vehicles: bool, pub has_trips: bool, pub has_alerts: bool, diff --git a/src/aspen/main.rs b/src/aspen/main.rs index 89743d19..a50fc845 100644 --- a/src/aspen/main.rs +++ b/src/aspen/main.rs @@ -99,12 +99,79 @@ impl AspenRpc for AspenServer { alerts_response_code: Option, time_of_submission_ms: u64, ) -> bool { + + let vehicles_gtfs_rt = match vehicles_response_code { + Some(200) => match vehicles { + Some(v) => match parse_gtfs_rt_message(v.as_slice()) { + Ok(v) => Some(v), + Err(e) => { + println!("Error decoding vehicles: {}", e); + None + } + }, + None => None, + }, + _ => None, + }; + + let trips_gtfs_rt = match trips_response_code { + Some(200) => match trips { + Some(t) => match parse_gtfs_rt_message(t.as_slice()) { + Ok(t) => Some(t), + Err(e) => { + println!("Error decoding trips: {}", e); + None + } + }, + None => None, + }, + _ => None, + }; + + let alerts_gtfs_rt = match alerts_response_code { + Some(200) => match alerts { + Some(a) => match parse_gtfs_rt_message(a.as_slice()) { + Ok(a) => Some(a), + Err(e) => { + println!("Error decoding alerts: {}", e); + None + } + }, + None => None, + }, + _ => None, + }; + + //get and update raw gtfs_rt data + + println!("Parsed FeedMessages for {}", realtime_feed_id); + + if let Some(vehicles_gtfs_rt) = &vehicles_gtfs_rt { + authoritative_gtfs_rt + .entry((realtime_feed_id.clone(), GtfsRtType::VehiclePositions)) + .and_modify(|gtfs_data| *gtfs_data = vehicles_gtfs_rt.clone()) + .or_insert(vehicles_gtfs_rt.clone()); + } + + if let Some(trip_gtfs_rt) = &trips_gtfs_rt { + authoritative_gtfs_rt + .entry((realtime_feed_id.clone(), GtfsRtType::TripUpdates)) + .and_modify(|gtfs_data| *gtfs_data = trip_gtfs_rt.clone()) + .or_insert(trip_gtfs_rt.clone()); + } + + if let Some(alerts_gtfs_rt) = &alerts_gtfs_rt { + authoritative_gtfs_rt + .entry((realtime_feed_id.clone(), GtfsRtType::Alerts)) + .and_modify(|gtfs_data| *gtfs_data = alerts_gtfs_rt.clone()) + .or_insert(alerts_gtfs_rt.clone()); + } + + println!("Saved FeedMessages for {}", realtime_feed_id); + self.alpenrose_to_process_queue.push(ProcessAlpenroseData { chateau_id, realtime_feed_id, - vehicles, - trips, - alerts, has_vehicles, has_trips, has_alerts,