Skip to content

Commit

Permalink
Merge pull request #103 from quickwit-oss/trinity--forget-previous-ge…
Browse files Browse the repository at this point in the history
…neration

forget previous generations of a node when discovering a new one
  • Loading branch information
trinity-1686a authored Dec 14, 2023
2 parents a3e3f8b + e729c8b commit 107a764
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 122 deletions.
14 changes: 11 additions & 3 deletions chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};

use chitchat::transport::UdpTransport;
use chitchat::{spawn_chitchat, Chitchat, ChitchatConfig, ChitchatId, FailureDetectorConfig};
Expand Down Expand Up @@ -28,7 +28,11 @@ impl Api {
cluster_id: chitchat_guard.cluster_id().to_string(),
cluster_state: chitchat_guard.state_snapshot(),
live_nodes: chitchat_guard.live_nodes().cloned().collect::<Vec<_>>(),
dead_nodes: chitchat_guard.dead_nodes().cloned().collect::<Vec<_>>(),
dead_nodes: chitchat_guard
.dead_nodes()
.cloned()
.map(|node| node.0)
.collect::<Vec<_>>(),
};
Json(serde_json::to_value(&response).unwrap())
}
Expand Down Expand Up @@ -84,7 +88,11 @@ async fn main() -> anyhow::Result<()> {
let node_id = opt
.node_id
.unwrap_or_else(|| generate_server_id(public_addr));
let chitchat_id = ChitchatId::new(node_id, 0, public_addr);
let generation = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let chitchat_id = ChitchatId::new(node_id, generation, public_addr);
let config = ChitchatConfig {
cluster_id: "testing".to_string(),
chitchat_id,
Expand Down
49 changes: 31 additions & 18 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,40 @@ use std::collections::{BTreeMap, HashSet};
use std::mem;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, MaxVersion, VersionedValue};
use crate::{ChitchatId, ChitchatIdGenerationEq, Heartbeat, MaxVersion, VersionedValue};

#[derive(Debug, Default, Eq, PartialEq)]
pub struct Delta {
pub(crate) node_deltas: BTreeMap<ChitchatId, NodeDelta>,
pub(crate) nodes_to_reset: HashSet<ChitchatId>,
pub(crate) node_deltas: BTreeMap<ChitchatIdGenerationEq, NodeDelta>,
pub(crate) nodes_to_reset: HashSet<ChitchatIdGenerationEq>,
}

impl Serializable for Delta {
fn serialize(&self, buf: &mut Vec<u8>) {
(self.node_deltas.len() as u16).serialize(buf);
for (chitchat_id, node_delta) in &self.node_deltas {
chitchat_id.serialize(buf);
chitchat_id.0.serialize(buf);
node_delta.serialize(buf);
}
(self.nodes_to_reset.len() as u16).serialize(buf);
for chitchat_id in &self.nodes_to_reset {
chitchat_id.serialize(buf);
chitchat_id.0.serialize(buf);
}
}

fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let mut node_deltas: BTreeMap<ChitchatId, NodeDelta> = Default::default();
let mut node_deltas: BTreeMap<ChitchatIdGenerationEq, NodeDelta> = Default::default();
let num_nodes = u16::deserialize(buf)?;
for _ in 0..num_nodes {
let chitchat_id = ChitchatId::deserialize(buf)?;
let node_delta = NodeDelta::deserialize(buf)?;
node_deltas.insert(chitchat_id, node_delta);
node_deltas.insert(ChitchatIdGenerationEq(chitchat_id), node_delta);
}
let num_nodes_to_reset = u16::deserialize(buf)?;
let mut nodes_to_reset = HashSet::with_capacity(num_nodes_to_reset as usize);
for _ in 0..num_nodes_to_reset {
let chitchat_id = ChitchatId::deserialize(buf)?;
nodes_to_reset.insert(chitchat_id);
nodes_to_reset.insert(ChitchatIdGenerationEq(chitchat_id));
}
Ok(Delta {
node_deltas,
Expand All @@ -46,12 +46,12 @@ impl Serializable for Delta {
fn serialized_len(&self) -> usize {
let mut len = 2;
for (chitchat_id, node_delta) in &self.node_deltas {
len += chitchat_id.serialized_len();
len += chitchat_id.0.serialized_len();
len += node_delta.serialized_len();
}
len += 2;
for chitchat_id in &self.nodes_to_reset {
len += chitchat_id.serialized_len();
len += chitchat_id.0.serialized_len();
}
len
}
Expand All @@ -68,7 +68,7 @@ impl Delta {

pub fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) {
self.node_deltas
.entry(chitchat_id)
.entry(ChitchatIdGenerationEq(chitchat_id))
.or_insert_with(|| NodeDelta {
heartbeat,
..Default::default()
Expand All @@ -83,7 +83,10 @@ impl Delta {
version: crate::Version,
tombstone: Option<u64>,
) {
let node_delta = self.node_deltas.get_mut(chitchat_id).unwrap();
let node_delta = self
.node_deltas
.get_mut(&ChitchatIdGenerationEq(chitchat_id.clone()))
.unwrap();

node_delta.max_version = node_delta.max_version.max(version);
node_delta.key_values.insert(
Expand All @@ -97,7 +100,8 @@ impl Delta {
}

pub fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) {
self.nodes_to_reset.insert(chitchat_id);
self.nodes_to_reset
.insert(ChitchatIdGenerationEq(chitchat_id));
}
}

Expand Down Expand Up @@ -141,29 +145,38 @@ impl DeltaWriter {
let chitchat_id_opt = mem::take(&mut self.current_chitchat_id);
let node_delta = mem::take(&mut self.current_node_delta);
if let Some(chitchat_id) = chitchat_id_opt {
self.delta.node_deltas.insert(chitchat_id, node_delta);
self.delta
.node_deltas
.insert(ChitchatIdGenerationEq(chitchat_id), node_delta);
}
}

pub fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) -> bool {
let chitchat_id = ChitchatIdGenerationEq(chitchat_id);
assert!(!self.delta.nodes_to_reset.contains(&chitchat_id));
if !self.attempt_add_bytes(chitchat_id.serialized_len()) {
if !self.attempt_add_bytes(chitchat_id.0.serialized_len()) {
return false;
}
self.delta.nodes_to_reset.insert(chitchat_id);
true
}

pub fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) -> bool {
assert!(self.current_chitchat_id.as_ref() != Some(&chitchat_id));
assert!(self
.current_chitchat_id
.as_ref()
.map(|current_node| !current_node.eq_generation(&chitchat_id))
.unwrap_or(true));
let chitchat_id = ChitchatIdGenerationEq(chitchat_id);
assert!(!self.delta.node_deltas.contains_key(&chitchat_id));
self.flush();
// Reserve bytes for [`ChitchatId`], [`Hearbeat`], and for an empty [`NodeDelta`] which has
// a size of 2 bytes.
if !self.attempt_add_bytes(chitchat_id.serialized_len() + heartbeat.serialized_len() + 2) {
if !self.attempt_add_bytes(chitchat_id.0.serialized_len() + heartbeat.serialized_len() + 2)
{
return false;
}
self.current_chitchat_id = Some(chitchat_id);
self.current_chitchat_id = Some(chitchat_id.0);
self.current_node_delta.heartbeat = heartbeat;
true
}
Expand Down
15 changes: 8 additions & 7 deletions chitchat/src/digest.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, MaxVersion};
use crate::{ChitchatId, ChitchatIdGenerationEq, Heartbeat, MaxVersion};

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub(crate) struct NodeDigest {
Expand All @@ -25,45 +25,46 @@ impl NodeDigest {
/// peer -> (heartbeat, max version).
#[derive(Debug, Default, Eq, PartialEq)]
pub struct Digest {
pub(crate) node_digests: BTreeMap<ChitchatId, NodeDigest>,
pub(crate) node_digests: BTreeMap<ChitchatIdGenerationEq, NodeDigest>,
}

#[cfg(test)]
impl Digest {
pub fn add_node(&mut self, node: ChitchatId, heartbeat: Heartbeat, max_version: MaxVersion) {
let node_digest = NodeDigest::new(heartbeat, max_version);
self.node_digests.insert(node, node_digest);
self.node_digests
.insert(ChitchatIdGenerationEq(node), node_digest);
}
}

impl Serializable for Digest {
fn serialize(&self, buf: &mut Vec<u8>) {
(self.node_digests.len() as u16).serialize(buf);
for (chitchat_id, node_digest) in &self.node_digests {
chitchat_id.serialize(buf);
chitchat_id.0.serialize(buf);
node_digest.heartbeat.serialize(buf);
node_digest.max_version.serialize(buf);
}
}

fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let num_nodes = u16::deserialize(buf)?;
let mut node_digests: BTreeMap<ChitchatId, NodeDigest> = Default::default();
let mut node_digests: BTreeMap<ChitchatIdGenerationEq, NodeDigest> = Default::default();

for _ in 0..num_nodes {
let chitchat_id = ChitchatId::deserialize(buf)?;
let heartbeat = Heartbeat::deserialize(buf)?;
let max_version = u64::deserialize(buf)?;
let node_digest = NodeDigest::new(heartbeat, max_version);
node_digests.insert(chitchat_id, node_digest);
node_digests.insert(ChitchatIdGenerationEq(chitchat_id), node_digest);
}
Ok(Digest { node_digests })
}

fn serialized_len(&self) -> usize {
let mut len = (self.node_digests.len() as u16).serialized_len();
for (chitchat_id, node_digest) in &self.node_digests {
len += chitchat_id.serialized_len();
len += chitchat_id.0.serialized_len();
len += node_digest.heartbeat.serialized_len();
len += node_digest.max_version.serialized_len();
}
Expand Down
Loading

0 comments on commit 107a764

Please sign in to comment.