Skip to content

Commit

Permalink
Finish output of vehicle locations from aspen system
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Apr 23, 2024
1 parent 55ebea3 commit 2f40a93
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 17 deletions.
83 changes: 76 additions & 7 deletions src/aspen/import_alpenrose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ pub async fn new_rt_data(
//if this item is empty, create it
if this_chateau_dashmap.is_none() {
let mut new_aspenised_data = catenary::aspen_dataset::AspenisedData {
vehicle_positions: Vec::new(),
vehicle_positions: HashMap::new(),
vehicle_routes_cache: HashMap::new(),
trip_updates: HashMap::new(),
trip_updates_lookup_by_trip_id_to_trip_update_ids: HashMap::new(),
raw_alerts: None,
impacted_routes_alerts: None,
impacted_stops_alerts: None,
impacted_routes_stops_alerts: None,
last_updated_time_ms: 0,
};
let _ = authoritative_data_store.insert(realtime_feed_id.clone(), new_aspenised_data);
}
Expand All @@ -131,7 +132,7 @@ pub async fn new_rt_data(

// take all the gtfs rt data and merge it together

let mut vehicle_positions: Vec<AspenisedVehiclePosition> = Vec::new();
let mut vehicle_positions: HashMap<String, AspenisedVehiclePosition> = HashMap::new();
let mut vehicle_routes_cache: HashMap<String, AspenisedVehicleRouteCache> = HashMap::new();
let mut trip_updates: HashMap<String, TripUpdate> = HashMap::new();
let mut trip_updates_lookup_by_trip_id_to_trip_update_ids: HashMap<String, Vec<String>> =
Expand Down Expand Up @@ -266,9 +267,6 @@ pub async fn new_rt_data(
let itinerary_pattern_id_to_itinerary_pattern_meta =
itinerary_pattern_id_to_itinerary_pattern_meta;

let vehicle_routes_cache: HashMap<String, AspenisedVehicleRouteCache> =
HashMap::new();

for realtime_feed_id in this_chateau.realtime_feeds.iter().flatten() {
if let Some(vehicle_gtfs_rt_for_feed_id) = authoritative_gtfs_rt
.get(&(realtime_feed_id.clone(), GtfsRtType::VehiclePositions))
Expand Down Expand Up @@ -326,16 +324,87 @@ pub async fn new_rt_data(
}
}
}),
position: vehicle_pos.position.clone(),
position: match &vehicle_pos.position {
Some(position) => Some(CatenaryRtVehiclePosition {
latitude: position.latitude,
longitude: position.longitude,
bearing: position.bearing,
odometer: position.odometer,
speed: position.speed,
}),
None => None
},
timestamp: vehicle_pos.timestamp.clone(),
vehicle: vehicle_pos.vehicle.clone(),
vehicle: match &vehicle_pos.vehicle {
Some(vehicle) => Some(AspenisedVehicleDescriptor {
id: vehicle.id.clone(),
label: vehicle.label.clone(),
license_plate: vehicle.license_plate.clone(),
wheelchair_accessible: vehicle.wheelchair_accessible,
}),
None => None
}
});

//insert the route cache

if let Some(trip) = &vehicle_pos.trip {
if let Some(route_id) = &trip.route_id {
if !vehicle_routes_cache.contains_key(route_id) {
let route = route_id_to_route.get(route_id);
if let Some(route) = route {
vehicle_routes_cache.insert(
route_id.clone(),
AspenisedVehicleRouteCache {
route_short_name: route.short_name.clone(),
route_long_name: route.long_name.clone(),
// route_short_name_langs: route.short_name_translations.clone(),
//route_long_name_langs: route.short_name_translations.clone(),
route_colour: route.color.clone(),
route_text_colour: route.text_color.clone(),
},
);
}
}
}
}
}
}
}
}

//Insert data back into process-wide authoritative_data_store

authoritative_data_store
.entry(chateau_id.clone())
.and_modify(|data| {
*data = AspenisedData {
vehicle_positions: aspenised_vehicle_positions.clone(),
vehicle_routes_cache: vehicle_routes_cache.clone(),
trip_updates: trip_updates.clone(),
trip_updates_lookup_by_trip_id_to_trip_update_ids:
trip_updates_lookup_by_trip_id_to_trip_update_ids.clone(),
raw_alerts: None,
impacted_routes_alerts: None,
impacted_stops_alerts: None,
impacted_routes_stops_alerts: None,
last_updated_time_ms: catenary::duration_since_unix_epoch().as_millis()
as u64,
}
})
.or_insert(AspenisedData {
vehicle_positions: aspenised_vehicle_positions.clone(),
vehicle_routes_cache: vehicle_routes_cache.clone(),
trip_updates: trip_updates.clone(),
trip_updates_lookup_by_trip_id_to_trip_update_ids:
trip_updates_lookup_by_trip_id_to_trip_update_ids.clone(),
raw_alerts: None,
impacted_routes_alerts: None,
impacted_stops_alerts: None,
impacted_routes_stops_alerts: None,
last_updated_time_ms: catenary::duration_since_unix_epoch().as_millis()
as u64,
});
}
true
}
Expand Down
22 changes: 18 additions & 4 deletions src/aspen/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

/// This is the service definition. It looks a lot like a trait definition.
/// It defines one RPC, hello, which takes one arg, name, and returns a String.
use crate::aspen_dataset::*;
use crate::ChateauDataNoGeometry;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::IpAddr;

#[tarpc::service]
pub trait AspenRpc {
Expand All @@ -25,12 +31,20 @@ pub trait AspenRpc {
alerts_response_code: Option<u16>,
time_of_submission_ms: u64,
) -> bool;

async fn get_vehicle_locations(
chateau_id: String,
existing_fasthash_of_routes: Option<u64>,
) -> (Option<GetVehicleLocationsResponse>);
}

use crate::ChateauDataNoGeometry;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::net::IpAddr;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GetVehicleLocationsResponse {
pub vehicle_route_cache: Option<HashMap<String, AspenisedVehicleRouteCache>>,
pub vehicle_positions: HashMap<String, AspenisedVehiclePosition>,
pub hash_of_routes: u64,
pub last_updated_time_ms: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChateauMetadataZookeeper {
Expand Down
38 changes: 37 additions & 1 deletion src/aspen/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ mod leader_thread;
use leader_thread::aspen_leader_thread;
mod import_alpenrose;
use catenary::aspen_dataset::GtfsRtType;
use catenary::aspen_dataset::*;
use catenary::postgres_tools::CatenaryPostgresPool;
use crossbeam::deque::{Injector, Steal};
use futures::join;
use gtfs_rt::FeedMessage;
use scc::HashMap as SccHashMap;

mod async_threads_alpenrose;

// This is the type that implements the generated World trait. It is the business logic
Expand Down Expand Up @@ -112,6 +112,42 @@ impl AspenRpc for AspenServer {
});
true
}

async fn get_vehicle_locations(
self,
_: context::Context,
chateau_id: String,
existing_fasthash_of_routes: Option<u64>,
) -> Option<GetVehicleLocationsResponse> {
match self.authoritative_data_store.get(&chateau_id) {
Some(aspenised_data) => {
let aspenised_data = aspenised_data.get();

let fast_hash_of_routes = catenary::fast_hash(
&aspenised_data
.vehicle_routes_cache
.values()
.collect::<Vec<&AspenisedVehicleRouteCache>>(),
);

Some(GetVehicleLocationsResponse {
vehicle_route_cache: match existing_fasthash_of_routes {
Some(existing_fasthash_of_routes) => {
match existing_fasthash_of_routes == fast_hash_of_routes {
true => None,
false => Some(aspenised_data.vehicle_routes_cache.clone()),
}
}
None => Some(aspenised_data.vehicle_routes_cache.clone()),
},
vehicle_positions: aspenised_data.vehicle_positions.clone(),
hash_of_routes: fast_hash_of_routes,
last_updated_time_ms: aspenised_data.last_updated_time_ms,
})
}
None => None,
}
}
}

async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
Expand Down
31 changes: 26 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub mod aspen_dataset {
use std::{collections::BTreeMap, collections::HashMap, hash::Hash};

pub struct AspenisedData {
pub vehicle_positions: Vec<AspenisedVehiclePosition>,
pub vehicle_positions: HashMap<String, AspenisedVehiclePosition>,
pub vehicle_routes_cache: HashMap<String, AspenisedVehicleRouteCache>,
//id to trip update
pub trip_updates: HashMap<String, TripUpdate>,
Expand All @@ -167,27 +167,48 @@ pub mod aspen_dataset {
pub impacted_routes_alerts: Option<HashMap<String, Vec<String>>>,
pub impacted_stops_alerts: Option<HashMap<String, Vec<String>>>,
pub impacted_routes_stops_alerts: Option<HashMap<String, Vec<String>>>,
pub last_updated_time_ms: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AspenisedVehiclePosition {
pub trip: Option<AspenisedVehicleTripInfo>,
pub vehicle: Option<VehicleDescriptor>,
pub position: Option<gtfs_rt::Position>,
pub vehicle: Option<AspenisedVehicleDescriptor>,
pub position: Option<CatenaryRtVehiclePosition>,
pub timestamp: Option<u64>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CatenaryRtVehiclePosition {
pub latitude: f32,
pub longitude: f32,
pub bearing: Option<f32>,
pub odometer: Option<f64>,
pub speed: Option<f32>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AspenisedVehicleDescriptor {
pub id: Option<String>,
pub label: Option<String>,
pub license_plate: Option<String>,
pub wheelchair_accessible: Option<i32>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AspenisedVehicleTripInfo {
pub trip_id: Option<String>,
pub trip_headsign: Option<String>,
pub route_id: Option<String>,
pub trip_short_name: Option<String>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct AspenisedVehicleRouteCache {
pub route_short_name: Option<String>,
pub route_long_name: Option<String>,
pub route_short_name_langs: Option<HashMap<String, String>>,
pub route_long_name_langs: Option<HashMap<String, String>>,
// pub route_short_name_langs: Option<HashMap<String, String>>,
// pub route_long_name_langs: Option<HashMap<String, String>>,
pub route_colour: Option<String>,
pub route_text_colour: Option<String>,
}
Expand Down

0 comments on commit 2f40a93

Please sign in to comment.