Skip to content

Commit

Permalink
Temporary idea on feed id assignments
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Apr 12, 2024
1 parent 4c6d55f commit a573144
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ argon2 = "0.5.3"
tzf-rs = "0.4.7"
lazy_static = "1.4.0"
serde_bytes = "0.11.14"
bincode = "1.3.3"

[[bin]]
name = "maple"
Expand Down
6 changes: 3 additions & 3 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ The following `cargo clippy` rules are enforced.
clippy::boxed_local,
clippy::assigning_clones,
clippy::redundant_allocation,
bool_comparison,
bind_instead_of_map,
clippy::bool_comparison,
clippy::bind_instead_of_map,
clippy::vec_box,
clippy::while_let_loop,
useless_asref,
clippy::useless_asref,
clippy::repeat_once,
clippy::deref_addrof,
clippy::suspicious_map,
Expand Down
62 changes: 57 additions & 5 deletions src/alpenrose/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
clippy::boxed_local,
clippy::assigning_clones,
clippy::redundant_allocation,
bool_comparison,
bind_instead_of_map,
clippy::bool_comparison,
clippy::bind_instead_of_map,
clippy::vec_box,
clippy::while_let_loop,
useless_asref,
clippy::useless_asref,
clippy::repeat_once,
clippy::deref_addrof,
clippy::suspicious_map,
Expand Down Expand Up @@ -68,7 +68,7 @@ pub struct RealtimeFeedFetch {

#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq)]
struct InstructionsPerWorker {
feeds: RealtimeFeedFetch,
feeds: Vec<RealtimeFeedFetch>,
}

#[tokio::main]
Expand All @@ -80,6 +80,10 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
//hands off data to aspen to do additional cleanup and processing, Aspen will perform association with the GTFS schedule data + update dynamic graphs for routing and map representation,
//aspen will also forward critical alerts to users

//If the worker disconnects from zookeeper, that's okay because tasks will be reassigned.
// When it reconnects, the same worker id can be used and feed instructions will be reassigned to it.
// ingestion won't run when the worker is disconnected from zookeeper due the instructions be written to the worker's ehpehmeral node

// last check time
let last_check_time_ms: Option<u64> = None;
let last_set_of_active_nodes_hash: Option<u64> = None;
Expand Down Expand Up @@ -110,6 +114,17 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
.await
.unwrap();

// create this worker as an ephemeral node
let this_worker_assignment = zk
.create(
format!("/alpenrose_workers/{}", this_worker_id).as_str(),
vec![],
Acl::open_unsafe(),
CreateMode::Ephemeral,
)
.await
.unwrap().unwrap();

let workers_assignments = zk
.create(
"/alpenrose_assignments",
Expand All @@ -120,6 +135,16 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
.await
.unwrap();

let this_worker_assignments = zk
.create(
format!("/alpenrose_assignments/{}", this_worker_id).as_str(),
vec![],
Acl::open_unsafe(),
CreateMode::Ephemeral,
)
.await
.unwrap();

let leader_exists = zk.watch().exists("/alpenrose_leader").await.unwrap();

if leader_exists.is_none() {
Expand Down Expand Up @@ -199,12 +224,39 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
{
let node_to_assign = &worker_nodes[index % worker_nodes.len()];

let _ = assignments.entry(node_to_assign.to_string());
//append to list
assignments.entry(node_to_assign.to_string()).and_modify(
|instructions| {
instructions.push(realtime_instructions.clone());
},
).or_insert(vec![realtime_instructions.clone()]);
}
// look at current assignments, delete old assignments

// assign feeds to worker nodes

for (node, instructions) in assignments.iter() {
let instructions_per_worker = InstructionsPerWorker {
feeds: instructions.clone(),
};

//this is capped at 1MB, I should change the data structure to be smaller

// Option 1: Each worker node has a list of feeds, and then has to lookup the associated instructions in zookeeper
// downside, what if associated instructions update?

//Option 2: Store each instruction as a sub zookeeper node of the worker node
// upside is that the instructions disappear when the worker node disappears
//downside, still has too lookup all the sequential data etc

let _ = zk.set_data(
format!("/alpenrose_assignments/{}", node).as_str(),
None,
bincode::serialize(&instructions_per_worker).unwrap(),
)
.await
.unwrap();
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/aspen/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
clippy::boxed_local,
clippy::assigning_clones,
clippy::redundant_allocation,
bool_comparison,
bind_instead_of_map,
clippy::bool_comparison,
clippy::bind_instead_of_map,
clippy::vec_box,
clippy::while_let_loop,
useless_asref,
clippy::useless_asref,
clippy::repeat_once,
clippy::deref_addrof,
clippy::suspicious_map,
Expand Down
6 changes: 3 additions & 3 deletions src/birch/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
clippy::boxed_local,
clippy::assigning_clones,
clippy::redundant_allocation,
bool_comparison,
bind_instead_of_map,
clippy::bool_comparison,
clippy::bind_instead_of_map,
clippy::vec_box,
clippy::while_let_loop,
useless_asref,
clippy::useless_asref,
clippy::repeat_once,
clippy::deref_addrof,
clippy::suspicious_map,
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
clippy::boxed_local,
clippy::assigning_clones,
clippy::redundant_allocation,
bool_comparison,
bind_instead_of_map,
clippy::bool_comparison,
clippy::bind_instead_of_map,
clippy::vec_box,
clippy::while_let_loop,
useless_asref,
clippy::useless_asref,
clippy::repeat_once,
clippy::deref_addrof,
clippy::suspicious_map,
Expand Down
6 changes: 3 additions & 3 deletions src/spruce/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
clippy::boxed_local,
clippy::assigning_clones,
clippy::redundant_allocation,
bool_comparison,
bind_instead_of_map,
clippy::bool_comparison,
clippy::bind_instead_of_map,
clippy::vec_box,
clippy::while_let_loop,
useless_asref,
clippy::useless_asref,
clippy::repeat_once,
clippy::deref_addrof,
clippy::suspicious_map,
Expand Down

0 comments on commit a573144

Please sign in to comment.