From 4d7b10fba0ddf6d4d446cfeb015d3da250c559b3 Mon Sep 17 00:00:00 2001 From: Kyler Chin <7539174+kylerchin@users.noreply.github.com> Date: Sat, 30 Nov 2024 18:02:10 -0800 Subject: [PATCH] finish speed up itinerary local search --- src/birch/nearby_departures.rs | 130 ++++++++++++++++++++------------- 1 file changed, 79 insertions(+), 51 deletions(-) diff --git a/src/birch/nearby_departures.rs b/src/birch/nearby_departures.rs index e776f6f..ce1a729 100644 --- a/src/birch/nearby_departures.rs +++ b/src/birch/nearby_departures.rs @@ -19,7 +19,6 @@ use catenary::make_weekdays; use catenary::maple_syrup::DirectionPattern; use catenary::models::DirectionPatternRow; use catenary::models::ItineraryPatternMeta; -use catenary::models::ItineraryPatternRowNearbyLookup; use catenary::models::{CompressedTrip, ItineraryPatternRow}; use catenary::postgres_tools::CatenaryPostgresPool; use catenary::schema::gtfs::itinerary_pattern; @@ -52,6 +51,24 @@ use std::sync::Arc; use std::time::Instant; use strumbra::UniqueString; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ItineraryPatternRowMerge { + pub onestop_feed_id: String, + pub attempt_id: String, + pub itinerary_pattern_id: String, + pub arrival_time_since_start: Option, + pub departure_time_since_start: Option, + pub interpolated_time_since_start: Option, + pub stop_id: CompactString, + pub chateau: CompactString, + pub gtfs_stop_sequence: u32, + pub direction_pattern_id: String, + pub trip_headsign: Option, + pub trip_headsign_translations: Option, + pub timezone: String, + pub route_id: CompactString +} + #[derive(Deserialize, Clone, Debug)] struct NearbyFromCoords { lat: f64, @@ -70,7 +87,8 @@ struct DeparturesFromStop { struct DeparturesDebug { stop_lookup_ms: u128, directions_ms: u128, - itineraries_ms: u128, + itinerary_meta_ms: u128, + itinerary_row_ms: u128, trips_ms: u128, total_time_ms: u128, } @@ -119,7 +137,7 @@ pub struct ValidTripSet { pub trip_id: CompactString, pub frequencies: Option>, pub trip_service_date: chrono::NaiveDate, - pub itinerary_options: Vec, + pub itinerary_options: Vec, pub reference_start_of_service_date: chrono::DateTime, pub itinerary_pattern_id: String, pub direction_pattern_id: String, @@ -533,51 +551,33 @@ pub async fn nearby_from_coords( //lock the table to prevent further changes let itineraries_and_seq_to_lookup = itineraries_and_seq_to_lookup; + let mut itineraries_and_seq_to_lookup_join = vec![]; + + for (chateau, set_of_directions) in itineraries_and_seq_to_lookup.iter() { + for (itinerary_pattern_id, stop_sequence) in set_of_directions.iter() { + itineraries_and_seq_to_lookup_join.push(( + chateau.clone(), + itinerary_pattern_id.clone(), + *stop_sequence, + )); + } + } + println!("Starting to search for itineraries"); let itineraries_timer = Instant::now(); - let seek_for_itineraries_list = futures::stream::iter(hashmap_of_directions_lookup.into_iter().map( - |(chateau, set_of_directions)| - { - let formatted_ask = format!( - "({})", - set_of_directions - .into_iter() - .map(|x| format!("('{}',{})" , x.0, x.1)) - .collect::>() - .join(",") - ); - - //EXTREMELY SLOW QUERY - diesel::sql_query( - format!( - "SELECT - itinerary_pattern.onestop_feed_id, - itinerary_pattern.attempt_id, - itinerary_pattern.itinerary_pattern_id, - itinerary_pattern.stop_sequence, - itinerary_pattern.arrival_time_since_start, - itinerary_pattern.departure_time_since_start, - itinerary_pattern.interpolated_time_since_start, - itinerary_pattern.stop_id, - itinerary_pattern.chateau, - itinerary_pattern.gtfs_stop_sequence, - itinerary_pattern_meta.direction_pattern_id, - itinerary_pattern_meta.trip_headsign, - itinerary_pattern_meta.trip_headsign_translations, - itinerary_pattern_meta.timezone, - itinerary_pattern_meta.route_id - FROM gtfs.itinerary_pattern JOIN - gtfs.itinerary_pattern_meta ON - itinerary_pattern_meta.itinerary_pattern_id = itinerary_pattern.itinerary_pattern_id - AND itinerary_pattern_meta.chateau = '{chateau}' - AND itinerary_pattern.chateau = '{chateau}' AND - itinerary_pattern.onestop_feed_id = itinerary_pattern_meta.onestop_feed_id - AND - (itinerary_pattern_meta.direction_pattern_id, itinerary_pattern.stop_sequence) IN {}",formatted_ask)).get_results(conn) + let new_seek_itinerary_list = futures::stream::iter(itineraries_and_seq_to_lookup_join.into_iter() + .map( + |(chateau, itin_id, stop_seq)| { + catenary::schema::gtfs::itinerary_pattern::dsl::itinerary_pattern + .filter(catenary::schema::gtfs::itinerary_pattern::dsl::chateau.eq(chateau.clone())) + .filter(catenary::schema::gtfs::itinerary_pattern::dsl::itinerary_pattern_id.eq(itin_id.clone())) + .filter(catenary::schema::gtfs::itinerary_pattern::dsl::stop_sequence.eq(stop_seq as i32)) + .select(catenary::models::ItineraryPatternRow::as_select()) + .load::(conn) } - )).buffer_unordered(8).collect::>>>().await; + )).buffer_unordered(24).collect::>>>().await; println!( "Finished getting itineraries in {:?}", @@ -590,10 +590,11 @@ pub async fn nearby_from_coords( let mut itins_per_chateau: HashMap> = HashMap::new(); - let mut itinerary_table: HashMap<(String, String), Vec> = + //(chateau, itinerary_pattern_id) -> ItineraryPatternRowMerge + let mut itinerary_table: HashMap<(String, String), Vec> = HashMap::new(); - for seek_for_itineraries in seek_for_itineraries_list { + for seek_for_itineraries in new_seek_itinerary_list { match seek_for_itineraries { Ok(itineraries) => { for itinerary in itineraries { @@ -606,17 +607,43 @@ pub async fn nearby_from_coords( } } + let itin_meta_ref = itin_meta_table + .get(&itinerary.chateau) + .unwrap() + .get(&itinerary.itinerary_pattern_id) + .unwrap(); + + if let Some(itinerary_direction_id) = &itin_meta_ref.direction_pattern_id { + + let new_itinerary = ItineraryPatternRowMerge { + onestop_feed_id: itinerary.onestop_feed_id.clone(), + attempt_id: itinerary.attempt_id.clone(), + itinerary_pattern_id: itinerary.itinerary_pattern_id.clone(), + arrival_time_since_start: itinerary.arrival_time_since_start, + departure_time_since_start: itinerary.departure_time_since_start, + interpolated_time_since_start: itinerary.interpolated_time_since_start, + stop_id: itinerary.stop_id.clone(), + chateau: itinerary.chateau.clone().into(), + gtfs_stop_sequence: itinerary.gtfs_stop_sequence, + direction_pattern_id: itinerary_direction_id.clone(), + trip_headsign: itin_meta_ref.trip_headsign.clone(), + trip_headsign_translations: itin_meta_ref.trip_headsign_translations.clone(), + timezone: itin_meta_ref.timezone.clone(), + route_id: itin_meta_ref.route_id.clone() + }; + match itinerary_table.entry(( itinerary.chateau.clone(), itinerary.itinerary_pattern_id.clone(), )) { Entry::Occupied(mut oe) => { - oe.get_mut().push(itinerary); + oe.get_mut().push(new_itinerary); } Entry::Vacant(mut ve) => { - ve.insert(vec![itinerary]); + ve.insert(vec![new_itinerary]); } } + } } } Err(err) => { @@ -831,7 +858,7 @@ pub async fn nearby_from_coords( .get(&(trip.chateau.clone(), trip.itinerary_pattern_id.clone())) .unwrap(); - let itin_ref: ItineraryPatternRowNearbyLookup = this_itin_list[0].clone(); + let itin_ref: ItineraryPatternRowMerge = this_itin_list[0].clone(); let time_since_start = match itin_ref.departure_time_since_start { Some(departure_time_since_start) => departure_time_since_start, @@ -879,7 +906,7 @@ pub async fn nearby_from_coords( reference_start_of_service_date: date.1, itinerary_pattern_id: itin_ref.itinerary_pattern_id.clone(), direction_pattern_id: itin_ref.direction_pattern_id.clone(), - route_id: (&itin_ref.route_id).into(), + route_id: itin_ref.route_id.clone(), trip_start_time: trip.start_time, trip_short_name: trip.trip_short_name.clone(), }; @@ -1072,7 +1099,7 @@ pub async fn nearby_from_coords( departure_realtime: departure_time_rt, arrival_schedule: None, arrival_realtime: None, - stop_id: (&trip.itinerary_options[0].stop_id).into(), + stop_id: trip.itinerary_options[0].stop_id.clone(), trip_short_name: trip.trip_short_name.clone(), tz: trip.timezone.as_ref().unwrap().name().to_string(), is_frequency: trip.frequencies.is_some(), @@ -1172,7 +1199,8 @@ pub async fn nearby_from_coords( debug: DeparturesDebug { stop_lookup_ms: end_stops_duration.as_millis(), directions_ms: directions_lookup_duration.as_millis(), - itineraries_ms: itinerary_duration.as_millis(), + itinerary_meta_ms: itin_meta_timer.elapsed().as_millis(), + itinerary_row_ms: itinerary_duration.as_millis(), trips_ms: trip_lookup_elapsed.as_millis(), total_time_ms: total_elapsed_time.as_millis(), },