Skip to content

Commit

Permalink
Switch to individual feed nodes to prevent overflowing 1MB cap in zoo…
Browse files Browse the repository at this point in the history
…keeper
  • Loading branch information
kylerchin committed Apr 12, 2024
1 parent a573144 commit 0570119
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 38 deletions.
96 changes: 58 additions & 38 deletions src/alpenrose/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ pub struct RealtimeFeedFetch {
pub fetch_interval_ms: Option<i32>,
}

/// The realtime feed fetch instructions handed to a single worker.
///
/// Elsewhere in this file a value is built from one worker's assigned
/// instructions and serialized with `bincode` as the data of that worker's
/// `/alpenrose_assignments/{node}` ZooKeeper node.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq)]
struct InstructionsPerWorker {
/// Fetch instructions assigned to this worker.
feeds: Vec<RealtimeFeedFetch>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
let this_worker_id = Arc::new(Uuid::new_v4().to_string());
Expand Down Expand Up @@ -123,7 +118,8 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
CreateMode::Ephemeral,
)
.await
.unwrap().unwrap();
.unwrap()
.unwrap();

let workers_assignments = zk
.create(
Expand All @@ -135,10 +131,11 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
.await
.unwrap();

// each worker's ephemeral node stores the time its assignments were last updated; `None` means no data has been assigned to the node yet
let this_worker_assignments = zk
.create(
format!("/alpenrose_assignments/{}", this_worker_id).as_str(),
vec![],
bincode::serialize(&None::<u64>).unwrap(),
Acl::open_unsafe(),
CreateMode::Ephemeral,
)
Expand Down Expand Up @@ -216,46 +213,69 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
// divide feeds between worker nodes

// feed id -> List of realtime fetch instructions
let mut assignments: BTreeMap<String, Vec<RealtimeFeedFetch>> =
BTreeMap::new();
let mut assignments: BTreeMap<
String,
HashMap<String, RealtimeFeedFetch>,
> = BTreeMap::new();

for (index, (feed_id, realtime_instructions)) in
feeds_map.iter().enumerate()
{
let node_to_assign = &worker_nodes[index % worker_nodes.len()];

//append to list
assignments.entry(node_to_assign.to_string()).and_modify(
|instructions| {
instructions.push(realtime_instructions.clone());
},
).or_insert(vec![realtime_instructions.clone()]);
assignments
.entry(node_to_assign.to_string())
.and_modify(|instructions| {
instructions.insert(
feed_id.clone(),
realtime_instructions.clone(),
);
})
.or_insert({
let mut map = HashMap::new();
map.insert(
feed_id.clone(),
realtime_instructions.clone(),
);
map
});
}
// look at current assignments, delete old assignments

// assign feeds to worker nodes

for (node, instructions) in assignments.iter() {
let instructions_per_worker = InstructionsPerWorker {
feeds: instructions.clone(),
};

//this is capped at 1MB, I should change the data structure to be smaller

// Option 1: Each worker node has a list of feeds, and then has to lookup the associated instructions in zookeeper
// downside, what if associated instructions update?

//Option 2: Store each instruction as a sub zookeeper node of the worker node
// upside is that the instructions disappear when the worker node disappears
// downside: still has to look up all the sequential data, etc.

let _ = zk.set_data(
format!("/alpenrose_assignments/{}", node).as_str(),
None,
bincode::serialize(&instructions_per_worker).unwrap(),
)
.await
.unwrap();
//update assignments in zookeeper

for (worker_id, instructions_hashmap) in assignments.iter() {
for (feed_id, realtime_instruction) in instructions_hashmap {
let feed_id_str = feed_id.clone();

//update each feed under the workers node's assignment
let assignment = zk
.create(
format!(
"/alpenrose_assignments/{}/{}",
worker_id, feed_id_str
)
.as_str(),
bincode::serialize(&realtime_instruction).unwrap(),
Acl::open_unsafe(),
CreateMode::Ephemeral,
)
.await?;
}

//update the worker's last updated time
let worker_assignment_metadata = zk
.create(
format!("/alpenrose_assignments/{}", this_worker_id)
.as_str(),
bincode::serialize(&Some(
catenary::duration_since_unix_epoch().as_millis(),
))
.unwrap(),
Acl::open_unsafe(),
CreateMode::Ephemeral,
)
.await?;
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub mod schema;
use fasthash::MetroHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};

pub const WGS_84_SRID: u32 = 4326;

Expand Down Expand Up @@ -102,3 +104,7 @@ pub fn fast_hash<T: Hash>(t: &T) -> u64 {
t.hash(&mut s);
s.finish()
}

/// Returns the time elapsed between the Unix epoch and the current system time.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
pub fn duration_since_unix_epoch() -> Duration {
    let now = SystemTime::now();
    let since_epoch = now.duration_since(UNIX_EPOCH);
    since_epoch.unwrap()
}

0 comments on commit 0570119

Please sign in to comment.