diff --git a/src/aspen/import_alpenrose.rs b/src/aspen/import_alpenrose.rs index d1bffd51..d1819a17 100644 --- a/src/aspen/import_alpenrose.rs +++ b/src/aspen/import_alpenrose.rs @@ -114,7 +114,7 @@ pub async fn new_rt_data( //if this item is empty, create it if this_chateau_dashmap.is_none() { let mut new_aspenised_data = catenary::aspen_dataset::AspenisedData { - vehicle_positions: Vec::new(), + vehicle_positions: HashMap::new(), vehicle_routes_cache: HashMap::new(), trip_updates: HashMap::new(), trip_updates_lookup_by_trip_id_to_trip_update_ids: HashMap::new(), @@ -122,6 +122,7 @@ pub async fn new_rt_data( impacted_routes_alerts: None, impacted_stops_alerts: None, impacted_routes_stops_alerts: None, + last_updated_time_ms: 0, }; let _ = authoritative_data_store.insert(realtime_feed_id.clone(), new_aspenised_data); } @@ -131,7 +132,7 @@ pub async fn new_rt_data( // take all the gtfs rt data and merge it together - let mut vehicle_positions: Vec = Vec::new(); + let mut vehicle_positions: HashMap = HashMap::new(); let mut vehicle_routes_cache: HashMap = HashMap::new(); let mut trip_updates: HashMap = HashMap::new(); let mut trip_updates_lookup_by_trip_id_to_trip_update_ids: HashMap> = @@ -266,9 +267,6 @@ pub async fn new_rt_data( let itinerary_pattern_id_to_itinerary_pattern_meta = itinerary_pattern_id_to_itinerary_pattern_meta; - let vehicle_routes_cache: HashMap = - HashMap::new(); - for realtime_feed_id in this_chateau.realtime_feeds.iter().flatten() { if let Some(vehicle_gtfs_rt_for_feed_id) = authoritative_gtfs_rt .get(&(realtime_feed_id.clone(), GtfsRtType::VehiclePositions)) @@ -326,16 +324,87 @@ pub async fn new_rt_data( } } }), - position: vehicle_pos.position.clone(), + position: match &vehicle_pos.position { + Some(position) => Some(CatenaryRtVehiclePosition { + latitude: position.latitude, + longitude: position.longitude, + bearing: position.bearing, + odometer: position.odometer, + speed: position.speed, + }), + None => None + }, timestamp: vehicle_pos.timestamp.clone(), - vehicle: vehicle_pos.vehicle.clone(), + vehicle: match &vehicle_pos.vehicle { + Some(vehicle) => Some(AspenisedVehicleDescriptor { + id: vehicle.id.clone(), + label: vehicle.label.clone(), + license_plate: vehicle.license_plate.clone(), + wheelchair_accessible: vehicle.wheelchair_accessible, + }), + None => None + } }); //insert the route cache + + if let Some(trip) = &vehicle_pos.trip { + if let Some(route_id) = &trip.route_id { + if !vehicle_routes_cache.contains_key(route_id) { + let route = route_id_to_route.get(route_id); + if let Some(route) = route { + vehicle_routes_cache.insert( + route_id.clone(), + AspenisedVehicleRouteCache { + route_short_name: route.short_name.clone(), + route_long_name: route.long_name.clone(), + // route_short_name_langs: route.short_name_translations.clone(), + //route_long_name_langs: route.short_name_translations.clone(), + route_colour: route.color.clone(), + route_text_colour: route.text_color.clone(), + }, + ); + } + } + } + } } } } } + + //Insert data back into process-wide authoritative_data_store + + authoritative_data_store + .entry(chateau_id.clone()) + .and_modify(|data| { + *data = AspenisedData { + vehicle_positions: aspenised_vehicle_positions.clone(), + vehicle_routes_cache: vehicle_routes_cache.clone(), + trip_updates: trip_updates.clone(), + trip_updates_lookup_by_trip_id_to_trip_update_ids: + trip_updates_lookup_by_trip_id_to_trip_update_ids.clone(), + raw_alerts: None, + impacted_routes_alerts: None, + impacted_stops_alerts: None, + impacted_routes_stops_alerts: None, + last_updated_time_ms: catenary::duration_since_unix_epoch().as_millis() + as u64, + } + }) + .or_insert(AspenisedData { + vehicle_positions: aspenised_vehicle_positions.clone(), + vehicle_routes_cache: vehicle_routes_cache.clone(), + trip_updates: trip_updates.clone(), + trip_updates_lookup_by_trip_id_to_trip_update_ids: + trip_updates_lookup_by_trip_id_to_trip_update_ids.clone(), + raw_alerts: None, + impacted_routes_alerts: None, + impacted_stops_alerts: None, + impacted_routes_stops_alerts: None, + last_updated_time_ms: catenary::duration_since_unix_epoch().as_millis() + as u64, + }); } true } diff --git a/src/aspen/lib.rs b/src/aspen/lib.rs index c8e15db4..f3bc7ef3 100644 --- a/src/aspen/lib.rs +++ b/src/aspen/lib.rs @@ -4,6 +4,12 @@ /// This is the service definition. It looks a lot like a trait definition. /// It defines one RPC, hello, which takes one arg, name, and returns a String. +use crate::aspen_dataset::*; +use crate::ChateauDataNoGeometry; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::net::IpAddr; #[tarpc::service] pub trait AspenRpc { @@ -25,12 +31,20 @@ pub trait AspenRpc { alerts_response_code: Option, time_of_submission_ms: u64, ) -> bool; + + async fn get_vehicle_locations( + chateau_id: String, + existing_fasthash_of_routes: Option, + ) -> (Option); } -use crate::ChateauDataNoGeometry; -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; -use std::net::IpAddr; +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct GetVehicleLocationsResponse { + pub vehicle_route_cache: Option>, + pub vehicle_positions: HashMap, + pub hash_of_routes: u64, + pub last_updated_time_ms: u64, +} #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChateauMetadataZookeeper { diff --git a/src/aspen/main.rs b/src/aspen/main.rs index 66a0db90..8e0923db 100644 --- a/src/aspen/main.rs +++ b/src/aspen/main.rs @@ -50,12 +50,12 @@ mod leader_thread; use leader_thread::aspen_leader_thread; mod import_alpenrose; use catenary::aspen_dataset::GtfsRtType; +use catenary::aspen_dataset::*; use catenary::postgres_tools::CatenaryPostgresPool; use crossbeam::deque::{Injector, Steal}; use futures::join; use gtfs_rt::FeedMessage; use scc::HashMap as SccHashMap; - mod async_threads_alpenrose; // This is the type that implements the generated World trait. It is the business logic @@ -112,6 +112,42 @@ impl AspenRpc for AspenServer { }); true } + + async fn get_vehicle_locations( + self, + _: context::Context, + chateau_id: String, + existing_fasthash_of_routes: Option, + ) -> Option { + match self.authoritative_data_store.get(&chateau_id) { + Some(aspenised_data) => { + let aspenised_data = aspenised_data.get(); + + let fast_hash_of_routes = catenary::fast_hash( + &aspenised_data + .vehicle_routes_cache + .values() + .collect::>(), + ); + + Some(GetVehicleLocationsResponse { + vehicle_route_cache: match existing_fasthash_of_routes { + Some(existing_fasthash_of_routes) => { + match existing_fasthash_of_routes == fast_hash_of_routes { + true => None, + false => Some(aspenised_data.vehicle_routes_cache.clone()), + } + } + None => Some(aspenised_data.vehicle_routes_cache.clone()), + }, + vehicle_positions: aspenised_data.vehicle_positions.clone(), + hash_of_routes: fast_hash_of_routes, + last_updated_time_ms: aspenised_data.last_updated_time_ms, + }) + } + None => None, + } + } } async fn spawn(fut: impl Future + Send + 'static) { diff --git a/src/lib.rs b/src/lib.rs index 5636d6b3..b79d3352 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,7 +158,7 @@ pub mod aspen_dataset { use std::{collections::BTreeMap, collections::HashMap, hash::Hash}; pub struct AspenisedData { - pub vehicle_positions: Vec, + pub vehicle_positions: HashMap, pub vehicle_routes_cache: HashMap, //id to trip update pub trip_updates: HashMap, @@ -167,15 +167,35 @@ pub mod aspen_dataset { pub impacted_routes_alerts: Option>>, pub impacted_stops_alerts: Option>>, pub impacted_routes_stops_alerts: Option>>, + pub last_updated_time_ms: u64, } + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AspenisedVehiclePosition { pub trip: Option, - pub vehicle: Option, - pub position: Option, + pub vehicle: Option, + pub position: Option, pub timestamp: Option, } + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct CatenaryRtVehiclePosition { + pub latitude: f32, + pub longitude: f32, + pub bearing: Option, + pub odometer: Option, + pub speed: Option, + } + + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct AspenisedVehicleDescriptor { + pub id: Option, + pub label: Option, + pub license_plate: Option, + pub wheelchair_accessible: Option, + } + + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AspenisedVehicleTripInfo { pub trip_id: Option, pub trip_headsign: Option, @@ -183,11 +203,12 @@ pub mod aspen_dataset { pub trip_short_name: Option, } + #[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)] pub struct AspenisedVehicleRouteCache { pub route_short_name: Option, pub route_long_name: Option, - pub route_short_name_langs: Option>, - pub route_long_name_langs: Option>, + // pub route_short_name_langs: Option>, + // pub route_long_name_langs: Option>, pub route_colour: Option, pub route_text_colour: Option, }