From a5731440f355d10072335e81eecae54e9980cd9a Mon Sep 17 00:00:00 2001 From: Kyler Chin <7539174+kylerchin@users.noreply.github.com> Date: Fri, 12 Apr 2024 01:32:22 -0700 Subject: [PATCH] Temporary idea on feed id assignments --- Cargo.toml | 1 + readme.md | 6 ++--- src/alpenrose/main.rs | 62 +++++++++++++++++++++++++++++++++++++++---- src/aspen/main.rs | 6 ++--- src/birch/server.rs | 6 ++--- src/lib.rs | 6 ++--- src/spruce/main.rs | 6 ++--- 7 files changed, 73 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a0ee71f0..1cdd9fa9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,7 @@ argon2 = "0.5.3" tzf-rs = "0.4.7" lazy_static = "1.4.0" serde_bytes = "0.11.14" +bincode = "1.3.3" [[bin]] name = "maple" diff --git a/readme.md b/readme.md index f36fe5bf..7a85efd8 100644 --- a/readme.md +++ b/readme.md @@ -77,11 +77,11 @@ The following `cargo clippy` rules are enforced. clippy::boxed_local, clippy::assigning_clones, clippy::redundant_allocation, - bool_comparison, - bind_instead_of_map, + clippy::bool_comparison, + clippy::bind_instead_of_map, clippy::vec_box, clippy::while_let_loop, - useless_asref, + clippy::useless_asref, clippy::repeat_once, clippy::deref_addrof, clippy::suspicious_map, diff --git a/src/alpenrose/main.rs b/src/alpenrose/main.rs index ab72d125..b53573fe 100644 --- a/src/alpenrose/main.rs +++ b/src/alpenrose/main.rs @@ -10,11 +10,11 @@ clippy::boxed_local, clippy::assigning_clones, clippy::redundant_allocation, - bool_comparison, - bind_instead_of_map, + clippy::bool_comparison, + clippy::bind_instead_of_map, clippy::vec_box, clippy::while_let_loop, - useless_asref, + clippy::useless_asref, clippy::repeat_once, clippy::deref_addrof, clippy::suspicious_map, @@ -68,7 +68,7 @@ pub struct RealtimeFeedFetch { #[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq)] struct InstructionsPerWorker { - feeds: RealtimeFeedFetch, + feeds: Vec, } #[tokio::main] @@ -80,6 +80,10 @@ async fn main() -> Result<(), Box> { //hands off 
data to aspen to do additional cleanup and processing, Aspen will perform association with the GTFS schedule data + update dynamic graphs for routing and map representation, //aspen will also forward critical alerts to users + //If the worker disconnects from zookeeper, that's okay because tasks will be reassigned. + // When it reconnects, the same worker id can be used and feed instructions will be reassigned to it. + // ingestion won't run when the worker is disconnected from zookeeper due to the instructions being written to the worker's ephemeral node + // last check time let last_check_time_ms: Option = None; let last_set_of_active_nodes_hash: Option = None; @@ -110,6 +114,17 @@ async fn main() -> Result<(), Box> { .await .unwrap(); + // create this worker as an ephemeral node + let this_worker_assignment = zk + .create( + format!("/alpenrose_workers/{}", this_worker_id).as_str(), + vec![], + Acl::open_unsafe(), + CreateMode::Ephemeral, + ) + .await + .unwrap().unwrap(); + let workers_assignments = zk .create( "/alpenrose_assignments", @@ -120,6 +135,16 @@ async fn main() -> Result<(), Box> { .await .unwrap(); + let this_worker_assignments = zk + .create( + format!("/alpenrose_assignments/{}", this_worker_id).as_str(), + vec![], + Acl::open_unsafe(), + CreateMode::Ephemeral, + ) + .await + .unwrap(); + let leader_exists = zk.watch().exists("/alpenrose_leader").await.unwrap(); if leader_exists.is_none() { @@ -199,12 +224,39 @@ async fn main() -> Result<(), Box> { { let node_to_assign = &worker_nodes[index % worker_nodes.len()]; - let _ = assignments.entry(node_to_assign.to_string()); //append to list + assignments.entry(node_to_assign.to_string()).and_modify( + |instructions| { + instructions.push(realtime_instructions.clone()); + }, + ).or_insert(vec![realtime_instructions.clone()]); } // look at current assignments, delete old assignments // assign feeds to worker nodes + + for (node, instructions) in assignments.iter() { + let instructions_per_worker = 
InstructionsPerWorker { + feeds: instructions.clone(), + }; + + //this is capped at 1MB, I should change the data structure to be smaller + + // Option 1: Each worker node has a list of feeds, and then has to look up the associated instructions in zookeeper + // downside, what if associated instructions update? + + //Option 2: Store each instruction as a sub zookeeper node of the worker node + // upside is that the instructions disappear when the worker node disappears + //downside, still has to look up all the sequential data etc + + let _ = zk.set_data( + format!("/alpenrose_assignments/{}", node).as_str(), + None, + bincode::serialize(&instructions_per_worker).unwrap(), + ) + .await + .unwrap(); + } } } } diff --git a/src/aspen/main.rs b/src/aspen/main.rs index a227f72d..b30239ee 100644 --- a/src/aspen/main.rs +++ b/src/aspen/main.rs @@ -4,11 +4,11 @@ clippy::boxed_local, clippy::assigning_clones, clippy::redundant_allocation, - bool_comparison, - bind_instead_of_map, + clippy::bool_comparison, + clippy::bind_instead_of_map, clippy::vec_box, clippy::while_let_loop, - useless_asref, + clippy::useless_asref, clippy::repeat_once, clippy::deref_addrof, clippy::suspicious_map, diff --git a/src/birch/server.rs b/src/birch/server.rs index 055d887a..9cc20fbd 100644 --- a/src/birch/server.rs +++ b/src/birch/server.rs @@ -4,11 +4,11 @@ clippy::boxed_local, clippy::assigning_clones, clippy::redundant_allocation, - bool_comparison, - bind_instead_of_map, + clippy::bool_comparison, + clippy::bind_instead_of_map, clippy::vec_box, clippy::while_let_loop, - useless_asref, + clippy::useless_asref, clippy::repeat_once, clippy::deref_addrof, clippy::suspicious_map, diff --git a/src/lib.rs b/src/lib.rs index 7b4211ea..fb18821d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,11 +8,11 @@ clippy::boxed_local, clippy::assigning_clones, clippy::redundant_allocation, - bool_comparison, - bind_instead_of_map, + clippy::bool_comparison, + clippy::bind_instead_of_map, clippy::vec_box, 
clippy::while_let_loop, - useless_asref, + clippy::useless_asref, clippy::repeat_once, clippy::deref_addrof, clippy::suspicious_map, diff --git a/src/spruce/main.rs b/src/spruce/main.rs index 55ca8b69..110bc078 100644 --- a/src/spruce/main.rs +++ b/src/spruce/main.rs @@ -4,11 +4,11 @@ clippy::boxed_local, clippy::assigning_clones, clippy::redundant_allocation, - bool_comparison, - bind_instead_of_map, + clippy::bool_comparison, + clippy::bind_instead_of_map, clippy::vec_box, clippy::while_let_loop, - useless_asref, + clippy::useless_asref, clippy::repeat_once, clippy::deref_addrof, clippy::suspicious_map,