Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

forget previous generations of a node when discovering a new one #103

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};

use chitchat::transport::UdpTransport;
use chitchat::{spawn_chitchat, Chitchat, ChitchatConfig, ChitchatId, FailureDetectorConfig};
Expand Down Expand Up @@ -28,7 +28,11 @@ impl Api {
cluster_id: chitchat_guard.cluster_id().to_string(),
cluster_state: chitchat_guard.state_snapshot(),
live_nodes: chitchat_guard.live_nodes().cloned().collect::<Vec<_>>(),
dead_nodes: chitchat_guard.dead_nodes().cloned().collect::<Vec<_>>(),
dead_nodes: chitchat_guard
.dead_nodes()
.cloned()
.map(|node| node.0)
.collect::<Vec<_>>(),
};
Json(serde_json::to_value(&response).unwrap())
}
Expand Down Expand Up @@ -84,7 +88,11 @@ async fn main() -> anyhow::Result<()> {
let node_id = opt
.node_id
.unwrap_or_else(|| generate_server_id(public_addr));
let chitchat_id = ChitchatId::new(node_id, 0, public_addr);
let generation = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let chitchat_id = ChitchatId::new(node_id, generation, public_addr);
let config = ChitchatConfig {
cluster_id: "testing".to_string(),
chitchat_id,
Expand Down
49 changes: 31 additions & 18 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,40 @@ use std::collections::{BTreeMap, HashSet};
use std::mem;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, MaxVersion, VersionedValue};
use crate::{ChitchatId, ChitchatIdGenerationEq, Heartbeat, MaxVersion, VersionedValue};

#[derive(Debug, Default, Eq, PartialEq)]
pub struct Delta {
pub(crate) node_deltas: BTreeMap<ChitchatId, NodeDelta>,
pub(crate) nodes_to_reset: HashSet<ChitchatId>,
pub(crate) node_deltas: BTreeMap<ChitchatIdGenerationEq, NodeDelta>,
pub(crate) nodes_to_reset: HashSet<ChitchatIdGenerationEq>,
}

impl Serializable for Delta {
fn serialize(&self, buf: &mut Vec<u8>) {
(self.node_deltas.len() as u16).serialize(buf);
for (chitchat_id, node_delta) in &self.node_deltas {
chitchat_id.serialize(buf);
chitchat_id.0.serialize(buf);
node_delta.serialize(buf);
}
(self.nodes_to_reset.len() as u16).serialize(buf);
for chitchat_id in &self.nodes_to_reset {
chitchat_id.serialize(buf);
chitchat_id.0.serialize(buf);
}
}

fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let mut node_deltas: BTreeMap<ChitchatId, NodeDelta> = Default::default();
let mut node_deltas: BTreeMap<ChitchatIdGenerationEq, NodeDelta> = Default::default();
let num_nodes = u16::deserialize(buf)?;
for _ in 0..num_nodes {
let chitchat_id = ChitchatId::deserialize(buf)?;
let node_delta = NodeDelta::deserialize(buf)?;
node_deltas.insert(chitchat_id, node_delta);
node_deltas.insert(ChitchatIdGenerationEq(chitchat_id), node_delta);
}
let num_nodes_to_reset = u16::deserialize(buf)?;
let mut nodes_to_reset = HashSet::with_capacity(num_nodes_to_reset as usize);
for _ in 0..num_nodes_to_reset {
let chitchat_id = ChitchatId::deserialize(buf)?;
nodes_to_reset.insert(chitchat_id);
nodes_to_reset.insert(ChitchatIdGenerationEq(chitchat_id));
}
Ok(Delta {
node_deltas,
Expand All @@ -46,12 +46,12 @@ impl Serializable for Delta {
fn serialized_len(&self) -> usize {
let mut len = 2;
for (chitchat_id, node_delta) in &self.node_deltas {
len += chitchat_id.serialized_len();
len += chitchat_id.0.serialized_len();
len += node_delta.serialized_len();
}
len += 2;
for chitchat_id in &self.nodes_to_reset {
len += chitchat_id.serialized_len();
len += chitchat_id.0.serialized_len();
}
len
}
Expand All @@ -68,7 +68,7 @@ impl Delta {

pub fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) {
self.node_deltas
.entry(chitchat_id)
.entry(ChitchatIdGenerationEq(chitchat_id))
.or_insert_with(|| NodeDelta {
heartbeat,
..Default::default()
Expand All @@ -83,7 +83,10 @@ impl Delta {
version: crate::Version,
tombstone: Option<u64>,
) {
let node_delta = self.node_deltas.get_mut(chitchat_id).unwrap();
let node_delta = self
.node_deltas
.get_mut(&ChitchatIdGenerationEq(chitchat_id.clone()))
.unwrap();

node_delta.max_version = node_delta.max_version.max(version);
node_delta.key_values.insert(
Expand All @@ -97,7 +100,8 @@ impl Delta {
}

pub fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) {
self.nodes_to_reset.insert(chitchat_id);
self.nodes_to_reset
.insert(ChitchatIdGenerationEq(chitchat_id));
}
}

Expand Down Expand Up @@ -141,29 +145,38 @@ impl DeltaWriter {
let chitchat_id_opt = mem::take(&mut self.current_chitchat_id);
let node_delta = mem::take(&mut self.current_node_delta);
if let Some(chitchat_id) = chitchat_id_opt {
self.delta.node_deltas.insert(chitchat_id, node_delta);
self.delta
.node_deltas
.insert(ChitchatIdGenerationEq(chitchat_id), node_delta);
}
}

pub fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) -> bool {
let chitchat_id = ChitchatIdGenerationEq(chitchat_id);
assert!(!self.delta.nodes_to_reset.contains(&chitchat_id));
if !self.attempt_add_bytes(chitchat_id.serialized_len()) {
if !self.attempt_add_bytes(chitchat_id.0.serialized_len()) {
return false;
}
self.delta.nodes_to_reset.insert(chitchat_id);
true
}

pub fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) -> bool {
assert!(self.current_chitchat_id.as_ref() != Some(&chitchat_id));
assert!(self
.current_chitchat_id
.as_ref()
.map(|current_node| !current_node.eq_generation(&chitchat_id))
.unwrap_or(true));
let chitchat_id = ChitchatIdGenerationEq(chitchat_id);
assert!(!self.delta.node_deltas.contains_key(&chitchat_id));
self.flush();
// Reserve bytes for [`ChitchatId`], [`Hearbeat`], and for an empty [`NodeDelta`] which has
// a size of 2 bytes.
if !self.attempt_add_bytes(chitchat_id.serialized_len() + heartbeat.serialized_len() + 2) {
if !self.attempt_add_bytes(chitchat_id.0.serialized_len() + heartbeat.serialized_len() + 2)
{
return false;
}
self.current_chitchat_id = Some(chitchat_id);
self.current_chitchat_id = Some(chitchat_id.0);
self.current_node_delta.heartbeat = heartbeat;
true
}
Expand Down
15 changes: 8 additions & 7 deletions chitchat/src/digest.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, MaxVersion};
use crate::{ChitchatId, ChitchatIdGenerationEq, Heartbeat, MaxVersion};

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub(crate) struct NodeDigest {
Expand All @@ -25,45 +25,46 @@ impl NodeDigest {
/// peer -> (heartbeat, max version).
#[derive(Debug, Default, Eq, PartialEq)]
pub struct Digest {
pub(crate) node_digests: BTreeMap<ChitchatId, NodeDigest>,
pub(crate) node_digests: BTreeMap<ChitchatIdGenerationEq, NodeDigest>,
}

#[cfg(test)]
impl Digest {
pub fn add_node(&mut self, node: ChitchatId, heartbeat: Heartbeat, max_version: MaxVersion) {
let node_digest = NodeDigest::new(heartbeat, max_version);
self.node_digests.insert(node, node_digest);
self.node_digests
.insert(ChitchatIdGenerationEq(node), node_digest);
}
}

impl Serializable for Digest {
fn serialize(&self, buf: &mut Vec<u8>) {
(self.node_digests.len() as u16).serialize(buf);
for (chitchat_id, node_digest) in &self.node_digests {
chitchat_id.serialize(buf);
chitchat_id.0.serialize(buf);
node_digest.heartbeat.serialize(buf);
node_digest.max_version.serialize(buf);
}
}

fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let num_nodes = u16::deserialize(buf)?;
let mut node_digests: BTreeMap<ChitchatId, NodeDigest> = Default::default();
let mut node_digests: BTreeMap<ChitchatIdGenerationEq, NodeDigest> = Default::default();

for _ in 0..num_nodes {
let chitchat_id = ChitchatId::deserialize(buf)?;
let heartbeat = Heartbeat::deserialize(buf)?;
let max_version = u64::deserialize(buf)?;
let node_digest = NodeDigest::new(heartbeat, max_version);
node_digests.insert(chitchat_id, node_digest);
node_digests.insert(ChitchatIdGenerationEq(chitchat_id), node_digest);
}
Ok(Digest { node_digests })
}

fn serialized_len(&self) -> usize {
let mut len = (self.node_digests.len() as u16).serialized_len();
for (chitchat_id, node_digest) in &self.node_digests {
len += chitchat_id.serialized_len();
len += chitchat_id.0.serialized_len();
len += node_digest.heartbeat.serialized_len();
len += node_digest.max_version.serialized_len();
}
Expand Down
Loading
Loading