diff --git a/src/alpenrose/single_fetch_time.rs b/src/alpenrose/single_fetch_time.rs index 8378a499..5327fd12 100644 --- a/src/alpenrose/single_fetch_time.rs +++ b/src/alpenrose/single_fetch_time.rs @@ -13,6 +13,8 @@ use std::time::{Duration, Instant}; use tarpc::{client, context, tokio_serde::formats::Bincode}; use tokio::sync::RwLock; use tokio_zookeeper::ZooKeeper; +use scc::HashMap as SccHashMap; +use catenary::ahash_fast_hash; pub async fn single_fetch_time( client: reqwest::Client, @@ -29,17 +31,17 @@ pub async fn single_fetch_time( let assignments_lock = assignments.read().await; + let hashes_of_data:Arc<SccHashMap<(String, UrlType), u64>> = Arc::new(SccHashMap::new()); + futures::stream::iter(assignments_lock.iter().map(|(feed_id, assignment)| { let client = client.clone(); let zk = zk.clone(); + let hashes_of_data = Arc::clone(&hashes_of_data); let last_fetch_per_feed = last_fetch_per_feed.clone(); async move { let start = Instant::now(); - let fetch_interval_ms = match assignment.fetch_interval_ms { - Some(fetch_interval) => fetch_interval, - None => 10_000, - }; + let fetch_interval_ms = assignment.fetch_interval_ms.unwrap_or(10_000); if let Some(last_fetch) = last_fetch_per_feed.get(&feed_id.clone()) { let duration_since_last_fetch = last_fetch.elapsed().as_millis(); @@ -124,19 +126,74 @@ pub async fn single_fetch_time( feed_id.clone(), match vehicle_positions_data { Some(Ok(response)) => { - Some(response.bytes().await.unwrap().as_ref().to_vec()) + let bytes = response.bytes().await.unwrap().as_ref().to_vec(); + + let hash = ahash_fast_hash(&bytes); + + match hashes_of_data.get(&(feed_id.clone(), UrlType::VehiclePositions)) { + Some(old_hash) => { + let old_hash = old_hash.get(); + + //if the data has not changed, don't send it + match hash == *old_hash { + true => None, + false => { + hashes_of_data.entry((feed_id.clone(), UrlType::VehiclePositions)).and_modify(|value| *value = hash).or_insert(hash); + Some(bytes) + } + } + }, + None => Some(bytes) + } + } _ => None, }, match trip_updates_data { Some(Ok(response)) => { - Some(response.bytes().await.unwrap().as_ref().to_vec()) + let bytes = response.bytes().await.unwrap().as_ref().to_vec(); + + let hash = ahash_fast_hash(&bytes); + + match hashes_of_data.get(&(feed_id.clone(), UrlType::TripUpdates)) { + Some(old_hash) => { + let old_hash = old_hash.get(); + + //if the data has not changed, don't send it + match hash == *old_hash { + true => None, + false => { + hashes_of_data.entry((feed_id.clone(), UrlType::TripUpdates)).and_modify(|value| *value = hash).or_insert(hash); + Some(bytes) + } + } + }, + None => Some(bytes) + } } _ => None, }, match alerts_data { Some(Ok(response)) => { - Some(response.bytes().await.unwrap().as_ref().to_vec()) + let bytes = response.bytes().await.unwrap().as_ref().to_vec(); + + let hash = ahash_fast_hash(&bytes); + + match hashes_of_data.get(&(feed_id.clone(), UrlType::Alerts)) { + Some(old_hash) => { + let old_hash = old_hash.get(); + + //if the data has not changed, don't send it + match hash == *old_hash { + true => None, + false => { + hashes_of_data.entry((feed_id.clone(), UrlType::Alerts)).and_modify(|value| *value = hash).or_insert(hash); + Some(bytes) + } + } + }, + None => Some(bytes) + } } _ => None, }, @@ -177,6 +234,7 @@ async fn run_optional_req( } } +#[derive(Debug, Hash, Clone, Eq, PartialEq)] pub enum UrlType { VehiclePositions, TripUpdates, diff --git a/src/lib.rs b/src/lib.rs index df4a59b8..10f8138d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ pub mod schema; use fasthash::MetroHasher; use gtfs_rt::VehicleDescriptor; +use ahash::AHasher; use std::hash::Hash; use std::hash::Hasher; use std::time::Duration; @@ -117,6 +118,12 @@ pub fn fast_hash<T: Hash>(t: &T) -> u64 { s.finish() } +pub fn ahash_fast_hash<T: Hash>(t: &T) -> u64 { + let mut hasher = AHasher::default(); + t.hash(&mut hasher); + hasher.finish() +} + pub fn duration_since_unix_epoch() -> Duration { SystemTime::now().duration_since(UNIX_EPOCH).unwrap() }