From 55ebea30d8009c48f123f2f7a285a4afad05f9ba Mon Sep 17 00:00:00 2001 From: Kyler Chin <7539174+kylerchin@users.noreply.github.com> Date: Tue, 23 Apr 2024 04:54:46 -0700 Subject: [PATCH] Finish Aspen vehicle data lookup and hydrater system --- src/aspen/import_alpenrose.rs | 237 ++++++++++++++++++++++++++++++---- src/aspen/main.rs | 7 +- src/lib.rs | 12 +- 3 files changed, 220 insertions(+), 36 deletions(-) diff --git a/src/aspen/import_alpenrose.rs b/src/aspen/import_alpenrose.rs index 4346545c..d1bffd51 100644 --- a/src/aspen/import_alpenrose.rs +++ b/src/aspen/import_alpenrose.rs @@ -11,14 +11,15 @@ use diesel::ExpressionMethods; use diesel::QueryDsl; use diesel::SelectableHelper; use diesel_async::RunQueryDsl; +use gtfs_rt::FeedMessage; use gtfs_rt::TripUpdate; use prost::Message; use scc::HashMap as SccHashMap; use std::collections::BTreeMap; use std::collections::HashMap; +use std::collections::HashSet; use std::sync::Arc; use tokio::sync::RwLock; -use gtfs_rt::FeedMessage; const MAKE_VEHICLES_FEED_LIST: [&str; 9] = [ "f-mta~nyc~rt~subway~1~2~3~4~5~6~7", @@ -34,7 +35,7 @@ const MAKE_VEHICLES_FEED_LIST: [&str; 9] = [ pub async fn new_rt_data( authoritative_data_store: Arc>, - authoritative_gtfs_rt:Arc>, + authoritative_gtfs_rt: Arc>, chateau_id: String, realtime_feed_id: String, vehicles: Option>, @@ -88,20 +89,24 @@ pub async fn new_rt_data( //get and update raw gtfs_rt data if let Some(vehicles_gtfs_rt) = &vehicles_gtfs_rt { - authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::VehiclePositions)).and_modify(|gtfs_data| *gtfs_data = vehicles_gtfs_rt.clone()) - .or_insert(vehicles_gtfs_rt.clone()); + authoritative_gtfs_rt + .entry((realtime_feed_id.clone(), GtfsRtType::VehiclePositions)) + .and_modify(|gtfs_data| *gtfs_data = vehicles_gtfs_rt.clone()) + .or_insert(vehicles_gtfs_rt.clone()); } if let Some(trip_gtfs_rt) = &trips_gtfs_rt { - authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::TripUpdates)) - .and_modify(|gtfs_data| *gtfs_data = trip_gtfs_rt.clone()) - .or_insert(trip_gtfs_rt.clone()); + authoritative_gtfs_rt + .entry((realtime_feed_id.clone(), GtfsRtType::TripUpdates)) + .and_modify(|gtfs_data| *gtfs_data = trip_gtfs_rt.clone()) + .or_insert(trip_gtfs_rt.clone()); } if let Some(alerts_gtfs_rt) = &alerts_gtfs_rt { - authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::Alerts)) - .and_modify(|gtfs_data| *gtfs_data = alerts_gtfs_rt.clone()) - .or_insert(alerts_gtfs_rt.clone()); + authoritative_gtfs_rt + .entry((realtime_feed_id.clone(), GtfsRtType::Alerts)) + .and_modify(|gtfs_data| *gtfs_data = alerts_gtfs_rt.clone()) + .or_insert(alerts_gtfs_rt.clone()); } let this_chateau_dashmap = authoritative_data_store.get(&realtime_feed_id); @@ -132,31 +137,207 @@ pub async fn new_rt_data( let mut trip_updates_lookup_by_trip_id_to_trip_update_ids: HashMap> = HashMap::new(); - use catenary::schema::gtfs::static_download_attempts as static_download_attempts_pg_schema; use catenary::schema::gtfs::chateaus as chateaus_pg_schema; use catenary::schema::gtfs::routes as routes_pg_schema; + use catenary::schema::gtfs::static_download_attempts as static_download_attempts_pg_schema; + + //get this chateau + let this_chateau = chateaus_pg_schema::dsl::chateaus + .filter(chateaus_pg_schema::dsl::chateau.eq(&chateau_id)) + .first::(conn) + .await; + + match this_chateau { + Err(err) => false, + Ok(this_chateau) => { + //get all routes inside chateau from postgres db + let routes = routes_pg_schema::dsl::routes + .filter(routes_pg_schema::dsl::chateau.eq(&chateau_id)) + .select((catenary::models::Route::as_select())) + .load::(conn) + .await + .unwrap(); + + let mut route_id_to_route: HashMap = HashMap::new(); + + for route in routes { + route_id_to_route.insert(route.route_id.clone(), route); + } + + let route_id_to_route = route_id_to_route; + + //combine them together and insert them with the vehicles positions + + // trips can be left fairly raw for now, with a lot of data references + + // ignore alerts for now, as well as trip modifications + + let mut aspenised_vehicle_positions: HashMap = + HashMap::new(); + + //collect all trip ids that must be looked up + //collect all common itinerary patterns and look those up + + let mut trip_ids_to_lookup: HashSet = HashSet::new(); - //get all routes inside chateau from postgres db - let routes = routes_pg_schema::dsl::routes - .filter(routes_pg_schema::dsl::chateau.eq(&chateau_id)) - .select((catenary::models::Route::as_select())) - .load::(conn) - .await - .unwrap(); + for realtime_feed_id in this_chateau.realtime_feeds.iter().flatten() { + if let Some(vehicle_gtfs_rt_for_feed_id) = authoritative_gtfs_rt + .get(&(realtime_feed_id.clone(), GtfsRtType::VehiclePositions)) + { + let vehicle_gtfs_rt_for_feed_id = vehicle_gtfs_rt_for_feed_id.get(); - //for (realtime_feed_id, gtfs_dataset) in this_chateau_lock.raw_gtfs_rt.iter() { - // if gtfs_dataset.vehicle_positions.is_some() { + for vehicle_entity in vehicle_gtfs_rt_for_feed_id.entity.iter() { + if let Some(vehicle_pos) = &vehicle_entity.vehicle { + if let Some(trip) = &vehicle_pos.trip { + if let Some(trip_id) = &trip.trip_id { + trip_ids_to_lookup.insert(trip_id.clone()); + } + } + } + } + } - //for trips, batch lookups by groups of 100 - //collect all common itinerary patterns and look those up + //now do the same for alerts updates + if let Some(alert_gtfs_rt_for_feed_id) = + authoritative_gtfs_rt.get(&(realtime_feed_id.clone(), GtfsRtType::Alerts)) + { + let alert_gtfs_rt_for_feed_id = alert_gtfs_rt_for_feed_id.get(); - //combine them together and insert them with the vehicles positions - // } + for alert_entity in alert_gtfs_rt_for_feed_id.entity.iter() { + if let Some(alert) = &alert_entity.alert { + for entity in alert.informed_entity.iter() { + if let Some(trip) = &entity.trip { + if let Some(trip_id) = &trip.trip_id { + trip_ids_to_lookup.insert(trip_id.clone()); + } + } + } + } + } + } - // trips can be left fairly raw for now, with a lot of data references + //now look up all the trips - // ignore alerts for now, as well as trip modifications - // } + let trips = catenary::schema::gtfs::trips_compressed::dsl::trips_compressed + .filter( + catenary::schema::gtfs::trips_compressed::dsl::trip_id + .eq_any(trip_ids_to_lookup.iter()), + ) + .load::(conn) + .await + .unwrap(); - true + let mut trip_id_to_trip: HashMap = + HashMap::new(); + + for trip in trips { + trip_id_to_trip.insert(trip.trip_id.clone(), trip); + } + + let trip_id_to_trip = trip_id_to_trip; + + //also lookup all the headsigns from the trips via itinerary patterns + + let mut list_of_itinerary_patterns_to_lookup: HashSet = HashSet::new(); + + for trip in trip_id_to_trip.values() { + list_of_itinerary_patterns_to_lookup.insert(trip.itinerary_pattern_id.clone()); + } + + let itinerary_patterns = catenary::schema::gtfs::itinerary_pattern_meta::dsl::itinerary_pattern_meta + .filter(catenary::schema::gtfs::itinerary_pattern_meta::dsl::itinerary_pattern_id.eq_any(list_of_itinerary_patterns_to_lookup.iter())) + .select(catenary::models::ItineraryPatternMeta::as_select()) + .load::(conn) + .await + .unwrap(); + + let mut itinerary_pattern_id_to_itinerary_pattern_meta: HashMap< + String, + catenary::models::ItineraryPatternMeta, + > = HashMap::new(); + + for itinerary_pattern in itinerary_patterns { + itinerary_pattern_id_to_itinerary_pattern_meta.insert( + itinerary_pattern.itinerary_pattern_id.clone(), + itinerary_pattern, + ); + } + + let itinerary_pattern_id_to_itinerary_pattern_meta = + itinerary_pattern_id_to_itinerary_pattern_meta; + + let vehicle_routes_cache: HashMap = + HashMap::new(); + + for realtime_feed_id in this_chateau.realtime_feeds.iter().flatten() { + if let Some(vehicle_gtfs_rt_for_feed_id) = authoritative_gtfs_rt + .get(&(realtime_feed_id.clone(), GtfsRtType::VehiclePositions)) + { + let vehicle_gtfs_rt_for_feed_id = vehicle_gtfs_rt_for_feed_id.get(); + + for vehicle_entity in vehicle_gtfs_rt_for_feed_id.entity.iter() { + if let Some(vehicle_pos) = &vehicle_entity.vehicle { + aspenised_vehicle_positions.insert(vehicle_entity.id.clone(), AspenisedVehiclePosition { + trip: vehicle_pos.trip.as_ref().map(|trip| { + AspenisedVehicleTripInfo { + trip_id: trip.trip_id.clone(), + route_id: match &trip.route_id { + Some(route_id) => Some(route_id.clone()), + None => match &trip.trip_id { + Some(trip_id) => { + let trip = trip_id_to_trip.get(&trip_id.clone()); + trip.map(|trip| trip.route_id.clone()) + }, + None => None + } + }, + trip_headsign: match &trip.trip_id { + Some(trip_id) => { + let trip = trip_id_to_trip.get(&trip_id.clone()); + match trip { + Some(trip) => { + let itinerary_pattern = itinerary_pattern_id_to_itinerary_pattern_meta.get(&trip.itinerary_pattern_id); + match itinerary_pattern { + Some(itinerary_pattern) => { + itinerary_pattern.trip_headsign.clone() + }, + None => None + } + }, + None => None + } + }, + None => None + }, + trip_short_name: match &trip.trip_id { + Some(trip_id) => { + let trip = trip_id_to_trip.get(&trip_id.clone()); + match trip { + Some(trip) => { + match &trip.trip_short_name { + Some(trip_short_name) => Some(trip_short_name.clone()), + None => None + } + }, + None => None + } + }, + None => None + } + } + }), + position: vehicle_pos.position.clone(), + timestamp: vehicle_pos.timestamp.clone(), + vehicle: vehicle_pos.vehicle.clone(), + }); + + //insert the route cache + } + } + } + } + } + true + } + } } diff --git a/src/aspen/main.rs b/src/aspen/main.rs index fa9f9f87..66a0db90 100644 --- a/src/aspen/main.rs +++ b/src/aspen/main.rs @@ -52,9 +52,9 @@ mod import_alpenrose; use catenary::aspen_dataset::GtfsRtType; use catenary::postgres_tools::CatenaryPostgresPool; use crossbeam::deque::{Injector, Steal}; +use futures::join; use gtfs_rt::FeedMessage; use scc::HashMap as SccHashMap; -use futures::join; mod async_threads_alpenrose; @@ -210,7 +210,10 @@ async fn main() -> anyhow::Result<()> { .for_each(|_| async {}) .await; - join!(async_from_alpenrose_processor_handler.join().unwrap(), leader_thread_handler.join().unwrap()); + join!( + async_from_alpenrose_processor_handler.join().unwrap(), + leader_thread_handler.join().unwrap() + ); Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index af9bc290..5636d6b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -184,12 +184,12 @@ pub mod aspen_dataset { } pub struct AspenisedVehicleRouteCache { - route_short_name: Option, - route_long_name: Option, - //route_short_name_langs: Option>, - //route_long_name_langs: Option>, - route_colour: Option, - route_text_colour: Option, + pub route_short_name: Option, + pub route_long_name: Option, + pub route_short_name_langs: Option>, + pub route_long_name_langs: Option>, + pub route_colour: Option, + pub route_text_colour: Option, } #[derive(Copy, Eq, Hash, PartialEq, Clone)]