Multithreaded system for aspen setup
kylerchin committed Apr 23, 2024
1 parent baf04de commit bfac42c
Showing 11 changed files with 240 additions and 149 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -98,7 +98,7 @@ tilejson = "0.4.1"
tile-grid = "=0.2.2"
zstd-safe = "7.1.0"
hitbox = "0.1.1"
cached = "0.49.2"
cached = { version = "0.49.2", features = ["async_tokio_rt_multi_thread", "disk_store"] }
geo-booleanop = "0.3.2"
geo-clipper = "0.8.0"
random-string = "1.1.0"
@@ -113,6 +113,7 @@ ipnetwork = "0.20.0"
pnet = "0.34.0"
crossbeam = "0.8.4"
leapfrog = { version = "0.3.0", features = ["stable_alloc"] }
scc = "2.1.0"

[[bin]]
name = "maple"
3 changes: 1 addition & 2 deletions src/alpenrose/main.rs
@@ -401,8 +401,7 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {

if let Some(last_updated_assignment_time) = last_updated_assignment_time_zk_fetch {
let last_updated_assignment_time =
bincode::deserialize(&last_updated_assignment_time.0)
.unwrap_or_else(|_| None::<u64>);
bincode::deserialize(&last_updated_assignment_time.0).unwrap_or(None::<u64>);

//is the time newer than the last time we updated the assignments for this worker node?
if last_updated_assignment_time != last_updated_ms_for_this_worker {
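The change above swaps `unwrap_or_else(|_| None::<u64>)` for the simpler `unwrap_or(None::<u64>)`. A minimal sketch of the round trip this check relies on, assuming bincode 1.x and purely illustrative values (not code from this commit):

fn decode_assignment_time(raw: &[u8]) -> Option<u64> {
    // A decode failure is treated the same as "no timestamp recorded yet".
    bincode::deserialize::<Option<u64>>(raw).unwrap_or(None)
}

fn main() {
    // Hypothetical epoch-milliseconds value stored in ZooKeeper.
    let stored: Option<u64> = Some(1_713_900_000_000);
    let bytes = bincode::serialize(&stored).unwrap();

    let last_updated_assignment_time = decode_assignment_time(&bytes);
    let last_updated_ms_for_this_worker: Option<u64> = None;

    // Same comparison as above: only refetch assignments when the stored
    // timestamp differs from what this worker last applied.
    if last_updated_assignment_time != last_updated_ms_for_this_worker {
        println!("assignments changed, refetching");
    }
}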
75 changes: 75 additions & 0 deletions src/aspen/async_threads_alpenrose.rs
@@ -0,0 +1,75 @@
use catenary::aspen::lib::*;
use catenary::aspen_dataset::GtfsRtType;
use catenary::postgres_tools::CatenaryPostgresPool;
use crossbeam::deque::{Injector, Steal};
use gtfs_rt::FeedMessage;
use scc::HashMap as SccHashMap;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use crate::import_alpenrose::new_rt_data;

pub async fn alpenrose_process_threads(
alpenrose_to_process_queue: Arc<Injector<ProcessAlpenroseData>>,
authoritative_gtfs_rt_store: Arc<SccHashMap<(String, GtfsRtType), FeedMessage>>,
authoritative_data_store: Arc<SccHashMap<String, catenary::aspen_dataset::AspenisedData>>,
conn_pool: Arc<CatenaryPostgresPool>,
alpenrosethreadcount: usize,
) {
let mut handler_vec: Vec<thread::JoinHandle<_>> = vec![];

for i in 0..alpenrosethreadcount {
handler_vec.push(thread::spawn({
let alpenrose_to_process_queue = Arc::clone(&alpenrose_to_process_queue);
let authoritative_gtfs_rt_store = Arc::clone(&authoritative_gtfs_rt_store);
let authoritative_data_store = Arc::clone(&authoritative_data_store);
let conn_pool = Arc::clone(&conn_pool);
move || async move {
println!("Starting alpenrose reader thread {}", i);
alpenrose_loop_process_thread(
alpenrose_to_process_queue,
authoritative_gtfs_rt_store,
authoritative_data_store,
conn_pool,
)
.await
}
}));
}

for handle in handler_vec.into_iter() {
handle.join().unwrap().await;
}
}

pub async fn alpenrose_loop_process_thread(
alpenrose_to_process_queue: Arc<Injector<ProcessAlpenroseData>>,
authoritative_gtfs_rt_store: Arc<SccHashMap<(String, GtfsRtType), FeedMessage>>,
authoritative_data_store: Arc<SccHashMap<String, catenary::aspen_dataset::AspenisedData>>,
conn_pool: Arc<CatenaryPostgresPool>,
) {
loop {
if let Steal::Success(new_ingest_task) = alpenrose_to_process_queue.steal() {
new_rt_data(
Arc::clone(&authoritative_data_store),
Arc::clone(&authoritative_gtfs_rt_store),
new_ingest_task.chateau_id,
new_ingest_task.realtime_feed_id,
new_ingest_task.vehicles,
new_ingest_task.trips,
new_ingest_task.alerts,
new_ingest_task.has_vehicles,
new_ingest_task.has_trips,
new_ingest_task.has_alerts,
new_ingest_task.vehicles_response_code,
new_ingest_task.trips_response_code,
new_ingest_task.alerts_response_code,
Arc::clone(&conn_pool),
)
.await;
} else {
thread::sleep(Duration::from_millis(1))
}
}
}
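For context, a hedged sketch of the producer side of this queue (the helper name and call site are assumptions, not part of this commit): whatever receives data from Alpenrose pushes a ProcessAlpenroseData task onto the shared Injector, and one of the reader threads above steals and processes it.

use std::sync::Arc;

use catenary::aspen::lib::ProcessAlpenroseData;
use crossbeam::deque::Injector;

// Illustrative producer: hand a task to the processing threads.
fn enqueue_from_alpenrose(
    queue: &Arc<Injector<ProcessAlpenroseData>>,
    task: ProcessAlpenroseData,
) {
    // Injector::push is safe to call concurrently from many producers.
    queue.push(task);
}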
83 changes: 36 additions & 47 deletions src/aspen/import_alpenrose.rs
@@ -13,10 +13,12 @@ use diesel::SelectableHelper;
use diesel_async::RunQueryDsl;
use gtfs_rt::TripUpdate;
use prost::Message;
use scc::HashMap as SccHashMap;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use gtfs_rt::FeedMessage;

const MAKE_VEHICLES_FEED_LIST: [&str; 9] = [
"f-mta~nyc~rt~subway~1~2~3~4~5~6~7",
@@ -31,7 +33,8 @@ const MAKE_VEHICLES_FEED_LIST: [&str; 9] = [
];

pub async fn new_rt_data(
authoritative_data_store: Arc<DashMap<String, RwLock<catenary::aspen_dataset::AspenisedData>>>,
authoritative_data_store: Arc<SccHashMap<String, catenary::aspen_dataset::AspenisedData>>,
authoritative_gtfs_rt: Arc<SccHashMap<(String, GtfsRtType), FeedMessage>>,
chateau_id: String,
realtime_feed_id: String,
vehicles: Option<Vec<u8>>,
@@ -49,7 +52,7 @@ pub async fn new_rt_data(
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre.unwrap();

let vehicle = match vehicles {
let vehicles_gtfs_rt = match vehicles {
Some(v) => match parse_gtfs_rt_message(&v.as_slice()) {
Ok(v) => Some(v),
Err(e) => {
@@ -60,7 +63,7 @@
None => None,
};

let trip = match trips {
let trips_gtfs_rt = match trips {
Some(t) => match parse_gtfs_rt_message(&t.as_slice()) {
Ok(t) => Some(t),
Err(e) => {
@@ -71,7 +74,7 @@
None => None,
};

let alert = match alerts {
let alerts_gtfs_rt = match alerts {
Some(a) => match parse_gtfs_rt_message(&a.as_slice()) {
Ok(a) => Some(a),
Err(e) => {
@@ -84,6 +87,23 @@

//get and update raw gtfs_rt data

if let Some(vehicles_gtfs_rt) = &vehicles_gtfs_rt {
authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::VehiclePositions))
.and_modify(|gtfs_data| *gtfs_data = vehicles_gtfs_rt.clone())
.or_insert(vehicles_gtfs_rt.clone());
}

if let Some(trip_gtfs_rt) = &trips_gtfs_rt {
authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::TripUpdates))
.and_modify(|gtfs_data| *gtfs_data = trip_gtfs_rt.clone())
.or_insert(trip_gtfs_rt.clone());
}

if let Some(alerts_gtfs_rt) = &alerts_gtfs_rt {
authoritative_gtfs_rt.entry((realtime_feed_id.clone(), GtfsRtType::Alerts))
.and_modify(|gtfs_data| *gtfs_data = alerts_gtfs_rt.clone())
.or_insert(alerts_gtfs_rt.clone());
}

let this_chateau_dashmap = authoritative_data_store.get(&realtime_feed_id);

//if this item is empty, create it
@@ -97,46 +117,13 @@
impacted_routes_alerts: None,
impacted_stops_alerts: None,
impacted_routes_stops_alerts: None,
raw_gtfs_rt: BTreeMap::new(),
};
authoritative_data_store.insert(realtime_feed_id.clone(), RwLock::new(new_aspenised_data));
let _ = authoritative_data_store.insert(realtime_feed_id.clone(), new_aspenised_data);
}

//now it exists!
let this_chateau_dashmap = authoritative_data_store.get(&realtime_feed_id).unwrap();

let mut this_chateau_lock = this_chateau_dashmap.write().await;

let mutable_raw_gtfs_rt = this_chateau_lock
.raw_gtfs_rt
.get_mut(&realtime_feed_id.clone());

match mutable_raw_gtfs_rt {
Some(m) => {
if m.vehicle_positions.is_none() && vehicle.is_some() {
m.vehicle_positions = vehicle;
}

if m.trip_updates.is_none() && trip.is_some() {
m.trip_updates = trip;
}

if m.alerts.is_none() && alert.is_some() {
m.alerts = alert;
}
}
None => {
let mut new_gtfs_rt_data = catenary::aspen_dataset::GtfsRtDataStore {
vehicle_positions: vehicle,
trip_updates: trip,
alerts: alert,
};
this_chateau_lock
.raw_gtfs_rt
.insert(realtime_feed_id.clone(), new_gtfs_rt_data);
}
}

// take all the gtfs rt data and merge it together

let mut vehicle_positions: Vec<AspenisedVehiclePosition> = Vec::new();
@@ -145,6 +132,8 @@
let mut trip_updates_lookup_by_trip_id_to_trip_update_ids: HashMap<String, Vec<String>> =
HashMap::new();

use catenary::schema::gtfs::static_download_attempts as static_download_attempts_pg_schema;
use catenary::schema::gtfs::chateaus as chateaus_pg_schema;
use catenary::schema::gtfs::routes as routes_pg_schema;

//get all routes inside chateau from postgres db
@@ -155,19 +144,19 @@
.await
.unwrap();

for (realtime_feed_id, gtfs_dataset) in this_chateau_lock.raw_gtfs_rt.iter() {
if gtfs_dataset.vehicle_positions.is_some() {
//for (realtime_feed_id, gtfs_dataset) in this_chateau_lock.raw_gtfs_rt.iter() {
// if gtfs_dataset.vehicle_positions.is_some() {

//for trips, batch lookups by groups of 100
//collect all common itinerary patterns and look those up
//for trips, batch lookups by groups of 100
//collect all common itinerary patterns and look those up

//combine them together and insert them with the vehicles positions
}
//combine them together and insert them with the vehicles positions
// }

// trips can be left fairly raw for now, with a lot of data references
// trips can be left fairly raw for now, with a lot of data references

// ignore alerts for now, as well as trip modifications
}
// ignore alerts for now, as well as trip modifications
// }

true
}
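The three entry(...).and_modify(...).or_insert(...) blocks above all perform the same upsert into the shared GTFS-RT store. A small sketch of that pattern in isolation, assuming the same key and value types as in this commit (the helper name is illustrative):

use catenary::aspen_dataset::GtfsRtType;
use gtfs_rt::FeedMessage;
use scc::HashMap as SccHashMap;

// Overwrite the stored FeedMessage for this (feed, type) key, or insert it if absent.
fn store_feed(
    store: &SccHashMap<(String, GtfsRtType), FeedMessage>,
    realtime_feed_id: &str,
    gtfs_rt_type: GtfsRtType,
    feed: &FeedMessage,
) {
    store
        .entry((realtime_feed_id.to_string(), gtfs_rt_type))
        .and_modify(|stored| *stored = feed.clone())
        .or_insert(feed.clone());
}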
5 changes: 5 additions & 0 deletions src/aspen/leader_thread.rs
@@ -188,6 +188,11 @@ pub async fn aspen_leader_thread(
for (index, (chateau_id, chateau)) in
chateau_list_lock.chateaus.iter().enumerate()
{
// In the future, this should be rewritten to prevent full reshuffling of the entire assignment list.
// For example, take only the orphaned nodes and redistribute them,
// and if a new node is added, carefully reassign the data.

// For now this simply round-robins the chateaus around.
let selected_aspen_worker_to_assign =
workers_nodes_lock[index % workers_nodes_lock.len()].clone();

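The comments above note that full reshuffling should eventually be avoided. One standard approach (a hedged sketch, not what this commit implements) is rendezvous hashing: each chateau goes to the worker with the highest combined hash score, so adding or removing a node only moves the chateaus that must move.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Pick the worker with the highest (chateau, worker) hash; assignments stay
// stable under worker joins and leaves except where a move is unavoidable.
fn assign_worker<'a>(chateau_id: &str, workers: &'a [String]) -> Option<&'a String> {
    workers.iter().max_by_key(|worker| {
        let mut hasher = DefaultHasher::new();
        chateau_id.hash(&mut hasher);
        worker.hash(&mut hasher);
        hasher.finish()
    })
}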
16 changes: 16 additions & 0 deletions src/aspen/lib.rs
@@ -49,3 +49,19 @@ pub struct RealtimeFeedMetadataZookeeper {
pub struct ChateausLeaderHashMap {
pub chateaus: BTreeMap<String, ChateauDataNoGeometry>,
}

#[derive(Clone)]
pub struct ProcessAlpenroseData {
pub chateau_id: String,
pub realtime_feed_id: String,
pub vehicles: Option<Vec<u8>>,
pub trips: Option<Vec<u8>>,
pub alerts: Option<Vec<u8>>,
pub has_vehicles: bool,
pub has_trips: bool,
pub has_alerts: bool,
pub vehicles_response_code: Option<u16>,
pub trips_response_code: Option<u16>,
pub alerts_response_code: Option<u16>,
pub time_of_submission_ms: u64,
}
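Illustrative only (field values are placeholders, not from this commit): how a producer might fill in this payload before handing it to the processing queue.

use catenary::aspen::lib::ProcessAlpenroseData;

fn example_task(vehicles_pb: Option<Vec<u8>>, now_ms: u64) -> ProcessAlpenroseData {
    ProcessAlpenroseData {
        chateau_id: "example~chateau".to_string(),
        realtime_feed_id: "f-example~rt".to_string(),
        has_vehicles: vehicles_pb.is_some(),
        vehicles: vehicles_pb,
        trips: None,
        alerts: None,
        has_trips: false,
        has_alerts: false,
        vehicles_response_code: Some(200),
        trips_response_code: None,
        alerts_response_code: None,
        time_of_submission_ms: now_ms,
    }
}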
