Implement: add missing PTC positions for GTFS vehicle positions
kylerchin committed Oct 4, 2024
1 parent fb210fc commit 8184420
Showing 2 changed files with 143 additions and 7 deletions.
148 changes: 142 additions & 6 deletions src/aspen/import_alpenrose.rs
@@ -6,6 +6,7 @@ extern crate catenary;
use ahash::{AHashMap, AHashSet};
use catenary::aspen_dataset::*;
use catenary::postgres_tools::CatenaryPostgresPool;
use catenary::schema::gtfs::agencies::chateau;
use compact_str::CompactString;
use diesel::ExpressionMethods;
use diesel::QueryDsl;
@@ -72,6 +73,64 @@ pub enum TrackData {
None,
}

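// Raw per-train record as returned by Metrolink's trainlist.json endpoint; every field arrives as a string and is parsed further down.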
#[derive(Serialize, Deserialize, Clone, Debug)]
struct MetrolinkPosRaw {
symbol: CompactString,
direction: CompactString,
lat: CompactString,
lon: CompactString,
speed: CompactString,
line: CompactString,
ptc_time: CompactString,
ptc_status: CompactString,
delay_status: CompactString,
}

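// Parsed position for a single train: decimal-degree coordinates and speed in metres per second, keyed later by train symbol.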
#[derive(Serialize, Deserialize, Clone, Debug)]
struct MetrolinkPos {
lat: f32,
lon: f32,
speed: f32,
symbol: CompactString,
}

fn mph_to_mps(mph: &CompactString) -> Option<f32> {
let mph: f32 = match mph.parse() {
Ok(mph) => mph,
Err(_) => return None,
};

Some(mph * 0.44704)
}

fn metrlink_coord_to_f32(coord: &CompactString) -> Option<f32> {
// Split the degrees:minutes:seconds string into 3 parts on ':'
let parts: Vec<&str> = coord.split(":").collect();

if parts.len() != 3 {
return None;
}

let degrees: f32 = match parts[0].parse() {
Ok(degrees) => degrees,
Err(_) => return None,
};

let minutes: f32 = match parts[1].parse() {
Ok(minutes) => minutes,
Err(_) => return None,
};

let seconds: f32 = match parts[2].parse() {
Ok(seconds) => seconds,
Err(_) => return None,
};

let decimal = degrees + minutes / 60.0 + seconds / 3600.0;

Some(decimal)
}
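// Rough sanity check for the two helpers above (illustrative inputs, not values taken from the feed):
//   mph_to_mps(&CompactString::from("79"))                  -> approximately 35.32 m/s (79 * 0.44704)
//   metrlink_coord_to_f32(&CompactString::from("34:03:36")) -> approximately 34.06 (34 + 3/60 + 36/3600)
//   metrlink_coord_to_f32(&CompactString::from("34:03"))    -> None (the string must have three ':'-separated parts)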

pub async fn new_rt_data(
authoritative_data_store: Arc<SccHashMap<String, catenary::aspen_dataset::AspenisedData>>,
authoritative_gtfs_rt: Arc<SccHashMap<(String, GtfsRtType), FeedMessage>>,
@@ -87,6 +146,53 @@
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
let start = std::time::Instant::now();

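// For the Metrolink realtime feed, fetch the public trainlist as supplemental data so that
// PTC-derived positions can be patched into the vehicle positions below (see vehicle_pos_supplement).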
let fetch_supplemental_data_positions_metrolink: Option<AHashMap<CompactString, MetrolinkPos>> =
match realtime_feed_id.as_str() {
"f-metrolinktrains~rt" => {
let raw_data_req =
reqwest::get("https://rtt.metrolinktrains.com/trainlist.json").await;

match raw_data_req {
Ok(metrolink_data) => {
let metrolink_data = metrolink_data.json::<Vec<MetrolinkPosRaw>>().await;

match metrolink_data {
Ok(metrolink_data) => {
let mut metrolink_positions: AHashMap<CompactString, MetrolinkPos> =
AHashMap::new();

for pos in metrolink_data {
let lat = metrlink_coord_to_f32(&pos.lat);
let lon = metrlink_coord_to_f32(&pos.lon);
let speed = mph_to_mps(&pos.speed);

if let (Some(lat), Some(lon), Some(speed)) = (lat, lon, speed) {
metrolink_positions.insert(
pos.symbol.clone(),
MetrolinkPos {
lat,
lon,
speed,
symbol: pos.symbol.clone(),
},
);
}
}

Some(metrolink_positions)
}
Err(e) => {
println!("Error parsing Metrolink trainlist data: {}", e);
None
}
}
}
_ => None,
}
}
_ => None,
};
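// A `None` here simply means no supplemental data was available; the GTFS-rt positions are then used unchanged.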

let conn_pool = pool.as_ref();
let conn_pre = conn_pool.get().await;

@@ -286,7 +392,6 @@ pub async fn new_rt_data(

for vehicle_entity in vehicle_gtfs_rt_for_feed_id.entity.iter() {
if let Some(vehicle_pos) = &vehicle_entity.vehicle {

let pos_aspenised = AspenisedVehiclePosition {
trip: vehicle_pos.trip.as_ref().map(|trip| {
AspenisedVehicleTripInfo {
@@ -379,10 +484,14 @@
congestion_level: vehicle_pos.congestion_level
};

let pos_aspenised = vehicle_pos_supplement(
pos_aspenised,
&fetch_supplemental_data_positions_metrolink,
&chateau_id,
);


aspenised_vehicle_positions.insert(vehicle_entity.id.clone(),
pos_aspenised);
aspenised_vehicle_positions
.insert(vehicle_entity.id.clone(), pos_aspenised);

//insert the route cache

@@ -591,11 +700,38 @@ pub async fn fetch_track_data(chateau_id: &str) -> TrackData {
}
}

fn vehicle_pos_supplement(pos_aspenised: AspenisedVehiclePosition) -> AspenisedVehiclePosition {
fn vehicle_pos_supplement(
pos_aspenised: AspenisedVehiclePosition,
fetch_supplemental_data_positions_metrolink: &Option<AHashMap<CompactString, MetrolinkPos>>,
chateau_id: &String,
) -> AspenisedVehiclePosition {
let mut pos_aspenised = pos_aspenised;
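// Only the Metrolink chateau currently receives supplemental data: when the GTFS-rt vehicle id
// matches a train symbol from the trainlist, that train's PTC-reported position and speed
// replace the GTFS-rt ones.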

match chateau_id.as_str() {
"metrolinktrains" => match fetch_supplemental_data_positions_metrolink {
Some(supp_metrolink_data) => {
if let Some(vehicle_ids) = &pos_aspenised.vehicle {
if let Some(vehicle_id) = &vehicle_ids.id {
if let Some(metrolink_pos) = supp_metrolink_data.get(vehicle_id.as_str()) {
pos_aspenised.position = Some(CatenaryRtVehiclePosition {
latitude: metrolink_pos.lat,
longitude: metrolink_pos.lon,
bearing: None,
odometer: None,
speed: Some(metrolink_pos.speed),
});
}
}
}
}
None => {}
},
_ => {}
}

pos_aspenised
}


#[cfg(test)]
mod tests {
#[tokio::test]
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -361,7 +361,7 @@ pub mod aspen_dataset {
pub delay: Option<i32>,
pub stop_time_update: Vec<AspenisedStopTimeUpdate>,
pub trip_properties: Option<AspenTripProperties>,
pub trip_headsign: Option<CompactString>
pub trip_headsign: Option<CompactString>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
