Skip to content

Commit

Permalink
Emit ClusterChange::Remove only if node generation IDs match (#3899)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Oct 2, 2023
1 parent 5060b6c commit 65895fd
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 48 deletions.
120 changes: 74 additions & 46 deletions quickwit/quickwit-cluster/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

use chitchat::{ChitchatId, NodeState};
Expand All @@ -25,7 +26,7 @@ use quickwit_common::tower::{make_channel, warmup_channel};
use tracing::{info, warn};

use crate::member::NodeStateExt;
use crate::ClusterNode;
use crate::{ClusterNode, NodeId};

#[derive(Debug, Clone)]
pub enum ClusterChange {
Expand All @@ -39,7 +40,7 @@ pub enum ClusterChange {
pub(crate) async fn compute_cluster_change_events(
cluster_id: &str,
self_chitchat_id: &ChitchatId,
previous_nodes: &mut BTreeMap<ChitchatId, ClusterNode>,
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
previous_node_states: &BTreeMap<ChitchatId, NodeState>,
new_node_states: &BTreeMap<ChitchatId, NodeState>,
) -> Vec<ClusterChange> {
Expand Down Expand Up @@ -96,17 +97,8 @@ async fn compute_cluster_change_events_on_added(
self_chitchat_id: &ChitchatId,
new_chitchat_id: &ChitchatId,
new_node_state: &NodeState,
previous_nodes: &mut BTreeMap<ChitchatId, ClusterNode>,
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
) -> Option<ClusterChange> {
let is_self_node = self_chitchat_id == new_chitchat_id;
if !is_self_node {
info!(
cluster_id=%cluster_id,
node_id=%new_chitchat_id.node_id,
"Node `{}` has joined the cluster.",
new_chitchat_id.node_id
);
}
let grpc_advertise_addr = match new_node_state.grpc_advertise_addr() {
Ok(addr) => addr,
Err(error) => {
Expand All @@ -120,6 +112,7 @@ async fn compute_cluster_change_events_on_added(
}
};
let channel = make_channel(grpc_advertise_addr).await;
let is_self_node = self_chitchat_id == new_chitchat_id;
let new_node = match ClusterNode::try_new(
new_chitchat_id.clone(),
new_node_state,
Expand All @@ -137,8 +130,26 @@ async fn compute_cluster_change_events_on_added(
return None;
}
};
previous_nodes.insert(new_chitchat_id.clone(), new_node.clone());
let new_node_id = new_node.chitchat_id().node_id.clone();
let previous_node_opt = previous_nodes.insert(new_node_id, new_node.clone());

if !is_self_node {
if previous_node_opt.is_some() {
info!(
cluster_id=%cluster_id,
node_id=%new_chitchat_id.node_id,
"Node `{}` has rejoined the cluster.",
new_chitchat_id.node_id
);
} else {
info!(
cluster_id=%cluster_id,
node_id=%new_chitchat_id.node_id,
"Node `{}` has joined the cluster.",
new_chitchat_id.node_id
);
}
}
if new_node.is_ready() {
warmup_channel(new_node.channel()).await;

Expand All @@ -160,9 +171,9 @@ async fn compute_cluster_change_events_on_updated(
self_chitchat_id: &ChitchatId,
updated_chitchat_id: &ChitchatId,
updated_node_state: &NodeState,
previous_nodes: &mut BTreeMap<ChitchatId, ClusterNode>,
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
) -> Option<ClusterChange> {
let previous_node = previous_nodes.get(updated_chitchat_id)?.clone();
let previous_node = previous_nodes.get(&updated_chitchat_id.node_id)?.clone();
let previous_channel = previous_node.channel();
let is_self_node = self_chitchat_id == updated_chitchat_id;
let updated_node = match ClusterNode::try_new(
Expand All @@ -182,7 +193,7 @@ async fn compute_cluster_change_events_on_updated(
return None;
}
};
previous_nodes.insert(updated_chitchat_id.clone(), updated_node.clone());
previous_nodes.insert(updated_chitchat_id.node_id.clone(), updated_node.clone());

if !previous_node.is_ready() && updated_node.is_ready() {
warmup_channel(updated_node.channel()).await;
Expand Down Expand Up @@ -217,23 +228,30 @@ fn compute_cluster_change_events_on_removed(
cluster_id: &str,
self_chitchat_id: &ChitchatId,
removed_chitchat_id: &ChitchatId,
previous_nodes: &mut BTreeMap<ChitchatId, ClusterNode>,
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
) -> Option<ClusterChange> {
if self_chitchat_id != removed_chitchat_id {
info!(
cluster_id=%cluster_id,
node_id=%removed_chitchat_id.node_id,
"Node `{}` has left the cluster.",
removed_chitchat_id.node_id
);
}
let previous_node = previous_nodes.remove(removed_chitchat_id)?;
let removed_node_id = removed_chitchat_id.node_id.clone();

if let Entry::Occupied(previous_node_entry) = previous_nodes.entry(removed_node_id) {
let previous_node_ref = previous_node_entry.get();

if previous_node_ref.chitchat_id().generation_id == removed_chitchat_id.generation_id {
if self_chitchat_id != removed_chitchat_id {
info!(
cluster_id=%cluster_id,
node_id=%removed_chitchat_id.node_id,
"Node `{}` has left the cluster.",
removed_chitchat_id.node_id
);
}
let previous_node = previous_node_entry.remove();

if previous_node.is_ready() {
Some(ClusterChange::Remove(previous_node))
} else {
None
}
if previous_node.is_ready() {
return Some(ClusterChange::Remove(previous_node));
}
}
};
None
}

#[cfg(test)]
Expand Down Expand Up @@ -361,7 +379,7 @@ mod tests {
.await;
assert!(event.is_none());

let node = previous_nodes.get(&new_chitchat_id).unwrap();
let node = previous_nodes.get(&new_chitchat_id.node_id).unwrap();

assert_eq!(node.chitchat_id(), &new_chitchat_id);
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
Expand Down Expand Up @@ -396,7 +414,7 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!node.is_self_node());
assert!(node.is_ready());
assert_eq!(previous_nodes.get(&new_chitchat_id).unwrap(), &node);
assert_eq!(previous_nodes.get(&new_chitchat_id.node_id).unwrap(), &node);
}
{
// Self node joined the cluster and is ready.
Expand Down Expand Up @@ -425,7 +443,7 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(node.is_self_node());
assert!(node.is_ready());
assert_eq!(previous_nodes.get(&new_chitchat_id).unwrap(), &node);
assert_eq!(previous_nodes.get(&new_chitchat_id.node_id).unwrap(), &node);
}
}

Expand Down Expand Up @@ -453,7 +471,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
BTreeMap::from_iter([(updated_chitchat_id.clone(), previous_node)]);
BTreeMap::from_iter([(updated_chitchat_id.node_id.clone(), previous_node)]);

let updated_node_state = NodeStateBuilder::default()
.with_grpc_advertise_addr(grpc_advertise_addr)
Expand All @@ -476,7 +494,10 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(node.is_ready());
assert!(!node.is_self_node());
assert_eq!(previous_nodes.get(&updated_chitchat_id).unwrap(), &node);
assert_eq!(
previous_nodes.get(&updated_chitchat_id.node_id).unwrap(),
&node
);
}
{
// Node changed.
Expand All @@ -497,7 +518,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
BTreeMap::from_iter([(updated_chitchat_id.clone(), previous_node)]);
BTreeMap::from_iter([(updated_chitchat_id.node_id.clone(), previous_node)]);

let updated_node_state = NodeStateBuilder::default()
.with_grpc_advertise_addr(grpc_advertise_addr)
Expand All @@ -520,7 +541,10 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!node.is_self_node());
assert!(node.is_ready());
assert_eq!(previous_nodes.get(&updated_chitchat_id).unwrap(), &node);
assert_eq!(
previous_nodes.get(&updated_chitchat_id.node_id).unwrap(),
&node
);
}
{
// Node is no longer ready.
Expand All @@ -541,7 +565,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
BTreeMap::from_iter([(updated_chitchat_id.clone(), previous_node)]);
BTreeMap::from_iter([(updated_chitchat_id.node_id.clone(), previous_node)]);

let updated_node_state = NodeStateBuilder::default()
.with_grpc_advertise_addr(grpc_advertise_addr)
Expand All @@ -564,7 +588,10 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!node.is_self_node());
assert!(!node.is_ready());
assert_eq!(previous_nodes.get(&updated_chitchat_id).unwrap(), &node);
assert_eq!(
previous_nodes.get(&updated_chitchat_id.node_id).unwrap(),
&node
);
}
}

Expand Down Expand Up @@ -606,7 +633,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
BTreeMap::from_iter([(removed_chitchat_id.clone(), previous_node)]);
BTreeMap::from_iter([(removed_chitchat_id.node_id.clone(), previous_node)]);

let event_opt = compute_cluster_change_events_on_removed(
&cluster_id,
Expand All @@ -615,7 +642,7 @@ mod tests {
&mut previous_nodes,
);
assert!(event_opt.is_none());
assert!(!previous_nodes.contains_key(&removed_chitchat_id));
assert!(!previous_nodes.contains_key(&removed_chitchat_id.node_id));
}
{
// Node left the cluster in ready state.
Expand All @@ -630,7 +657,8 @@ mod tests {
let node =
ClusterNode::try_new(removed_chitchat_id.clone(), &new_node_state, channel, false)
.unwrap();
let mut previous_nodes = BTreeMap::from_iter([(removed_chitchat_id.clone(), node)]);
let mut previous_nodes =
BTreeMap::from_iter([(removed_chitchat_id.node_id.clone(), node)]);

let event = compute_cluster_change_events_on_removed(
&cluster_id,
Expand All @@ -647,7 +675,7 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!node.is_self_node());
assert!(node.is_ready());
assert!(!previous_nodes.contains_key(&removed_chitchat_id));
assert!(!previous_nodes.contains_key(&removed_chitchat_id.node_id));
}
}

Expand Down Expand Up @@ -683,7 +711,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node)]);
BTreeMap::from_iter([(self_chitchat_id.node_id.clone(), previous_node)]);
let previous_node_states =
BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node_state)]);

Expand Down Expand Up @@ -747,7 +775,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node)]);
BTreeMap::from_iter([(self_chitchat_id.node_id.clone(), previous_node)]);
let previous_node_states =
BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node_state)]);

Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::member::{
GRPC_ADVERTISE_ADDR_KEY, INDEXING_TASK_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY,
READINESS_VALUE_READY,
};
use crate::ClusterNode;
use crate::{ClusterNode, NodeId};

const GOSSIP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(25)
Expand Down Expand Up @@ -422,7 +422,7 @@ struct InnerCluster {
cluster_id: String,
self_chitchat_id: ChitchatId,
chitchat_handle: ChitchatHandle,
live_nodes: BTreeMap<ChitchatId, ClusterNode>,
live_nodes: BTreeMap<NodeId, ClusterNode>,
change_stream_subscribers: Vec<mpsc::UnboundedSender<ClusterChange>>,
ready_members_rx: watch::Receiver<Vec<ClusterMember>>,
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub use crate::cluster::{Cluster, ClusterSnapshot, NodeIdSchema};
pub use crate::member::ClusterMember;
pub use crate::node::ClusterNode;

pub type NodeId = String;

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct GenerationId(u64);

Expand Down

0 comments on commit 65895fd

Please sign in to comment.