Mostly finish alpenrose fetch system
kylerchin committed Apr 12, 2024
1 parent 41e10bf commit ea0ce7b
Showing 5 changed files with 267 additions and 41 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -108,6 +108,8 @@ tzf-rs = "0.4.7"
lazy_static = "1.4.0"
serde_bytes = "0.11.14"
bincode = "1.3.3"
evmap = "10.0.2"
dashmap = "5.5.3"

[[bin]]
name = "maple"
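Note: dashmap provides a sharded, concurrently-writable HashMap, which the fetch loop below uses to track the last fetch time per feed without a global lock. A minimal sketch of the pattern (the feed id "f-example~rt" is hypothetical):

use dashmap::DashMap;
use std::time::Instant;

fn main() {
    // One entry per feed id; readers and writers do not contend on a single lock.
    let last_fetch: DashMap<String, Instant> = DashMap::new();

    last_fetch.insert("f-example~rt".to_string(), Instant::now());

    // get() returns a guard that derefs to the stored value.
    if let Some(entry) = last_fetch.get("f-example~rt") {
        println!("elapsed since last fetch: {} ms", entry.elapsed().as_millis());
    }
}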
161 changes: 120 additions & 41 deletions src/alpenrose/main.rs
@@ -32,6 +32,8 @@ use catenary::agency_secret::*;
use catenary::fast_hash;
use catenary::postgres_tools::CatenaryConn;
use catenary::postgres_tools::{make_async_pool, CatenaryPostgresPool};
use catenary::schema::gtfs::admin_credentials::last_updated_ms;
use dashmap::DashMap;
use diesel::query_dsl::methods::FilterDsl;
use diesel::query_dsl::select_dsl::SelectDsl;
use diesel::sql_types::{Float, Integer};
@@ -44,20 +44,17 @@ use dmfr_folder_reader::read_folders
use futures::prelude::*;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashSet;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio_zookeeper::*;
use uuid::Uuid;

// gtfs unix timestamps
struct LastDataFetched {
realtime_vehicle_positions: Option<u64>,
realtime_trip_updates: Option<u64>,
realtime_alerts: Option<u64>,
}
mod single_fetch_time;

#[derive(Serialize, Clone, Deserialize, Debug, Hash, PartialEq, Eq)]
pub struct RealtimeFeedFetch {
@@ -100,6 +99,26 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;

let assignments_for_this_worker: Arc<RwLock<HashMap<String, RealtimeFeedFetch>>> =
Arc::new(RwLock::new(HashMap::new()));

    let mut last_updated_ms_for_this_worker: Option<u64> = None;

let last_fetch_per_feed: Arc<DashMap<String, Instant>> = Arc::new(DashMap::new());

    //build the shared reqwest client
//allow various compression algorithms to be used during the download process, as enabled in Cargo.toml
let client = reqwest::ClientBuilder::new()
//timeout queries after 20 seconds
.timeout(Duration::from_secs(20))
.connect_timeout(Duration::from_secs(20))
.deflate(true)
.gzip(true)
.brotli(true)
.cookie_store(true)
.build()
.unwrap();

loop {
//create parent node for workers

@@ -292,6 +311,64 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
}

//read from zookeeper to get the current assignments for this node

let last_updated_assignment_time_zk_fetch = zk
.get_data(format!("/alpenrose_assignments/{}", this_worker_id).as_str())
.await
.unwrap();

if let Some(last_updated_assignment_time) = last_updated_assignment_time_zk_fetch {
let last_updated_assignment_time =
bincode::deserialize(&last_updated_assignment_time.0)
.unwrap_or_else(|_| None::<u64>);

//is the time newer than the last time we updated the assignments for this worker node?
if last_updated_assignment_time != last_updated_ms_for_this_worker {
let feed_ids = zk
.get_children(format!("/alpenrose_assignments/{}", this_worker_id).as_str())
.await
.unwrap();

if let Some(feed_ids) = feed_ids {
let mut assignments_for_this_worker_lock =
assignments_for_this_worker.write().await;

let hashset_of_feed_ids: HashSet<String> =
feed_ids.iter().map(|x| x.to_string()).collect();

for feed_id in feed_ids.iter() {
let assignment_data = zk
.get_data(
format!("/alpenrose_assignments/{}/{}", this_worker_id, feed_id)
.as_str(),
)
.await
.unwrap();

if let Some(assignment_data) = assignment_data {
let realtime_feed_fetch: RealtimeFeedFetch =
bincode::deserialize(&assignment_data.0).unwrap();

assignments_for_this_worker_lock
.insert(feed_id.to_string(), realtime_feed_fetch);
}
}

                    //remove feeds from the hashmap that this worker is no longer assigned to
assignments_for_this_worker_lock
.retain(|key, _value| hashset_of_feed_ids.contains(key));
}
                last_updated_ms_for_this_worker = last_updated_assignment_time;
            }
}

//get the feed data from the feeds assigned to this worker

single_fetch_time::single_fetch_time(
client.clone(),
Arc::clone(&assignments_for_this_worker),
Arc::clone(&last_fetch_per_feed),
)
.await?;
}
}

@@ -337,12 +414,9 @@ async fn get_feed_metadata(
let feed_id = realtime_password.onestop_feed_id.clone();
let password_raw_json = realtime_password.passwords.clone();

let password = match password_raw_json {
Some(password_format) => {
Some(serde_json::from_value::<PasswordFormat>(password_format).unwrap())
}
None => None,
};
let password = password_raw_json.map(|password_format| {
serde_json::from_value::<PasswordFormat>(password_format).unwrap()
});

if let Some(password) = password {
realtime_passwords_hashmap.insert(feed_id, password);
@@ -363,43 +437,49 @@ async fn get_feed_metadata(
let vehicles_url = match realtime_passwords_hashmap.get(feed_id) {
Some(password_format) => match &password_format.override_realtime_vehicle_positions {
Some(url) => Some(url.to_string()),
None => match &realtime_feed_dmfr.urls.realtime_vehicle_positions {
Some(url) => Some(url.as_str().to_string()),
None => None,
},
},
None => match &realtime_feed_dmfr.urls.realtime_vehicle_positions {
Some(url) => Some(url.as_str().to_string()),
None => None,
None => realtime_feed_dmfr
.urls
.realtime_vehicle_positions
.as_ref()
.map(|url| url.as_str().to_string()),
},
None => realtime_feed_dmfr
.urls
.realtime_vehicle_positions
.as_ref()
.map(|url| url.as_str().to_string()),
};

let trip_updates_url = match realtime_passwords_hashmap.get(feed_id) {
Some(password_format) => match &password_format.override_realtime_trip_updates {
Some(url) => Some(url.to_string()),
None => match &realtime_feed_dmfr.urls.realtime_trip_updates {
Some(url) => Some(url.as_str().to_string()),
None => None,
},
},
None => match &realtime_feed_dmfr.urls.realtime_trip_updates {
Some(url) => Some(url.as_str().to_string()),
None => None,
None => realtime_feed_dmfr
.urls
.realtime_trip_updates
.as_ref()
.map(|url| url.as_str().to_string()),
},
None => realtime_feed_dmfr
.urls
.realtime_trip_updates
.as_ref()
.map(|url| url.as_str().to_string()),
};

let alerts_url = match realtime_passwords_hashmap.get(feed_id) {
Some(password_format) => match &password_format.override_alerts {
Some(url) => Some(url.to_string()),
None => match &realtime_feed_dmfr.urls.realtime_alerts {
Some(url) => Some(url.as_str().to_string()),
None => None,
},
},
None => match &realtime_feed_dmfr.urls.realtime_alerts {
Some(url) => Some(url.as_str().to_string()),
None => None,
None => realtime_feed_dmfr
.urls
.realtime_alerts
.as_ref()
.map(|url| url.as_str().to_string()),
},
None => realtime_feed_dmfr
.urls
.realtime_alerts
.as_ref()
.map(|url| url.as_str().to_string()),
};

realtime_feed_fetches.push(RealtimeFeedFetch {
@@ -412,10 +492,9 @@ async fn get_feed_metadata(
.unwrap()
.key_formats
.clone(),
passwords: match realtime_passwords_hashmap.get(feed_id) {
Some(password_format) => Some(password_format.passwords.clone()),
None => None,
},
passwords: realtime_passwords_hashmap
.get(feed_id)
.map(|password_format| password_format.passwords.clone()),
fetch_interval_ms: match realtime_feeds_hashmap.get(feed_id) {
Some(realtime_feed) => realtime_feed.fetch_interval_ms,
None => None,
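Note: the per-worker assignment znodes hold bincode-encoded RealtimeFeedFetch values, which the worker decodes after noticing a newer assignment timestamp. A minimal sketch of that round trip, using a stand-in struct since the real RealtimeFeedFetch carries more fields:

use serde::{Deserialize, Serialize};

// Stand-in for RealtimeFeedFetch; the field set here is illustrative only.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct AssignmentNode {
    feed_id: String,
    fetch_interval_ms: Option<u64>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let assignment = AssignmentNode {
        feed_id: "f-example~rt".to_string(),
        fetch_interval_ms: Some(10_000),
    };

    // What gets written into an /alpenrose_assignments/<worker>/<feed> node...
    let bytes: Vec<u8> = bincode::serialize(&assignment)?;

    // ...and what the worker reads back out of it.
    let decoded: AssignmentNode = bincode::deserialize(&bytes)?;
    assert_eq!(assignment, decoded);
    Ok(())
}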
139 changes: 139 additions & 0 deletions src/alpenrose/single_fetch_time.rs
@@ -0,0 +1,139 @@
use crate::KeyFormat;
use crate::RealtimeFeedFetch;
use catenary::postgres_tools::CatenaryPostgresPool;
use dashmap::DashMap;
use futures::StreamExt;
use rand::seq::SliceRandom;
use reqwest::Response;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

pub async fn single_fetch_time(
client: reqwest::Client,
assignments: Arc<RwLock<HashMap<String, RealtimeFeedFetch>>>,
last_fetch_per_feed: Arc<DashMap<String, Instant>>,
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
let start = Instant::now();

let assignments_lock = assignments.read().await;

futures::stream::iter(assignments_lock.iter().map(|(feed_id, assignment)| {
let client = client.clone();
let last_fetch_per_feed = last_fetch_per_feed.clone();
async move {
let start = Instant::now();

            let fetch_interval_ms = assignment.fetch_interval_ms.unwrap_or(10_000);

            if let Some(last_fetch) = last_fetch_per_feed.get(feed_id) {
let duration_since_last_fetch = last_fetch.elapsed().as_millis();

if duration_since_last_fetch < fetch_interval_ms as u128 {
                    //skip this feed, since its fetch interval has not elapsed yet
return;
}
}

let vehicle_positions_request =
make_reqwest_for_url(UrlType::VehiclePositions, assignment, client.clone());

let trip_updates_request =
make_reqwest_for_url(UrlType::TripUpdates, assignment, client.clone());

let alerts_request = make_reqwest_for_url(UrlType::Alerts, assignment, client.clone());

//run all requests concurrently
let vehicle_positions_future =
run_optional_req(vehicle_positions_request, client.clone());
let trip_updates_future = run_optional_req(trip_updates_request, client.clone());
let alerts_future = run_optional_req(alerts_request, client.clone());

let (vehicle_positions_data, trip_updates_data, alerts_data) =
futures::join!(vehicle_positions_future, trip_updates_future, alerts_future,);

//send the data to aspen via tarpc

let duration = start.elapsed();
let duration = duration.as_secs_f64();
println!("{}: {:.2?}", feed_id, duration);
}
}))
.buffer_unordered(20)
.collect::<Vec<()>>()
.await;

Ok(())
}

async fn run_optional_req(
request: Option<reqwest::Request>,
client: reqwest::Client,
) -> Option<Result<Response, Box<dyn std::error::Error + Sync + Send>>> {
match request {
Some(request) => Some(client.execute(request).await.map_err(|e| e.into())),
None => None,
}
}

enum UrlType {
VehiclePositions,
TripUpdates,
Alerts,
}

pub fn make_reqwest_for_url(
url_type: UrlType,
assignment: &RealtimeFeedFetch,
client: reqwest::Client,
) -> Option<reqwest::Request> {
let url = match url_type {
UrlType::VehiclePositions => assignment.realtime_vehicle_positions.as_ref(),
UrlType::TripUpdates => assignment.realtime_trip_updates.as_ref(),
UrlType::Alerts => assignment.realtime_alerts.as_ref(),
};

match url {
Some(url) => {
let mut request = client.get(url);

if let Some(passwords) = &assignment.passwords {
                //choose a random account to use
let password_info = passwords.choose(&mut rand::thread_rng()).unwrap();

if password_info.password.len() == assignment.key_formats.len() {
let mut url_parameter_seq: Vec<(String, String)> = vec![];
for (key_index, key_format) in assignment.key_formats.iter().enumerate() {
match key_format {
KeyFormat::Header(header) => {
request =
request.header(header, &password_info.password[key_index]);
}
KeyFormat::UrlQuery(query) => {
url_parameter_seq.push((
query.to_string(),
password_info.password[key_index].to_string(),
));
}
}
}

request = request.query(&url_parameter_seq);
} else {
println!(
"Password length does not match key format length for feed_id: {}",
assignment.feed_id
);
return None;
}
}

Some(request.build().unwrap())
}
None => None,
}
}
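Note: make_reqwest_for_url pairs each entry in key_formats positionally with the matching entry in the chosen password, which is why the two lengths must agree before the loop runs. A minimal sketch of that pairing in isolation (the endpoint, header name, and secret values are hypothetical, and KeyFormat is re-declared here only to keep the sketch self-contained):

use reqwest::Client;

// Mirrors the KeyFormat enum used above; the real definition lives in the crate root.
enum KeyFormat {
    Header(String),
    UrlQuery(String),
}

fn main() {
    let client = Client::new();

    let key_formats = vec![
        KeyFormat::Header("x-api-key".to_string()),
        KeyFormat::UrlQuery("token".to_string()),
    ];
    let password = vec!["header-secret".to_string(), "query-secret".to_string()];
    assert_eq!(key_formats.len(), password.len());

    let mut request = client.get("https://example.com/gtfs-rt/vehicles");
    let mut url_parameter_seq: Vec<(String, String)> = vec![];

    // The nth key format consumes the nth password entry.
    for (i, format) in key_formats.iter().enumerate() {
        match format {
            KeyFormat::Header(name) => {
                request = request.header(name.as_str(), password[i].as_str());
            }
            KeyFormat::UrlQuery(name) => {
                url_parameter_seq.push((name.clone(), password[i].clone()));
            }
        }
    }

    // Query-style keys end up in the URL; header-style keys ride in the header map.
    let request = request.query(&url_parameter_seq).build().unwrap();
    println!("{}", request.url());
}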
3 changes: 3 additions & 0 deletions src/aspen/lib.rs
@@ -15,5 +15,8 @@ pub trait AspenRpc {
vehicles: Option<Vec<u8>>,
trips: Option<Vec<u8>>,
alerts: Option<Vec<u8>>,
has_vehicles: bool,
has_trips: bool,
has_alerts: bool,
) -> bool;
}
3 changes: 3 additions & 0 deletions src/aspen/main.rs
@@ -73,6 +73,9 @@ impl AspenRpc for AspenServer {
vehicles: Option<Vec<u8>>,
trips: Option<Vec<u8>>,
alerts: Option<Vec<u8>>,
has_vehicles: bool,
has_trips: bool,
has_alerts: bool,
) -> bool {
import_kactus::new_rt_kactus(realtime_feed_id, vehicles, trips, alerts).await
}
