Skip to content

Commit

Permalink
finish speed up itinerary local search
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Dec 1, 2024
1 parent 9c6b713 commit 4d7b10f
Showing 1 changed file with 79 additions and 51 deletions.
130 changes: 79 additions & 51 deletions src/birch/nearby_departures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use catenary::make_weekdays;
use catenary::maple_syrup::DirectionPattern;
use catenary::models::DirectionPatternRow;
use catenary::models::ItineraryPatternMeta;
use catenary::models::ItineraryPatternRowNearbyLookup;
use catenary::models::{CompressedTrip, ItineraryPatternRow};
use catenary::postgres_tools::CatenaryPostgresPool;
use catenary::schema::gtfs::itinerary_pattern;
Expand Down Expand Up @@ -52,6 +51,24 @@ use std::sync::Arc;
use std::time::Instant;
use strumbra::UniqueString;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ItineraryPatternRowMerge {
pub onestop_feed_id: String,
pub attempt_id: String,
pub itinerary_pattern_id: String,
pub arrival_time_since_start: Option<i32>,
pub departure_time_since_start: Option<i32>,
pub interpolated_time_since_start: Option<i32>,
pub stop_id: CompactString,
pub chateau: CompactString,
pub gtfs_stop_sequence: u32,
pub direction_pattern_id: String,
pub trip_headsign: Option<String>,
pub trip_headsign_translations: Option<serde_json::Value>,
pub timezone: String,
pub route_id: CompactString
}

#[derive(Deserialize, Clone, Debug)]
struct NearbyFromCoords {
lat: f64,
Expand All @@ -70,7 +87,8 @@ struct DeparturesFromStop {
struct DeparturesDebug {
stop_lookup_ms: u128,
directions_ms: u128,
itineraries_ms: u128,
itinerary_meta_ms: u128,
itinerary_row_ms: u128,
trips_ms: u128,
total_time_ms: u128,
}
Expand Down Expand Up @@ -119,7 +137,7 @@ pub struct ValidTripSet {
pub trip_id: CompactString,
pub frequencies: Option<Vec<gtfs_structures::Frequency>>,
pub trip_service_date: chrono::NaiveDate,
pub itinerary_options: Vec<ItineraryPatternRowNearbyLookup>,
pub itinerary_options: Vec<ItineraryPatternRowMerge>,
pub reference_start_of_service_date: chrono::DateTime<chrono_tz::Tz>,
pub itinerary_pattern_id: String,
pub direction_pattern_id: String,
Expand Down Expand Up @@ -533,51 +551,33 @@ pub async fn nearby_from_coords(
//lock the table to prevent further changes
let itineraries_and_seq_to_lookup = itineraries_and_seq_to_lookup;

let mut itineraries_and_seq_to_lookup_join = vec![];

for (chateau, set_of_directions) in itineraries_and_seq_to_lookup.iter() {
for (itinerary_pattern_id, stop_sequence) in set_of_directions.iter() {
itineraries_and_seq_to_lookup_join.push((
chateau.clone(),
itinerary_pattern_id.clone(),
*stop_sequence,
));
}
}

println!("Starting to search for itineraries");

let itineraries_timer = Instant::now();

let seek_for_itineraries_list = futures::stream::iter(hashmap_of_directions_lookup.into_iter().map(
|(chateau, set_of_directions)|
{
let formatted_ask = format!(
"({})",
set_of_directions
.into_iter()
.map(|x| format!("('{}',{})" , x.0, x.1))
.collect::<Vec<String>>()
.join(",")
);

//EXTREMELY SLOW QUERY
diesel::sql_query(
format!(
"SELECT
itinerary_pattern.onestop_feed_id,
itinerary_pattern.attempt_id,
itinerary_pattern.itinerary_pattern_id,
itinerary_pattern.stop_sequence,
itinerary_pattern.arrival_time_since_start,
itinerary_pattern.departure_time_since_start,
itinerary_pattern.interpolated_time_since_start,
itinerary_pattern.stop_id,
itinerary_pattern.chateau,
itinerary_pattern.gtfs_stop_sequence,
itinerary_pattern_meta.direction_pattern_id,
itinerary_pattern_meta.trip_headsign,
itinerary_pattern_meta.trip_headsign_translations,
itinerary_pattern_meta.timezone,
itinerary_pattern_meta.route_id
FROM gtfs.itinerary_pattern JOIN
gtfs.itinerary_pattern_meta ON
itinerary_pattern_meta.itinerary_pattern_id = itinerary_pattern.itinerary_pattern_id
AND itinerary_pattern_meta.chateau = '{chateau}'
AND itinerary_pattern.chateau = '{chateau}' AND
itinerary_pattern.onestop_feed_id = itinerary_pattern_meta.onestop_feed_id
AND
(itinerary_pattern_meta.direction_pattern_id, itinerary_pattern.stop_sequence) IN {}",formatted_ask)).get_results(conn)
let new_seek_itinerary_list = futures::stream::iter(itineraries_and_seq_to_lookup_join.into_iter()
.map(
|(chateau, itin_id, stop_seq)| {
catenary::schema::gtfs::itinerary_pattern::dsl::itinerary_pattern
.filter(catenary::schema::gtfs::itinerary_pattern::dsl::chateau.eq(chateau.clone()))
.filter(catenary::schema::gtfs::itinerary_pattern::dsl::itinerary_pattern_id.eq(itin_id.clone()))
.filter(catenary::schema::gtfs::itinerary_pattern::dsl::stop_sequence.eq(stop_seq as i32))
.select(catenary::models::ItineraryPatternRow::as_select())
.load::<catenary::models::ItineraryPatternRow>(conn)
}
)).buffer_unordered(8).collect::<Vec<diesel::QueryResult<Vec<ItineraryPatternRowNearbyLookup>>>>().await;
)).buffer_unordered(24).collect::<Vec<diesel::QueryResult<Vec<ItineraryPatternRow>>>>().await;

println!(
"Finished getting itineraries in {:?}",
Expand All @@ -590,10 +590,11 @@ pub async fn nearby_from_coords(

let mut itins_per_chateau: HashMap<String, HashSet<String>> = HashMap::new();

let mut itinerary_table: HashMap<(String, String), Vec<ItineraryPatternRowNearbyLookup>> =
//(chateau, itinerary_pattern_id) -> ItineraryPatternRowMerge
let mut itinerary_table: HashMap<(String, String), Vec<ItineraryPatternRowMerge>> =
HashMap::new();

for seek_for_itineraries in seek_for_itineraries_list {
for seek_for_itineraries in new_seek_itinerary_list {
match seek_for_itineraries {
Ok(itineraries) => {
for itinerary in itineraries {
Expand All @@ -606,17 +607,43 @@ pub async fn nearby_from_coords(
}
}

let itin_meta_ref = itin_meta_table
.get(&itinerary.chateau)
.unwrap()
.get(&itinerary.itinerary_pattern_id)
.unwrap();

if let Some(itinerary_direction_id) = &itin_meta_ref.direction_pattern_id {

let new_itinerary = ItineraryPatternRowMerge {
onestop_feed_id: itinerary.onestop_feed_id.clone(),
attempt_id: itinerary.attempt_id.clone(),
itinerary_pattern_id: itinerary.itinerary_pattern_id.clone(),
arrival_time_since_start: itinerary.arrival_time_since_start,
departure_time_since_start: itinerary.departure_time_since_start,
interpolated_time_since_start: itinerary.interpolated_time_since_start,
stop_id: itinerary.stop_id.clone(),
chateau: itinerary.chateau.clone().into(),
gtfs_stop_sequence: itinerary.gtfs_stop_sequence,
direction_pattern_id: itinerary_direction_id.clone(),
trip_headsign: itin_meta_ref.trip_headsign.clone(),
trip_headsign_translations: itin_meta_ref.trip_headsign_translations.clone(),
timezone: itin_meta_ref.timezone.clone(),
route_id: itin_meta_ref.route_id.clone()
};

match itinerary_table.entry((
itinerary.chateau.clone(),
itinerary.itinerary_pattern_id.clone(),
)) {
Entry::Occupied(mut oe) => {
oe.get_mut().push(itinerary);
oe.get_mut().push(new_itinerary);
}
Entry::Vacant(mut ve) => {
ve.insert(vec![itinerary]);
ve.insert(vec![new_itinerary]);
}
}
}
}
}
Err(err) => {
Expand Down Expand Up @@ -831,7 +858,7 @@ pub async fn nearby_from_coords(
.get(&(trip.chateau.clone(), trip.itinerary_pattern_id.clone()))
.unwrap();

let itin_ref: ItineraryPatternRowNearbyLookup = this_itin_list[0].clone();
let itin_ref: ItineraryPatternRowMerge = this_itin_list[0].clone();

let time_since_start = match itin_ref.departure_time_since_start {
Some(departure_time_since_start) => departure_time_since_start,
Expand Down Expand Up @@ -879,7 +906,7 @@ pub async fn nearby_from_coords(
reference_start_of_service_date: date.1,
itinerary_pattern_id: itin_ref.itinerary_pattern_id.clone(),
direction_pattern_id: itin_ref.direction_pattern_id.clone(),
route_id: (&itin_ref.route_id).into(),
route_id: itin_ref.route_id.clone(),
trip_start_time: trip.start_time,
trip_short_name: trip.trip_short_name.clone(),
};
Expand Down Expand Up @@ -1072,7 +1099,7 @@ pub async fn nearby_from_coords(
departure_realtime: departure_time_rt,
arrival_schedule: None,
arrival_realtime: None,
stop_id: (&trip.itinerary_options[0].stop_id).into(),
stop_id: trip.itinerary_options[0].stop_id.clone(),
trip_short_name: trip.trip_short_name.clone(),
tz: trip.timezone.as_ref().unwrap().name().to_string(),
is_frequency: trip.frequencies.is_some(),
Expand Down Expand Up @@ -1172,7 +1199,8 @@ pub async fn nearby_from_coords(
debug: DeparturesDebug {
stop_lookup_ms: end_stops_duration.as_millis(),
directions_ms: directions_lookup_duration.as_millis(),
itineraries_ms: itinerary_duration.as_millis(),
itinerary_meta_ms: itin_meta_timer.elapsed().as_millis(),
itinerary_row_ms: itinerary_duration.as_millis(),
trips_ms: trip_lookup_elapsed.as_millis(),
total_time_ms: total_elapsed_time.as_millis(),
},
Expand Down

0 comments on commit 4d7b10f

Please sign in to comment.