Skip to content

Commit

Permalink
Don't resend data if it is the same hash
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Apr 24, 2024
1 parent 421ad4e commit b022813
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 7 deletions.
72 changes: 65 additions & 7 deletions src/alpenrose/single_fetch_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use std::time::{Duration, Instant};
use tarpc::{client, context, tokio_serde::formats::Bincode};
use tokio::sync::RwLock;
use tokio_zookeeper::ZooKeeper;
use scc::HashMap as SccHashMap;
use catenary::ahash_fast_hash;

pub async fn single_fetch_time(
client: reqwest::Client,
Expand All @@ -29,17 +31,17 @@ pub async fn single_fetch_time(

let assignments_lock = assignments.read().await;

let hashes_of_data:Arc<SccHashMap<(String, UrlType), u64>> = Arc::new(SccHashMap::new());

futures::stream::iter(assignments_lock.iter().map(|(feed_id, assignment)| {
let client = client.clone();
let zk = zk.clone();
let hashes_of_data = Arc::clone(&hashes_of_data);
let last_fetch_per_feed = last_fetch_per_feed.clone();
async move {
let start = Instant::now();

let fetch_interval_ms = match assignment.fetch_interval_ms {
Some(fetch_interval) => fetch_interval,
None => 10_000,
};
let fetch_interval_ms = assignment.fetch_interval_ms.unwrap_or(10_000);

if let Some(last_fetch) = last_fetch_per_feed.get(&feed_id.clone()) {
let duration_since_last_fetch = last_fetch.elapsed().as_millis();
Expand Down Expand Up @@ -124,19 +126,74 @@ pub async fn single_fetch_time(
feed_id.clone(),
match vehicle_positions_data {
Some(Ok(response)) => {
Some(response.bytes().await.unwrap().as_ref().to_vec())
let bytes = response.bytes().await.unwrap().as_ref().to_vec();

let hash = ahash_fast_hash(&bytes);

match hashes_of_data.get(&(feed_id.clone(), UrlType::VehiclePositions)) {
Some(old_hash) => {
let old_hash = old_hash.get();

//if the data has not changed, don't send it
match hash == *old_hash {
true => None,
false => {
hashes_of_data.entry((feed_id.clone(), UrlType::VehiclePositions)).and_modify(|value| *value = hash).or_insert(hash);
Some(bytes)
}
}
},
None => Some(bytes)
}

}
_ => None,
},
match trip_updates_data {
Some(Ok(response)) => {
Some(response.bytes().await.unwrap().as_ref().to_vec())
let bytes = response.bytes().await.unwrap().as_ref().to_vec();

let hash = ahash_fast_hash(&bytes);

match hashes_of_data.get(&(feed_id.clone(), UrlType::TripUpdates)) {
Some(old_hash) => {
let old_hash = old_hash.get();

//if the data has not changed, don't send it
match hash == *old_hash {
true => None,
false => {
hashes_of_data.entry((feed_id.clone(), UrlType::TripUpdates)).and_modify(|value| *value = hash).or_insert(hash);
Some(bytes)
}
}
},
None => Some(bytes)
}
}
_ => None,
},
match alerts_data {
Some(Ok(response)) => {
Some(response.bytes().await.unwrap().as_ref().to_vec())
let bytes = response.bytes().await.unwrap().as_ref().to_vec();

let hash = ahash_fast_hash(&bytes);

match hashes_of_data.get(&(feed_id.clone(), UrlType::Alerts)) {
Some(old_hash) => {
let old_hash = old_hash.get();

//if the data has not changed, don't send it
match hash == *old_hash {
true => None,
false => {
hashes_of_data.entry((feed_id.clone(), UrlType::Alerts)).and_modify(|value| *value = hash).or_insert(hash);
Some(bytes)
}
}
},
None => Some(bytes)
}
}
_ => None,
},
Expand Down Expand Up @@ -177,6 +234,7 @@ async fn run_optional_req(
}
}

#[derive(Debug, Hash, Clone, Eq, PartialEq)]
pub enum UrlType {
VehiclePositions,
TripUpdates,
Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub mod schema;

use fasthash::MetroHasher;
use gtfs_rt::VehicleDescriptor;
use ahash::AHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::time::Duration;
Expand Down Expand Up @@ -117,6 +118,12 @@ pub fn fast_hash<T: Hash>(t: &T) -> u64 {
s.finish()
}

pub fn ahash_fast_hash<T: Hash>(t: &T) -> u64 {
let mut hasher = AHasher::default();
t.hash(&mut hasher);
hasher.finish()
}

pub fn duration_since_unix_epoch() -> Duration {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
}
Expand Down

0 comments on commit b022813

Please sign in to comment.