Finish Aspen vehicle data lookup and hydrater system
kylerchin committed Apr 23, 2024
1 parent bfac42c commit 55ebea3
Showing 3 changed files with 220 additions and 36 deletions.
237 changes: 209 additions & 28 deletions src/aspen/import_alpenrose.rs
@@ -11,14 +11,15 @@ use diesel::ExpressionMethods;
use diesel::QueryDsl;
use diesel::SelectableHelper;
use diesel_async::RunQueryDsl;
use gtfs_rt::FeedMessage;
use gtfs_rt::TripUpdate;
use prost::Message;
use scc::HashMap as SccHashMap;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
use gtfs_rt::FeedMessage;

const MAKE_VEHICLES_FEED_LIST: [&str; 9] = [
"f-mta~nyc~rt~subway~1~2~3~4~5~6~7",
@@ -34,7 +34,7 @@ const MAKE_VEHICLES_FEED_LIST: [&str; 9] = [

pub async fn new_rt_data(
authoritative_data_store: Arc<SccHashMap<String, catenary::aspen_dataset::AspenisedData>>,
authoritative_gtfs_rt:Arc<SccHashMap<(String, GtfsRtType), FeedMessage>>,
authoritative_gtfs_rt: Arc<SccHashMap<(String, GtfsRtType), FeedMessage>>,
chateau_id: String,
realtime_feed_id: String,
vehicles: Option<Vec<u8>>,
@@ -88,20 +89,24 @@ pub async fn new_rt_data(
//get and update raw gtfs_rt data

if let Some(vehicles_gtfs_rt) = &vehicles_gtfs_rt {
authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::VehiclePositions)).and_modify(|gtfs_data| *gtfs_data = vehicles_gtfs_rt.clone())
.or_insert(vehicles_gtfs_rt.clone());
authoritative_gtfs_rt
.entry((realtime_feed_id.clone(), GtfsRtType::VehiclePositions))
.and_modify(|gtfs_data| *gtfs_data = vehicles_gtfs_rt.clone())
.or_insert(vehicles_gtfs_rt.clone());
}

if let Some(trip_gtfs_rt) = &trips_gtfs_rt {
authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::TripUpdates))
.and_modify(|gtfs_data| *gtfs_data = trip_gtfs_rt.clone())
.or_insert(trip_gtfs_rt.clone());
authoritative_gtfs_rt
.entry((realtime_feed_id.clone(), GtfsRtType::TripUpdates))
.and_modify(|gtfs_data| *gtfs_data = trip_gtfs_rt.clone())
.or_insert(trip_gtfs_rt.clone());
}

if let Some(alerts_gtfs_rt) = &alerts_gtfs_rt {
authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::Alerts))
.and_modify(|gtfs_data| *gtfs_data = alerts_gtfs_rt.clone())
.or_insert(alerts_gtfs_rt.clone());
authoritative_gtfs_rt
.entry((realtime_feed_id.clone(), GtfsRtType::Alerts))
.and_modify(|gtfs_data| *gtfs_data = alerts_gtfs_rt.clone())
.or_insert(alerts_gtfs_rt.clone());
}
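
(A brief aside on the upsert pattern used above: `scc::HashMap::entry` combined with `and_modify`/`or_insert` overwrites an existing snapshot or inserts a fresh one in a single call, so the writer never performs a separate read-then-write. A minimal, self-contained sketch of the same pattern, using a plain string key instead of the real `(feed_id, GtfsRtType)` tuple and an integer stand-in for `gtfs_rt::FeedMessage`, might look like this.)

```rust
use scc::HashMap as SccHashMap;

// Hypothetical standalone example; the real store maps
// (String, GtfsRtType) keys to gtfs_rt::FeedMessage values.
fn upsert_snapshot(store: &SccHashMap<String, u64>, key: &str, value: u64) {
    store
        .entry(key.to_string())
        .and_modify(|stored| *stored = value)
        .or_insert(value);
}

fn main() {
    let store = SccHashMap::new();
    upsert_snapshot(&store, "f-example~feed", 1);
    upsert_snapshot(&store, "f-example~feed", 2);
    // get() returns an occupied-entry guard; get() on it yields the stored value.
    assert_eq!(store.get("f-example~feed").map(|e| *e.get()), Some(2));
}
```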

let this_chateau_dashmap = authoritative_data_store.get(&realtime_feed_id);
@@ -132,31 +137,207 @@ pub async fn new_rt_data(
let mut trip_updates_lookup_by_trip_id_to_trip_update_ids: HashMap<String, Vec<String>> =
HashMap::new();

use catenary::schema::gtfs::static_download_attempts as static_download_attempts_pg_schema;
use catenary::schema::gtfs::chateaus as chateaus_pg_schema;
use catenary::schema::gtfs::routes as routes_pg_schema;
use catenary::schema::gtfs::static_download_attempts as static_download_attempts_pg_schema;

//get this chateau
let this_chateau = chateaus_pg_schema::dsl::chateaus
.filter(chateaus_pg_schema::dsl::chateau.eq(&chateau_id))
.first::<catenary::models::Chateau>(conn)
.await;

match this_chateau {
Err(err) => false,
Ok(this_chateau) => {
//get all routes inside chateau from postgres db
let routes = routes_pg_schema::dsl::routes
.filter(routes_pg_schema::dsl::chateau.eq(&chateau_id))
.select((catenary::models::Route::as_select()))
.load::<catenary::models::Route>(conn)
.await
.unwrap();

let mut route_id_to_route: HashMap<String, catenary::models::Route> = HashMap::new();

for route in routes {
route_id_to_route.insert(route.route_id.clone(), route);
}

let route_id_to_route = route_id_to_route;

//combine them together and insert them with the vehicles positions

// trips can be left fairly raw for now, with a lot of data references

// ignore alerts for now, as well as trip modifications

let mut aspenised_vehicle_positions: HashMap<String, AspenisedVehiclePosition> =
HashMap::new();

//collect all trip ids that must be looked up
//collect all common itinerary patterns and look those up

let mut trip_ids_to_lookup: HashSet<String> = HashSet::new();

//get all routes inside chateau from postgres db
let routes = routes_pg_schema::dsl::routes
.filter(routes_pg_schema::dsl::chateau.eq(&chateau_id))
.select((catenary::models::Route::as_select()))
.load::<catenary::models::Route>(conn)
.await
.unwrap();
for realtime_feed_id in this_chateau.realtime_feeds.iter().flatten() {
if let Some(vehicle_gtfs_rt_for_feed_id) = authoritative_gtfs_rt
.get(&(realtime_feed_id.clone(), GtfsRtType::VehiclePositions))
{
let vehicle_gtfs_rt_for_feed_id = vehicle_gtfs_rt_for_feed_id.get();

//for (realtime_feed_id, gtfs_dataset) in this_chateau_lock.raw_gtfs_rt.iter() {
// if gtfs_dataset.vehicle_positions.is_some() {
for vehicle_entity in vehicle_gtfs_rt_for_feed_id.entity.iter() {
if let Some(vehicle_pos) = &vehicle_entity.vehicle {
if let Some(trip) = &vehicle_pos.trip {
if let Some(trip_id) = &trip.trip_id {
trip_ids_to_lookup.insert(trip_id.clone());
}
}
}
}
}

//for trips, batch lookups by groups of 100
//collect all common itinerary patterns and look those up
//now do the same for alerts updates
if let Some(alert_gtfs_rt_for_feed_id) =
authoritative_gtfs_rt.get(&(realtime_feed_id.clone(), GtfsRtType::Alerts))
{
let alert_gtfs_rt_for_feed_id = alert_gtfs_rt_for_feed_id.get();

//combine them together and insert them with the vehicles positions
// }
for alert_entity in alert_gtfs_rt_for_feed_id.entity.iter() {
if let Some(alert) = &alert_entity.alert {
for entity in alert.informed_entity.iter() {
if let Some(trip) = &entity.trip {
if let Some(trip_id) = &trip.trip_id {
trip_ids_to_lookup.insert(trip_id.clone());
}
}
}
}
}
}

// trips can be left fairly raw for now, with a lot of data references
//now look up all the trips

// ignore alerts for now, as well as trip modifications
// }
let trips = catenary::schema::gtfs::trips_compressed::dsl::trips_compressed
.filter(
catenary::schema::gtfs::trips_compressed::dsl::trip_id
.eq_any(trip_ids_to_lookup.iter()),
)
.load::<catenary::models::CompressedTrip>(conn)
.await
.unwrap();

true
let mut trip_id_to_trip: HashMap<String, catenary::models::CompressedTrip> =
HashMap::new();

for trip in trips {
trip_id_to_trip.insert(trip.trip_id.clone(), trip);
}

let trip_id_to_trip = trip_id_to_trip;

//also lookup all the headsigns from the trips via itinerary patterns

let mut list_of_itinerary_patterns_to_lookup: HashSet<String> = HashSet::new();

for trip in trip_id_to_trip.values() {
list_of_itinerary_patterns_to_lookup.insert(trip.itinerary_pattern_id.clone());
}

let itinerary_patterns = catenary::schema::gtfs::itinerary_pattern_meta::dsl::itinerary_pattern_meta
.filter(catenary::schema::gtfs::itinerary_pattern_meta::dsl::itinerary_pattern_id.eq_any(list_of_itinerary_patterns_to_lookup.iter()))
.select(catenary::models::ItineraryPatternMeta::as_select())
.load::<catenary::models::ItineraryPatternMeta>(conn)
.await
.unwrap();

let mut itinerary_pattern_id_to_itinerary_pattern_meta: HashMap<
String,
catenary::models::ItineraryPatternMeta,
> = HashMap::new();

for itinerary_pattern in itinerary_patterns {
itinerary_pattern_id_to_itinerary_pattern_meta.insert(
itinerary_pattern.itinerary_pattern_id.clone(),
itinerary_pattern,
);
}

let itinerary_pattern_id_to_itinerary_pattern_meta =
itinerary_pattern_id_to_itinerary_pattern_meta;

let vehicle_routes_cache: HashMap<String, AspenisedVehicleRouteCache> =
HashMap::new();

for realtime_feed_id in this_chateau.realtime_feeds.iter().flatten() {
if let Some(vehicle_gtfs_rt_for_feed_id) = authoritative_gtfs_rt
.get(&(realtime_feed_id.clone(), GtfsRtType::VehiclePositions))
{
let vehicle_gtfs_rt_for_feed_id = vehicle_gtfs_rt_for_feed_id.get();

for vehicle_entity in vehicle_gtfs_rt_for_feed_id.entity.iter() {
if let Some(vehicle_pos) = &vehicle_entity.vehicle {
aspenised_vehicle_positions.insert(vehicle_entity.id.clone(), AspenisedVehiclePosition {
trip: vehicle_pos.trip.as_ref().map(|trip| {
AspenisedVehicleTripInfo {
trip_id: trip.trip_id.clone(),
route_id: match &trip.route_id {
Some(route_id) => Some(route_id.clone()),
None => match &trip.trip_id {
Some(trip_id) => {
let trip = trip_id_to_trip.get(&trip_id.clone());
trip.map(|trip| trip.route_id.clone())
},
None => None
}
},
trip_headsign: match &trip.trip_id {
Some(trip_id) => {
let trip = trip_id_to_trip.get(&trip_id.clone());
match trip {
Some(trip) => {
let itinerary_pattern = itinerary_pattern_id_to_itinerary_pattern_meta.get(&trip.itinerary_pattern_id);
match itinerary_pattern {
Some(itinerary_pattern) => {
itinerary_pattern.trip_headsign.clone()
},
None => None
}
},
None => None
}
},
None => None
},
trip_short_name: match &trip.trip_id {
Some(trip_id) => {
let trip = trip_id_to_trip.get(&trip_id.clone());
match trip {
Some(trip) => {
match &trip.trip_short_name {
Some(trip_short_name) => Some(trip_short_name.clone()),
None => None
}
},
None => None
}
},
None => None
}
}
}),
position: vehicle_pos.position.clone(),
timestamp: vehicle_pos.timestamp.clone(),
vehicle: vehicle_pos.vehicle.clone(),
});

//insert the route cache
}
}
}
}
}
true
}
}
}
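
The nested `match` chains above that hydrate `route_id`, `trip_headsign`, and `trip_short_name` from the static lookups can also be expressed with `Option` combinators. Below is a behaviour-preserving sketch; the two structs are simplified stand-ins carrying only the fields this lookup touches, not the real `catenary::models` definitions.

```rust
use std::collections::HashMap;

// Simplified stand-ins for illustration; the real catenary::models
// types carry many more columns than shown here.
struct CompressedTrip {
    route_id: String,
    itinerary_pattern_id: String,
    trip_short_name: Option<String>,
}

struct ItineraryPatternMeta {
    trip_headsign: Option<String>,
}

/// Resolve route_id, trip_headsign, and trip_short_name for one realtime trip,
/// preferring data in the feed and falling back to the static tables.
fn hydrate_trip_fields(
    rt_trip_id: Option<&String>,
    rt_route_id: Option<&String>,
    trip_id_to_trip: &HashMap<String, CompressedTrip>,
    itinerary_pattern_meta: &HashMap<String, ItineraryPatternMeta>,
) -> (Option<String>, Option<String>, Option<String>) {
    // Look the compressed trip up once and reuse it for all three fields.
    let compressed = rt_trip_id.and_then(|id| trip_id_to_trip.get(id));

    // route_id supplied by the feed wins; otherwise take it from the static trip.
    let route_id = rt_route_id
        .cloned()
        .or_else(|| compressed.map(|t| t.route_id.clone()));

    // Headsign lives on the trip's itinerary pattern metadata.
    let trip_headsign = compressed
        .and_then(|t| itinerary_pattern_meta.get(&t.itinerary_pattern_id))
        .and_then(|meta| meta.trip_headsign.clone());

    let trip_short_name = compressed.and_then(|t| t.trip_short_name.clone());

    (route_id, trip_headsign, trip_short_name)
}
```

Functionally this matches the nested matches in the diff: feed-supplied values take priority, and every static lookup degrades to `None` when the trip is unknown.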
7 changes: 5 additions & 2 deletions src/aspen/main.rs
@@ -52,9 +52,9 @@ mod import_alpenrose;
use catenary::aspen_dataset::GtfsRtType;
use catenary::postgres_tools::CatenaryPostgresPool;
use crossbeam::deque::{Injector, Steal};
use futures::join;
use gtfs_rt::FeedMessage;
use scc::HashMap as SccHashMap;
use futures::join;

mod async_threads_alpenrose;

@@ -210,7 +210,10 @@ async fn main() -> anyhow::Result<()> {
.for_each(|_| async {})
.await;

join!(async_from_alpenrose_processor_handler.join().unwrap(), leader_thread_handler.join().unwrap());
join!(
async_from_alpenrose_processor_handler.join().unwrap(),
leader_thread_handler.join().unwrap()
);

Ok(())
}
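
For context on the reformatted call above: `futures::join!` polls several futures concurrently on the current task and returns their outputs as a tuple. A minimal, standalone illustration (independent of the handler types used in `main.rs`) could be:

```rust
use futures::join;

async fn fetch_a() -> u32 {
    1
}

async fn fetch_b() -> u32 {
    2
}

#[tokio::main]
async fn main() {
    // Both futures are driven concurrently; join! yields once both finish.
    let (a, b) = join!(fetch_a(), fetch_b());
    assert_eq!((a, b), (1, 2));
}
```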
12 changes: 6 additions & 6 deletions src/lib.rs
@@ -184,12 +184,12 @@ pub mod aspen_dataset {
}

pub struct AspenisedVehicleRouteCache {
route_short_name: Option<String>,
route_long_name: Option<String>,
//route_short_name_langs: Option<HashMap<String, String>>,
//route_long_name_langs: Option<HashMap<String, String>>,
route_colour: Option<String>,
route_text_colour: Option<String>,
pub route_short_name: Option<String>,
pub route_long_name: Option<String>,
pub route_short_name_langs: Option<HashMap<String, String>>,
pub route_long_name_langs: Option<HashMap<String, String>>,
pub route_colour: Option<String>,
pub route_text_colour: Option<String>,
}

#[derive(Copy, Eq, Hash, PartialEq, Clone)]

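The now-public fields above are what the hydrater in `import_alpenrose.rs` would fill in at the `//insert the route cache` step the diff leaves as a TODO. A hedged sketch of that population step follows, using a hypothetical stripped-down `Route` stand-in (field names assumed for illustration, not taken from `catenary::models::Route`):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for catenary::models::Route; the real model
// has more columns and possibly different field names.
struct Route {
    short_name: Option<String>,
    long_name: Option<String>,
    color: Option<String>,
    text_color: Option<String>,
}

// Mirrors the fields of catenary::aspen_dataset::AspenisedVehicleRouteCache
// as shown in the lib.rs diff above.
struct AspenisedVehicleRouteCache {
    route_short_name: Option<String>,
    route_long_name: Option<String>,
    route_short_name_langs: Option<HashMap<String, String>>,
    route_long_name_langs: Option<HashMap<String, String>>,
    route_colour: Option<String>,
    route_text_colour: Option<String>,
}

/// Build the per-route cache from the routes already loaded for this chateau.
fn build_route_cache(
    route_id_to_route: &HashMap<String, Route>,
) -> HashMap<String, AspenisedVehicleRouteCache> {
    route_id_to_route
        .iter()
        .map(|(route_id, route)| {
            (
                route_id.clone(),
                AspenisedVehicleRouteCache {
                    route_short_name: route.short_name.clone(),
                    route_long_name: route.long_name.clone(),
                    // Per-language name maps are not wired up yet in this commit.
                    route_short_name_langs: None,
                    route_long_name_langs: None,
                    route_colour: route.color.clone(),
                    route_text_colour: route.text_color.clone(),
                },
            )
        })
        .collect()
}
```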