diff --git a/src/alpenrose/main.rs b/src/alpenrose/main.rs index b53573fe..53b354d1 100644 --- a/src/alpenrose/main.rs +++ b/src/alpenrose/main.rs @@ -66,11 +66,6 @@ pub struct RealtimeFeedFetch { pub fetch_interval_ms: Option, } -#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq)] -struct InstructionsPerWorker { - feeds: Vec, -} - #[tokio::main] async fn main() -> Result<(), Box> { let this_worker_id = Arc::new(Uuid::new_v4().to_string()); @@ -123,7 +118,8 @@ async fn main() -> Result<(), Box> { CreateMode::Ephemeral, ) .await - .unwrap().unwrap(); + .unwrap() + .unwrap(); let workers_assignments = zk .create( @@ -135,10 +131,11 @@ async fn main() -> Result<(), Box> { .await .unwrap(); + //each feed id ephemeral id contains the last time updated, with none meaning the data has not been assigned to the node yet let this_worker_assignments = zk .create( format!("/alpenrose_assignments/{}", this_worker_id).as_str(), - vec![], + bincode::serialize(&None::).unwrap(), Acl::open_unsafe(), CreateMode::Ephemeral, ) @@ -216,8 +213,10 @@ async fn main() -> Result<(), Box> { // divide feeds between worker nodes // feed id -> List of realtime fetch instructions - let mut assignments: BTreeMap> = - BTreeMap::new(); + let mut assignments: BTreeMap< + String, + HashMap, + > = BTreeMap::new(); for (index, (feed_id, realtime_instructions)) in feeds_map.iter().enumerate() @@ -225,37 +224,58 @@ async fn main() -> Result<(), Box> { let node_to_assign = &worker_nodes[index % worker_nodes.len()]; //append to list - assignments.entry(node_to_assign.to_string()).and_modify( - |instructions| { - instructions.push(realtime_instructions.clone()); - }, - ).or_insert(vec![realtime_instructions.clone()]); + assignments + .entry(node_to_assign.to_string()) + .and_modify(|instructions| { + instructions.insert( + feed_id.clone(), + realtime_instructions.clone(), + ); + }) + .or_insert({ + let mut map = HashMap::new(); + map.insert( + feed_id.clone(), + realtime_instructions.clone(), + ); + map + }); } - // look at current assignments, delete old assignments - - // assign feeds to worker nodes - - for (node, instructions) in assignments.iter() { - let instructions_per_worker = InstructionsPerWorker { - feeds: instructions.clone(), - }; - - //this is capped at 1MB, I should change the data structure to be smaller - - // Option 1: Each worker node has a list of feeds, and then has to lookup the associated instructions in zookeeper - // downside, what if associated instructions update? - - //Option 2: Store each instruction as a sub zookeeper node of the worker node - // upside is that the instructions disappear when the worker node disappears - //downside, still has too lookup all the sequential data etc - let _ = zk.set_data( - format!("/alpenrose_assignments/{}", node).as_str(), - None, - bincode::serialize(&instructions_per_worker).unwrap(), - ) - .await - .unwrap(); + //update assignments in zookeeper + + for (worker_id, instructions_hashmap) in assignments.iter() { + for (feed_id, realtime_instruction) in instructions_hashmap { + let feed_id_str = feed_id.clone(); + + //update each feed under the workers node's assignment + let assignment = zk + .create( + format!( + "/alpenrose_assignments/{}/{}", + worker_id, feed_id_str + ) + .as_str(), + bincode::serialize(&realtime_instruction).unwrap(), + Acl::open_unsafe(), + CreateMode::Ephemeral, + ) + .await?; + } + + //update the worker's last updated time + let worker_assignment_metadata = zk + .create( + format!("/alpenrose_assignments/{}", this_worker_id) + .as_str(), + bincode::serialize(&Some( + catenary::duration_since_unix_epoch().as_millis(), + )) + .unwrap(), + Acl::open_unsafe(), + CreateMode::Ephemeral, + ) + .await?; } } } diff --git a/src/lib.rs b/src/lib.rs index fb18821d..f9f9e865 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,8 @@ pub mod schema; use fasthash::MetroHasher; use std::hash::Hash; use std::hash::Hasher; +use std::time::Duration; +use std::time::{SystemTime, UNIX_EPOCH}; pub const WGS_84_SRID: u32 = 4326; @@ -102,3 +104,7 @@ pub fn fast_hash(t: &T) -> u64 { t.hash(&mut s); s.finish() } + +pub fn duration_since_unix_epoch() -> Duration { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap() +}