From 818442099ec7fff43d4f14257f8711861d38365f Mon Sep 17 00:00:00 2001 From: Kyler Chin <7539174+kylerchin@users.noreply.github.com> Date: Fri, 4 Oct 2024 11:05:02 -0700 Subject: [PATCH] implement add missing ptc positions for gtfs vehicle positions --- src/aspen/import_alpenrose.rs | 148 ++++++++++++++++++++++++++++++++-- src/lib.rs | 2 +- 2 files changed, 143 insertions(+), 7 deletions(-) diff --git a/src/aspen/import_alpenrose.rs b/src/aspen/import_alpenrose.rs index 809deda..253700f 100644 --- a/src/aspen/import_alpenrose.rs +++ b/src/aspen/import_alpenrose.rs @@ -6,6 +6,7 @@ extern crate catenary; use ahash::{AHashMap, AHashSet}; use catenary::aspen_dataset::*; use catenary::postgres_tools::CatenaryPostgresPool; +use catenary::schema::gtfs::agencies::chateau; use compact_str::CompactString; use diesel::ExpressionMethods; use diesel::QueryDsl; @@ -72,6 +73,64 @@ pub enum TrackData { None, } +#[derive(Serialize, Deserialize, Clone, Debug)] +struct MetrolinkPosRaw { + symbol: CompactString, + direction: CompactString, + lat: CompactString, + lon: CompactString, + speed: CompactString, + line: CompactString, + ptc_time: CompactString, + ptc_status: CompactString, + delay_status: CompactString, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +struct MetrolinkPos { + lat: f32, + lon: f32, + speed: f32, + symbol: CompactString, +} + +fn mph_to_mps(mph: &CompactString) -> Option { + let mph: f32 = match mph.parse() { + Ok(mph) => mph, + Err(_) => return None, + }; + + Some(mph * 0.44704) +} + +fn metrlink_coord_to_f32(coord: &CompactString) -> Option { + //Split into 3 parts based on : + let parts: Vec<&str> = coord.split(":").collect(); + + if parts.len() != 3 { + return None; + } + + let degrees: f32 = match parts[0].parse() { + Ok(degrees) => degrees, + Err(_) => return None, + }; + + let minutes: f32 = match parts[1].parse() { + Ok(minutes) => minutes, + Err(_) => return None, + }; + + let seconds: f32 = match parts[2].parse() { + Ok(seconds) => seconds, + Err(_) => return None, + }; + + let decimal = degrees + minutes / 60.0 + seconds / 3600.0; + + Some(decimal) +} + pub async fn new_rt_data( authoritative_data_store: Arc>, authoritative_gtfs_rt: Arc>, @@ -87,6 +146,53 @@ pub async fn new_rt_data( ) -> Result> { let start = std::time::Instant::now(); + let fetch_supplemental_data_positions_metrolink: Option> = + match realtime_feed_id.as_str() { + "f-metrolinktrains~rt" => { + let raw_data_req = + reqwest::get("https://rtt.metrolinktrains.com/trainlist.json").await; + + match raw_data_req { + Ok(metrolink_data) => { + let metrolink_data = metrolink_data.json::>().await; + + match metrolink_data { + Ok(metrolink_data) => { + let mut metrolink_positions: AHashMap = + AHashMap::new(); + + for pos in metrolink_data { + let lat = metrlink_coord_to_f32(&pos.lat); + let lon = metrlink_coord_to_f32(&pos.lon); + let speed = mph_to_mps(&pos.speed); + + if let (Some(lat), Some(lon), Some(speed)) = (lat, lon, speed) { + metrolink_positions.insert( + pos.symbol.clone(), + MetrolinkPos { + lat: lat, + lon: lon, + speed: speed, + symbol: pos.symbol.clone(), + }, + ); + } + } + + Some(metrolink_positions) + } + Err(e) => { + println!("Error fetching metrolink data: {}", e); + None + } + } + } + _ => None, + } + } + _ => None, + }; + let conn_pool = pool.as_ref(); let conn_pre = conn_pool.get().await; @@ -286,7 +392,6 @@ pub async fn new_rt_data( for vehicle_entity in vehicle_gtfs_rt_for_feed_id.entity.iter() { if let Some(vehicle_pos) = &vehicle_entity.vehicle { - let pos_aspenised = AspenisedVehiclePosition { trip: vehicle_pos.trip.as_ref().map(|trip| { AspenisedVehicleTripInfo { @@ -379,10 +484,14 @@ pub async fn new_rt_data( congestion_level: vehicle_pos.congestion_level }; + let pos_aspenised = vehicle_pos_supplement( + pos_aspenised, + &fetch_supplemental_data_positions_metrolink, + &chateau_id, + ); - - aspenised_vehicle_positions.insert(vehicle_entity.id.clone(), - pos_aspenised); + aspenised_vehicle_positions + .insert(vehicle_entity.id.clone(), pos_aspenised); //insert the route cache @@ -591,11 +700,38 @@ pub async fn fetch_track_data(chateau_id: &str) -> TrackData { } } -fn vehicle_pos_supplement(pos_aspenised: AspenisedVehiclePosition) -> AspenisedVehiclePosition { +fn vehicle_pos_supplement( + pos_aspenised: AspenisedVehiclePosition, + fetch_supplemental_data_positions_metrolink: &Option>, + chateau_id: &String, +) -> AspenisedVehiclePosition { + let mut pos_aspenised = pos_aspenised; + + match chateau_id.as_str() { + "metrolinktrains" => match fetch_supplemental_data_positions_metrolink { + Some(supp_metrolink_data) => { + if let Some(vehicle_ids) = &pos_aspenised.vehicle { + if let Some(vehicle_id) = &vehicle_ids.id { + if let Some(metrolink_pos) = supp_metrolink_data.get(vehicle_id.as_str()) { + pos_aspenised.position = Some(CatenaryRtVehiclePosition { + latitude: metrolink_pos.lat, + longitude: metrolink_pos.lon, + bearing: None, + odometer: None, + speed: Some(metrolink_pos.speed), + }); + } + } + } + } + None => {} + }, + _ => {} + } + pos_aspenised } - #[cfg(test)] mod tests { #[tokio::test] diff --git a/src/lib.rs b/src/lib.rs index 8bce99c..9b64400 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -361,7 +361,7 @@ pub mod aspen_dataset { pub delay: Option, pub stop_time_update: Vec, pub trip_properties: Option, - pub trip_headsign: Option + pub trip_headsign: Option, } #[derive(Clone, Debug, Serialize, Deserialize)]