diff --git a/Cargo.toml b/Cargo.toml index 8026dada..ba9d3536 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,7 +98,7 @@ tilejson = "0.4.1" tile-grid = "=0.2.2" zstd-safe = "7.1.0" hitbox = "0.1.1" -cached = "0.49.2" +cached = { version = "0.49.2", features = ["async_tokio_rt_multi_thread", "disk_store"] } geo-booleanop = "0.3.2" geo-clipper = "0.8.0" random-string = "1.1.0" @@ -113,6 +113,7 @@ ipnetwork = "0.20.0" pnet = "0.34.0" crossbeam = "0.8.4" leapfrog = { version = "0.3.0", features = ["stable_alloc"] } +scc = "2.1.0" [[bin]] name = "maple" diff --git a/src/alpenrose/main.rs b/src/alpenrose/main.rs index 694db776..4eda712f 100644 --- a/src/alpenrose/main.rs +++ b/src/alpenrose/main.rs @@ -401,8 +401,7 @@ async fn main() -> Result<(), Box> { if let Some(last_updated_assignment_time) = last_updated_assignment_time_zk_fetch { let last_updated_assignment_time = - bincode::deserialize(&last_updated_assignment_time.0) - .unwrap_or_else(|_| None::); + bincode::deserialize(&last_updated_assignment_time.0).unwrap_or(None::); //is the time newer than the last time we updated the assignments for this worker node? if last_updated_assignment_time != last_updated_ms_for_this_worker { diff --git a/src/aspen/async_threads_alpenrose.rs b/src/aspen/async_threads_alpenrose.rs new file mode 100644 index 00000000..36b5955e --- /dev/null +++ b/src/aspen/async_threads_alpenrose.rs @@ -0,0 +1,75 @@ +use catenary::aspen::lib::*; +use catenary::aspen_dataset::GtfsRtType; +use catenary::postgres_tools::CatenaryPostgresPool; +use crossbeam::deque::{Injector, Steal}; +use gtfs_rt::FeedMessage; +use scc::HashMap as SccHashMap; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +use crate::import_alpenrose::new_rt_data; + +pub async fn alpenrose_process_threads( + alpenrose_to_process_queue: Arc>, + authoritative_gtfs_rt_store: Arc>, + authoritative_data_store: Arc>, + conn_pool: Arc, + alpenrosethreadcount: usize, +) { + let mut handler_vec: Vec> = vec![]; + + for i in 0..alpenrosethreadcount { + handler_vec.push(thread::spawn({ + let alpenrose_to_process_queue = Arc::clone(&alpenrose_to_process_queue); + let authoritative_gtfs_rt_store = Arc::clone(&authoritative_gtfs_rt_store); + let authoritative_data_store = Arc::clone(&authoritative_data_store); + let conn_pool = Arc::clone(&conn_pool); + move || async move { + println!("Starting alpenrose reader thread {}", i); + alpenrose_loop_process_thread( + alpenrose_to_process_queue, + authoritative_gtfs_rt_store, + authoritative_data_store, + conn_pool, + ) + .await + } + })); + } + + for handle in handler_vec.into_iter() { + handle.join().unwrap().await; + } +} + +pub async fn alpenrose_loop_process_thread( + alpenrose_to_process_queue: Arc>, + authoritative_gtfs_rt_store: Arc>, + authoritative_data_store: Arc>, + conn_pool: Arc, +) { + loop { + if let Steal::Success(new_ingest_task) = alpenrose_to_process_queue.steal() { + new_rt_data( + Arc::clone(&authoritative_data_store), + Arc::clone(&authoritative_gtfs_rt_store), + new_ingest_task.chateau_id, + new_ingest_task.realtime_feed_id, + new_ingest_task.vehicles, + new_ingest_task.trips, + new_ingest_task.alerts, + new_ingest_task.has_vehicles, + new_ingest_task.has_trips, + new_ingest_task.has_alerts, + new_ingest_task.vehicles_response_code, + new_ingest_task.trips_response_code, + new_ingest_task.alerts_response_code, + Arc::clone(&conn_pool), + ) + .await; + } else { + thread::sleep(Duration::from_millis(1)) + } + } +} diff --git a/src/aspen/import_alpenrose.rs b/src/aspen/import_alpenrose.rs index 4bcd1794..4346545c 100644 --- a/src/aspen/import_alpenrose.rs +++ b/src/aspen/import_alpenrose.rs @@ -13,10 +13,12 @@ use diesel::SelectableHelper; use diesel_async::RunQueryDsl; use gtfs_rt::TripUpdate; use prost::Message; +use scc::HashMap as SccHashMap; use std::collections::BTreeMap; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; +use gtfs_rt::FeedMessage; const MAKE_VEHICLES_FEED_LIST: [&str; 9] = [ "f-mta~nyc~rt~subway~1~2~3~4~5~6~7", @@ -31,7 +33,8 @@ const MAKE_VEHICLES_FEED_LIST: [&str; 9] = [ ]; pub async fn new_rt_data( - authoritative_data_store: Arc>>, + authoritative_data_store: Arc>, + authoritative_gtfs_rt:Arc>, chateau_id: String, realtime_feed_id: String, vehicles: Option>, @@ -49,7 +52,7 @@ pub async fn new_rt_data( let conn_pre = conn_pool.get().await; let conn = &mut conn_pre.unwrap(); - let vehicle = match vehicles { + let vehicles_gtfs_rt = match vehicles { Some(v) => match parse_gtfs_rt_message(&v.as_slice()) { Ok(v) => Some(v), Err(e) => { @@ -60,7 +63,7 @@ pub async fn new_rt_data( None => None, }; - let trip = match trips { + let trips_gtfs_rt = match trips { Some(t) => match parse_gtfs_rt_message(&t.as_slice()) { Ok(t) => Some(t), Err(e) => { @@ -71,7 +74,7 @@ pub async fn new_rt_data( None => None, }; - let alert = match alerts { + let alerts_gtfs_rt = match alerts { Some(a) => match parse_gtfs_rt_message(&a.as_slice()) { Ok(a) => Some(a), Err(e) => { @@ -84,6 +87,23 @@ pub async fn new_rt_data( //get and update raw gtfs_rt data + if let Some(vehicles_gtfs_rt) = &vehicles_gtfs_rt { + authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::VehiclePositions)).and_modify(|gtfs_data| *gtfs_data = vehicles_gtfs_rt.clone()) + .or_insert(vehicles_gtfs_rt.clone()); + } + + if let Some(trip_gtfs_rt) = &trips_gtfs_rt { + authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::TripUpdates)) + .and_modify(|gtfs_data| *gtfs_data = trip_gtfs_rt.clone()) + .or_insert(trip_gtfs_rt.clone()); + } + + if let Some(alerts_gtfs_rt) = &alerts_gtfs_rt { + authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::Alerts)) + .and_modify(|gtfs_data| *gtfs_data = alerts_gtfs_rt.clone()) + .or_insert(alerts_gtfs_rt.clone()); + } + let this_chateau_dashmap = authoritative_data_store.get(&realtime_feed_id); //if this item is empty, create it @@ -97,46 +117,13 @@ pub async fn new_rt_data( impacted_routes_alerts: None, impacted_stops_alerts: None, impacted_routes_stops_alerts: None, - raw_gtfs_rt: BTreeMap::new(), }; - authoritative_data_store.insert(realtime_feed_id.clone(), RwLock::new(new_aspenised_data)); + let _ = authoritative_data_store.insert(realtime_feed_id.clone(), new_aspenised_data); } //now it exists! let this_chateau_dashmap = authoritative_data_store.get(&realtime_feed_id).unwrap(); - let mut this_chateau_lock = this_chateau_dashmap.write().await; - - let mutable_raw_gtfs_rt = this_chateau_lock - .raw_gtfs_rt - .get_mut(&realtime_feed_id.clone()); - - match mutable_raw_gtfs_rt { - Some(m) => { - if m.vehicle_positions.is_none() && vehicle.is_some() { - m.vehicle_positions = vehicle; - } - - if m.trip_updates.is_none() && trip.is_some() { - m.trip_updates = trip; - } - - if m.alerts.is_none() && alert.is_some() { - m.alerts = alert; - } - } - None => { - let mut new_gtfs_rt_data = catenary::aspen_dataset::GtfsRtDataStore { - vehicle_positions: vehicle, - trip_updates: trip, - alerts: alert, - }; - this_chateau_lock - .raw_gtfs_rt - .insert(realtime_feed_id.clone(), new_gtfs_rt_data); - } - } - // take all the gtfs rt data and merge it together let mut vehicle_positions: Vec = Vec::new(); @@ -145,6 +132,8 @@ pub async fn new_rt_data( let mut trip_updates_lookup_by_trip_id_to_trip_update_ids: HashMap> = HashMap::new(); + use catenary::schema::gtfs::static_download_attempts as static_download_attempts_pg_schema; + use catenary::schema::gtfs::chateaus as chateaus_pg_schema; use catenary::schema::gtfs::routes as routes_pg_schema; //get all routes inside chateau from postgres db @@ -155,19 +144,19 @@ pub async fn new_rt_data( .await .unwrap(); - for (realtime_feed_id, gtfs_dataset) in this_chateau_lock.raw_gtfs_rt.iter() { - if gtfs_dataset.vehicle_positions.is_some() { + //for (realtime_feed_id, gtfs_dataset) in this_chateau_lock.raw_gtfs_rt.iter() { + // if gtfs_dataset.vehicle_positions.is_some() { - //for trips, batch lookups by groups of 100 - //collect all common itinerary patterns and look those up + //for trips, batch lookups by groups of 100 + //collect all common itinerary patterns and look those up - //combine them together and insert them with the vehicles positions - } + //combine them together and insert them with the vehicles positions + // } - // trips can be left fairly raw for now, with a lot of data references + // trips can be left fairly raw for now, with a lot of data references - // ignore alerts for now, as well as trip modifications - } + // ignore alerts for now, as well as trip modifications + // } true } diff --git a/src/aspen/leader_thread.rs b/src/aspen/leader_thread.rs index c686f66a..607d4e9f 100644 --- a/src/aspen/leader_thread.rs +++ b/src/aspen/leader_thread.rs @@ -188,6 +188,11 @@ pub async fn aspen_leader_thread( for (index, (chateau_id, chateau)) in chateau_list_lock.chateaus.iter().enumerate() { + // in the future, this should be rewritten to prevent full reshuffling of the entire assignment list + // For example, taking only the orphened nodes and redistributing them + // if a new node is added, carefully reassign the data + + //for now this simply round robins the chateaus around let selected_aspen_worker_to_assign = workers_nodes_lock[index % workers_nodes_lock.len()].clone(); diff --git a/src/aspen/lib.rs b/src/aspen/lib.rs index 8237354a..c8e15db4 100644 --- a/src/aspen/lib.rs +++ b/src/aspen/lib.rs @@ -49,3 +49,19 @@ pub struct RealtimeFeedMetadataZookeeper { pub struct ChateausLeaderHashMap { pub chateaus: BTreeMap, } + +#[derive(Clone)] +pub struct ProcessAlpenroseData { + pub chateau_id: String, + pub realtime_feed_id: String, + pub vehicles: Option>, + pub trips: Option>, + pub alerts: Option>, + pub has_vehicles: bool, + pub has_trips: bool, + pub has_alerts: bool, + pub vehicles_response_code: Option, + pub trips_response_code: Option, + pub alerts_response_code: Option, + pub time_of_submission_ms: u64, +} diff --git a/src/aspen/main.rs b/src/aspen/main.rs index 48cbc639..fa9f9f87 100644 --- a/src/aspen/main.rs +++ b/src/aspen/main.rs @@ -24,18 +24,16 @@ clippy::iter_nth, clippy::iter_cloned_collect )] - use catenary::aspen::lib::*; use catenary::postgres_tools::make_async_pool; use clap::Parser; -use dashmap::DashMap; use futures::{future, prelude::*}; -use leapfrog::leapmap::LeapMap; use rand::{ distributions::{Distribution, Uniform}, thread_rng, }; use std::sync::Arc; +use std::thread; use std::{ net::{IpAddr, Ipv6Addr, SocketAddr}, time::Duration, @@ -51,9 +49,14 @@ use uuid::Uuid; mod leader_thread; use leader_thread::aspen_leader_thread; mod import_alpenrose; -use catenary::aspen_dataset::GtfsRtDataStore; +use catenary::aspen_dataset::GtfsRtType; use catenary::postgres_tools::CatenaryPostgresPool; use crossbeam::deque::{Injector, Steal}; +use gtfs_rt::FeedMessage; +use scc::HashMap as SccHashMap; +use futures::join; + +mod async_threads_alpenrose; // This is the type that implements the generated World trait. It is the business logic // and is used to start the server. @@ -62,28 +65,13 @@ pub struct AspenServer { pub addr: SocketAddr, pub this_tailscale_ip: IpAddr, pub worker_id: Arc, // Worker Id for this instance of Aspen - pub authoritative_data_store: - Arc>>, + pub authoritative_data_store: Arc>, + // Backed up in redis as well, program can be shut down and restarted without data loss + pub authoritative_gtfs_rt_store: Arc>, pub conn_pool: Arc, pub alpenrose_to_process_queue: Arc>, } -#[derive(Clone)] -struct ProcessAlpenroseData { - chateau_id: String, - realtime_feed_id: String, - vehicles: Option>, - trips: Option>, - alerts: Option>, - has_vehicles: bool, - has_trips: bool, - has_alerts: bool, - vehicles_response_code: Option, - trips_response_code: Option, - alerts_response_code: Option, - time_of_submission_ms: u64, -} - impl AspenRpc for AspenServer { async fn hello(self, _: context::Context, name: String) -> String { let sleep_time = @@ -108,22 +96,20 @@ impl AspenRpc for AspenServer { alerts_response_code: Option, time_of_submission_ms: u64, ) -> bool { - self.alpenrose_to_process_queue.push( - ProcessAlpenroseData { - chateau_id, - realtime_feed_id, - vehicles, - trips, - alerts, - has_vehicles, - has_trips, - has_alerts, - vehicles_response_code, - trips_response_code, - alerts_response_code, - time_of_submission_ms - } - ); + self.alpenrose_to_process_queue.push(ProcessAlpenroseData { + chateau_id, + realtime_feed_id, + vehicles, + trips, + alerts, + has_vehicles, + has_trips, + has_alerts, + vehicles_response_code, + trips_response_code, + alerts_response_code, + time_of_submission_ms, + }); true } } @@ -132,6 +118,12 @@ async fn spawn(fut: impl Future + Send + 'static) { tokio::spawn(fut); } +#[derive(Parser, Debug)] +struct Args { + alpenrosethreadcount: usize, + channels: usize, +} + #[tokio::main] async fn main() -> anyhow::Result<()> { // Worker Id for this instance of Aspen @@ -141,7 +133,7 @@ async fn main() -> anyhow::Result<()> { let conn_pool: CatenaryPostgresPool = make_async_pool().await.unwrap(); let arc_conn_pool: Arc = Arc::new(conn_pool); - // let flags = Flags::parse(); + let args = Args::parse(); //init_tracing("Tarpc Example Server")?; let tailscale_ip = catenary::tailscale::interface().expect("no tailscale interface found"); @@ -156,49 +148,69 @@ async fn main() -> anyhow::Result<()> { let chateau_list: Arc>> = Arc::new(Mutex::new(None)); let process_from_alpenrose_queue = Arc::new(Injector::::new()); + let raw_gtfs = Arc::new(SccHashMap::new()); + let authoritative_data_store = Arc::new(SccHashMap::new()); //run both the leader and the listener simultaniously - futures::join!( - { - let workers_nodes = Arc::clone(&workers_nodes); - let chateau_list = Arc::clone(&chateau_list); - let this_worker_id = Arc::clone(&this_worker_id); - let tailscale_ip = Arc::new(tailscale_ip); - let arc_conn_pool = Arc::clone(&arc_conn_pool); - async { - aspen_leader_thread( - workers_nodes, - chateau_list, - this_worker_id, - tailscale_ip, - arc_conn_pool, - ) - .await; - } - }, - listener - // Ignore accept errors. - .filter_map(|r| future::ready(r.ok())) - .map(server::BaseChannel::with_defaults) - // Limit channels to 1 per IP. - .max_channels_per_key(1, |t| t.transport().peer_addr().unwrap().ip()) - // serve is generated by the service attribute. It takes as input any type implementing - // the generated World trait. - .map(|channel| { - let server = AspenServer { - addr: channel.transport().peer_addr().unwrap(), - this_tailscale_ip: tailscale_ip, - worker_id: Arc::clone(&this_worker_id), - authoritative_data_store: Arc::new(DashMap::new()), - conn_pool: Arc::clone(&arc_conn_pool), - process_from_alpenrose_queue: Arc::clone(&process_from_alpenrose_queue) - }; - channel.execute(server.serve()).for_each(spawn) - }) - // Max n channels. - .buffer_unordered(32) - .for_each(|_| async {}) - ); + + let leader_thread_handler = thread::spawn({ + let workers_nodes = Arc::clone(&workers_nodes); + let chateau_list = Arc::clone(&chateau_list); + let this_worker_id = Arc::clone(&this_worker_id); + let tailscale_ip = Arc::new(tailscale_ip); + let arc_conn_pool = Arc::clone(&arc_conn_pool); + move || async { + aspen_leader_thread( + workers_nodes, + chateau_list, + this_worker_id, + tailscale_ip, + arc_conn_pool, + ) + .await; + } + }); + + let async_from_alpenrose_processor_handler = thread::spawn({ + let alpenrose_to_process_queue = Arc::clone(&process_from_alpenrose_queue); + let authoritative_gtfs_rt_store = Arc::clone(&raw_gtfs); + let authoritative_data_store = Arc::clone(&authoritative_data_store); + let conn_pool = Arc::clone(&arc_conn_pool); + let thread_count = args.alpenrosethreadcount.clone(); + move || async move { + async_threads_alpenrose::alpenrose_process_threads( + alpenrose_to_process_queue, + authoritative_gtfs_rt_store, + authoritative_data_store, + conn_pool, + thread_count, + ) + .await; + } + }); + + listener + // Ignore accept errors. + .filter_map(|r| future::ready(r.ok())) + .map(server::BaseChannel::with_defaults) + .map(|channel| { + let server = AspenServer { + addr: channel.transport().peer_addr().unwrap(), + this_tailscale_ip: tailscale_ip, + worker_id: Arc::clone(&this_worker_id), + authoritative_data_store: Arc::clone(&authoritative_data_store), + conn_pool: Arc::clone(&arc_conn_pool), + alpenrose_to_process_queue: Arc::clone(&process_from_alpenrose_queue), + authoritative_gtfs_rt_store: Arc::clone(&raw_gtfs), + }; + channel.execute(server.serve()).for_each(spawn) + }) + // Max n channels. + .buffer_unordered(args.channels) + .for_each(|_| async {}) + .await; + + join!(async_from_alpenrose_processor_handler.join().unwrap(), leader_thread_handler.join().unwrap()); Ok(()) } diff --git a/src/birch/server.rs b/src/birch/server.rs index 5f8c3e4f..f3af3fef 100644 --- a/src/birch/server.rs +++ b/src/birch/server.rs @@ -825,7 +825,7 @@ async fn chateaus( ), ); - let feature = geojson::Feature { + geojson::Feature { bbox: None, geometry: Some(geojson::Geometry { bbox: None, @@ -835,9 +835,7 @@ async fn chateaus( id: Some(geojson::feature::Id::String(chateau.chateau.clone())), properties: Some(properties), foreign_members: None, - }; - - feature + } }) .collect::>(); diff --git a/src/lib.rs b/src/lib.rs index 1d88196b..af9bc290 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -167,7 +167,6 @@ pub mod aspen_dataset { pub impacted_routes_alerts: Option>>, pub impacted_stops_alerts: Option>>, pub impacted_routes_stops_alerts: Option>>, - pub raw_gtfs_rt: BTreeMap, } pub struct AspenisedVehiclePosition { @@ -193,10 +192,11 @@ pub mod aspen_dataset { route_text_colour: Option, } - pub struct GtfsRtDataStore { - pub vehicle_positions: Option, - pub trip_updates: Option, - pub alerts: Option, + #[derive(Copy, Eq, Hash, PartialEq, Clone)] + pub enum GtfsRtType { + VehiclePositions, + TripUpdates, + Alerts, } } diff --git a/src/maple/refresh_metadata_tables.rs b/src/maple/refresh_metadata_tables.rs index 7ae0dfbe..fb2cf76d 100644 --- a/src/maple/refresh_metadata_tables.rs +++ b/src/maple/refresh_metadata_tables.rs @@ -126,9 +126,8 @@ pub async fn refresh_metadata_assignments( false => Some({ let mut merged_hull: geo::MultiPolygon = hulls_from_static_geo_types[0].clone().into(); - for i in 1..hulls_from_static_geo_types.len() { - merged_hull = - merged_hull.union(&hulls_from_static_geo_types[i].clone(), 10000.0); + for hull_subset in hulls_from_static_geo_types.iter().skip(1) { + merged_hull = merged_hull.union(&hull_subset.clone(), 10000.0); } merged_hull }), diff --git a/src/maple_syrup/mod.rs b/src/maple_syrup/mod.rs index b8a1a73e..f2466808 100644 --- a/src/maple_syrup/mod.rs +++ b/src/maple_syrup/mod.rs @@ -155,13 +155,10 @@ pub fn reduce(gtfs: >fs_structures::Gtfs) -> ResponseFromReduce { let itinerary_cover = ItineraryCover { stop_sequences: stop_diffs, - direction_id: match trip.direction_id { - Some(direction) => Some(match direction { - DirectionType::Outbound => false, - DirectionType::Inbound => true, - }), - None => None, - }, + direction_id: trip.direction_id.map(|direction| match direction { + DirectionType::Outbound => false, + DirectionType::Inbound => true, + }), route_id: trip.route_id.clone(), trip_headsign: trip.trip_headsign.clone(), timezone: timezone,