Skip to content

Commit

Permalink
Create leader election system for aspen
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Apr 18, 2024
1 parent 21daf10 commit fe8526e
Show file tree
Hide file tree
Showing 7 changed files with 529 additions and 60 deletions.
95 changes: 88 additions & 7 deletions src/alpenrose/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
// https://en.wikipedia.org/wiki/Rhododendron_ferrugineum
use catenary::agency_secret::*;
use catenary::fast_hash;
use std::time::Instant;
use catenary::postgres_tools::CatenaryConn;
use catenary::postgres_tools::{make_async_pool, CatenaryPostgresPool};
use catenary::schema::gtfs::admin_credentials::last_updated_ms;
Expand All @@ -53,6 +52,7 @@ use std::error::Error;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio_zookeeper::*;
Expand Down Expand Up @@ -123,7 +123,7 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
loop {
//create parent node for workers

let workers = zk
let _ = zk
.create(
"/alpenrose_workers",
vec![],
Expand Down Expand Up @@ -166,14 +166,14 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
.await
.unwrap();

let leader_exists = zk.watch().exists("/alpenrose_leader").await.unwrap();
let leader_exists = zk.exists("/alpenrose_leader").await.unwrap();

if leader_exists.is_none() {
//attempt to become leader
let leader = zk
.create(
"/alpenrose_leader",
this_worker_id.as_bytes().to_vec(),
bincode::serialize(&this_worker_id).unwrap(),
Acl::open_unsafe(),
CreateMode::Ephemeral,
)
Expand All @@ -188,7 +188,7 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
let leader = zk.watch().get_data("/alpenrose_leader").await.unwrap();

if let Some((leader_str_bytes, leader_stats)) = leader {
let leader_id = String::from_utf8(leader_str_bytes).unwrap();
let leader_id: String = bincode::deserialize(&leader_str_bytes).unwrap();

if &leader_id == this_worker_id.as_ref() {
//I am the leader!
Expand Down Expand Up @@ -282,9 +282,50 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
.as_str(),
bincode::serialize(&realtime_instruction).unwrap(),
Acl::open_unsafe(),
CreateMode::Ephemeral,
CreateMode::Persistent,
)
.await?;

match assignment {
Ok(_) => {
println!(
"Assigned feed {} to worker {}",
feed_id_str, worker_id
);
}
Err(error::Create::NodeExists) => {
let set_assignment = zk
.set_data(
format!(
"/alpenrose_assignments/{}/{}",
worker_id, feed_id_str
)
.as_str(),
None,
bincode::serialize(&realtime_instruction)
.unwrap(),
)
.await?;

match set_assignment {
Ok(_) => {
println!(
"Reassigned feed {} to worker {}",
feed_id_str, worker_id
);
}
Err(err) => {
eprintln!("Error reassigning feed {} to worker {}: {:?}", feed_id_str, worker_id, err);
}
}
}
Err(err) => {
eprintln!(
"Error assigning feed {} to worker {}: {:?}",
feed_id_str, worker_id, err
);
}
}
}

//update the worker's last updated time
Expand All @@ -297,9 +338,49 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
))
.unwrap(),
Acl::open_unsafe(),
CreateMode::Ephemeral,
CreateMode::Persistent,
)
.await?;

match worker_assignment_metadata {
Ok(_) => {
println!("Updated worker assignment metadata");
}
Err(error::Create::NodeExists) => {
let set_worker_assignment_metadata = zk
.set_data(
format!(
"/alpenrose_assignments/{}",
this_worker_id
)
.as_str(),
None,
bincode::serialize(&Some(
catenary::duration_since_unix_epoch()
.as_millis(),
))
.unwrap(),
)
.await?;

match set_worker_assignment_metadata {
Ok(_) => {
println!(
"Reassigned worker assignment metadata"
);
}
Err(err) => {
eprintln!("Error reassigning worker assignment metadata: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Error updating worker assignment metadata: {:?}",
err
);
}
}
}
}
}
Expand Down
20 changes: 18 additions & 2 deletions src/alpenrose/single_fetch_time.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::KeyFormat;
use crate::RealtimeFeedFetch;
use catenary::aspen::lib::ChateausLeaderHashMap;
use catenary::postgres_tools::CatenaryPostgresPool;
use dashmap::DashMap;
use futures::StreamExt;
use rand::seq::SliceRandom;
use reqwest::Response;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use dashmap::DashMap;

pub async fn single_fetch_time(
client: reqwest::Client,
Expand Down Expand Up @@ -58,6 +59,21 @@ pub async fn single_fetch_time(

//send the data to aspen via tarpc

let vehicle_positions_http_status = match &vehicle_positions_data {
Some(Ok(response)) => Some(response.status().as_u16()),
_ => None,
};

let trip_updates_http_status = match &trip_updates_data {
Some(Ok(response)) => Some(response.status().as_u16()),
_ => None,
};

let alerts_http_status = match &alerts_data {
Some(Ok(response)) => Some(response.status().as_u16()),
_ => None,
};

let duration = start.elapsed();
let duration = duration.as_secs_f64();
println!("{}: {:.2?}", feed_id, duration);
Expand All @@ -80,7 +96,7 @@ async fn run_optional_req(
}
}

enum UrlType {
pub enum UrlType {
VehiclePositions,
TripUpdates,
Alerts,
Expand Down
Loading

0 comments on commit fe8526e

Please sign in to comment.