Skip to content

Commit

Permalink
also lookup trip id
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Oct 3, 2024
1 parent 37f47ac commit a96e1df
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
17 changes: 16 additions & 1 deletion src/aspen/import_alpenrose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,21 @@ pub async fn new_rt_data(
}
}

//trips updates trip id lookup
if let Some(trip_gtfs_rt_for_feed_id) =
authoritative_gtfs_rt.get(&(realtime_feed_id.clone(), GtfsRtType::TripUpdates))
{
let trip_gtfs_rt_for_feed_id = trip_gtfs_rt_for_feed_id.get();

for trip_entity in trip_gtfs_rt_for_feed_id.entity.iter() {
if let Some(trip_update) = &trip_entity.trip_update {
if let Some(trip_id) = &trip_update.trip.trip_id {
trip_ids_to_lookup.insert(trip_id.clone());
}
}
}
}

//now do the same for alerts updates
if let Some(alert_gtfs_rt_for_feed_id) =
authoritative_gtfs_rt.get(&(realtime_feed_id.clone(), GtfsRtType::Alerts))
Expand Down Expand Up @@ -581,4 +596,4 @@ mod tests {
_ => panic!("Expected Metrolink data"),
}
}
}
}
14 changes: 9 additions & 5 deletions src/aspen/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ use std::collections::HashMap;
use std::collections::HashSet;
mod alerts_responder;
mod aspen_assignment;
use catenary::rt_recent_history::RtCacheEntry;
use catenary::rt_recent_history::RtKey;
use prost::Message;
use std::time::Instant;
use catenary::rt_recent_history::RtKey;
use catenary::rt_recent_history::RtCacheEntry;

#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct GtfsRealtimeHashStore {
Expand All @@ -89,14 +89,16 @@ pub struct AspenServer {
// Backed up in redis as well, program can be shut down and restarted without data loss
pub authoritative_gtfs_rt_store: Arc<SccHashMap<(String, GtfsRtType), FeedMessage>>,
pub conn_pool: Arc<CatenaryPostgresPool>,
pub authoritative_trip_updates_by_gtfs_feed_history: Arc<SccHashMap<CompactString, AHashMap<RtKey, RtCacheEntry>>>,
pub authoritative_trip_updates_by_gtfs_feed_history:
Arc<SccHashMap<CompactString, AHashMap<RtKey, RtCacheEntry>>>,
pub alpenrose_to_process_queue: Arc<Injector<ProcessAlpenroseData>>,
pub alpenrose_to_process_queue_chateaus: Arc<Mutex<HashSet<String>>>,
pub rough_hash_of_gtfs_rt: Arc<SccHashMap<(String, GtfsRtType), u64>>,
pub hash_of_raw_gtfs_rt_protobuf: Arc<SccHashMap<String, GtfsRealtimeHashStore>>,
pub backup_data_store: Arc<SccHashMap<String, catenary::aspen_dataset::AspenisedData>>,
pub backup_gtfs_rt_store: Arc<SccHashMap<(String, GtfsRtType), FeedMessage>>,
pub backup_trip_updates_by_gtfs_feed_history: Arc<SccHashMap<CompactString, AHashMap<RtKey, RtCacheEntry>>>,
pub backup_trip_updates_by_gtfs_feed_history:
Arc<SccHashMap<CompactString, AHashMap<RtKey, RtCacheEntry>>>,
pub etcd_addresses: Arc<Vec<String>>,
pub etcd_connect_options: Arc<Option<etcd_client::ConnectOptions>>,
pub worker_etcd_lease_id: i64,
Expand Down Expand Up @@ -815,7 +817,9 @@ async fn main() -> anyhow::Result<()> {
etcd_addresses: Arc::clone(&etcd_addresses),
etcd_connect_options: Arc::clone(&arc_etcd_connect_options),
timestamps_of_gtfs_rt: Arc::clone(&timestamps_of_gtfs_rt),
authoritative_trip_updates_by_gtfs_feed_history: Arc::new(SccHashMap::new()),
authoritative_trip_updates_by_gtfs_feed_history: Arc::new(
SccHashMap::new(),
),
backup_trip_updates_by_gtfs_feed_history: Arc::new(SccHashMap::new()),
};
channel.execute(server.serve()).for_each(spawn)
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ pub mod tailscale {
}

pub mod aspen_dataset {
use crate::RtCacheEntry;
use crate::RtKey;
use ahash::AHashMap;
use compact_str::CompactString;
use std::hash::Hash;
use crate::RtKey;
use crate::RtCacheEntry;

#[derive(Clone, Serialize, Deserialize)]
pub struct AspenisedData {
Expand Down

0 comments on commit a96e1df

Please sign in to comment.