From 4c47f6c69e8573461463789551542ef4ae3d0712 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 16 Jan 2025 18:20:33 +0000 Subject: [PATCH] [Locality] Location-aware NodeSetChecker This introduces a new sophisticated and performance-optimized nodeset checker for bifrost flexible quorums. It pre-computes aggregates to reduce the cost of repeatative quorum checks. The new nodeset checker has extensive test coverage --- Cargo.lock | 1 + .../logs_controller/nodeset_selection.rs | 2 +- crates/bifrost/Cargo.toml | 1 + .../replicated_loglet/nodeset_selector.rs | 2 +- .../replicated_loglet/replication/checker.rs | 1632 ++++++++++++++--- .../replicated_loglet/replication/mod.rs | 1 + .../replication/spread_selector.rs | 4 +- .../replicated_loglet/sequencer/appender.rs | 2 +- .../replicated_loglet/tasks/check_seal.rs | 3 +- .../replicated_loglet/tasks/digests.rs | 7 +- .../replicated_loglet/tasks/find_tail.rs | 2 +- .../replicated_loglet/tasks/get_trim_point.rs | 2 +- .../providers/replicated_loglet/tasks/mod.rs | 2 +- .../providers/replicated_loglet/tasks/seal.rs | 2 +- .../providers/replicated_loglet/tasks/trim.rs | 2 +- crates/types/src/nodes_config.rs | 20 +- crates/types/src/replicated_loglet/params.rs | 4 +- .../replicated_loglet/replication_property.rs | 83 +- 18 files changed, 1474 insertions(+), 298 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fdaecd429..340d3014c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6118,6 +6118,7 @@ dependencies = [ "serde", "serde_json", "smallvec", + "smartstring", "static_assertions", "tempfile", "test-log", diff --git a/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs b/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs index ebd05a0d6..1fc012602 100644 --- a/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs +++ b/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs @@ -79,7 +79,7 @@ impl<'a> NodeSetSelector<'a> { rng: &mut R, preferred_nodes: &NodeSet, ) -> Result { - if replication_property.at_greatest_scope().0 != &LocationScope::Node { + if replication_property.greatest_defined_scope() > LocationScope::Node { // todo: add support for other location scopes unimplemented!("only node-scoped replication is currently supported"); } diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index 43ffd7bc7..b1d5c8f82 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -42,6 +42,7 @@ rocksdb = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } smallvec = { workspace = true } +smartstring = { workspace = true } static_assertions = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs b/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs index 4b48b541e..ec4eb412e 100644 --- a/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs +++ b/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs @@ -89,7 +89,7 @@ impl<'a> NodeSetSelector<'a> { rng: &mut R, preferred_nodes: &NodeSet, ) -> Result { - if replication_property.at_greatest_scope().0 != &LocationScope::Node { + if replication_property.greatest_defined_scope() > LocationScope::Node { // todo: add support for other location scopes unimplemented!("only node-scoped replication is currently supported"); } diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs 
b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs index 2b7251527..a8cf09d0e 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs @@ -8,28 +8,87 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::collections::{hash_map, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; use std::fmt::Display; +use std::hash::{Hash, Hasher}; +use itertools::Itertools; +use tracing::warn; +use xxhash_rust::xxh3::Xxh3Builder; + +use restate_types::locality::{NodeLocation, NodeLocationScope}; use restate_types::nodes_config::{NodesConfiguration, StorageState}; use restate_types::replicated_loglet::{NodeSet, ReplicationProperty}; use restate_types::Merge; use restate_types::PlainNodeId; -/// NodeSetChecker maintains a set of nodes that can be tagged with -/// an attribute, and provides an API for querying the replication properties of -/// the subset of nodes with a certain values for this attribute, given a -/// replication requirement across several failure domains. +type SmartString = smartstring::SmartString; + +/// Possible results of f-majority checks for a subset of the NodeSet. +/// Read variant docs for details. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FMajorityResult { + /// The subset of nodes neither satisfies the authoritative f-majority + /// property, nor does it contain sufficient authoritative nodes. + /// + /// * Bad. No f-majority is possible. + None, + /// No f-majority, but all authoritative nodes already matches the attribute and predicate. + /// This indicates that it's the best possible achievable f-majority and it's up to the user to + /// decide whether it to consider this a "safe" majority or not. By design, `passed()` return + /// false for this one. + /// + /// * Bad, but it's the best possible majority given the nodeset status. + BestEffort, + /// the subset of nodes satisfies the f-majority property, but not enough nodes are + /// authoritative in that matching subset. + /// + /// * Okay (depending on the situation) + SuccessWithRisk, + /// the subset of nodes satisfy the authoritative f-majority property, and it + /// has enough authoritative in that matching subset. + /// + /// * Good + Success, +} + +impl FMajorityResult { + pub fn passed(&self) -> bool { + matches!( + self, + FMajorityResult::Success { .. } | FMajorityResult::SuccessWithRisk + ) + } + + pub fn is_authoritative_complete(&self) -> bool { + matches!(self, FMajorityResult::Success) + } +} + +/// Run quorum checks on a set of nodes tagged with certain attribute /// -/// The checker is created with default value of Attribute set on all nodes. +/// It maintains a set of nodes that can be tagged with an attribute, +/// and provides an API for querying the replication properties of the subset of nodes +/// with a certain values for this attribute, given a replication requirement across +/// several failure domains. /// -/// **NOTE:** Currently, this will not perform any failure-domain-aware quorum -/// checks, this will be implemented in the near future. +/// +/// The checker is created without any values assigned to any nodes. Note that `check_write_quorum` +/// and `check_fmajority` will only consider nodes with an attribute assigned to them. +/// +/// If an attribute is needed to be set by default on all nodes, then use `fill_with` to assign the +/// attribute to all nodes. 
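// Illustrative sketch (not part of this patch): one way a caller, e.g. a seal task,
// might consume these variants. `NodeSetChecker<bool>` and `check_fmajority` are the
// APIs introduced below; the function itself and the returned strings are hypothetical.
fn interpret_seal(checker: &NodeSetChecker<bool>) -> &'static str {
    match checker.check_fmajority(|sealed| *sealed) {
        // f-majority reached and every matching node is authoritative
        FMajorityResult::Success => "sealed",
        // f-majority reached, but some matching nodes are in DataLoss
        FMajorityResult::SuccessWithRisk => "sealed, with reduced confidence",
        // every authoritative node matched, yet that still falls short of f-majority;
        // the caller decides whether to treat this as good enough
        FMajorityResult::BestEffort => "best-effort only",
        FMajorityResult::None => "cannot be considered sealed",
    }
}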
It's best to do this only if [`NodeSetChecker`] is kept for long-ish +/// period of time. /// /// The utility provides two methods: -/// - `check_write_quorum()`: Can be used to check if it'd be possible to replicate a record on the -/// subset of nodes that have a certain value for the attribute +/// - `check_write_quorum()`: Can be used to check if the subset of nodes that are tagged with an attribute +/// matching a predicate may form a legal write quorum. +/// Note that this function doesn't care whether those tagged nodes are +/// writeable or not. +/// It's your responsibility to mark the correct nodes if you want to take +/// StorageState into account. +/// /// - `check_fmajority()`: Used to check if enough nodes have certain values for the /// attribute so that that set of nodes is an f-majority for at /// least one of the scope for which there is a replication @@ -39,327 +98,747 @@ use restate_types::PlainNodeId; /// The seal operation is able to know if it can consider the seal /// to be completed or not. /// -/// Note that this doesn't track changes that happen to the storage-states after instantiation. -/// For a fresh view, rebuild this with a new nodes configuration. -pub struct NodeSetChecker<'a, Attribute> { - node_attribute: HashMap, - /// Mapping between node-id and its log-server storage state - storage_states: HashMap, - replication_property: &'a ReplicationProperty, +/// Note that at the moment this doesn't track changes that happen to the storage-states after instantiation. +/// For a fresh view, rebuild this with a new nodes configuration. This might change in the future +/// with a method to refresh the nodes configuration's view. +/// +/// +/// ## F-Majority and authoritative-ness +/// +/// The concept of "f-majority" can be defined as a set of nodes that intersects every possible +/// legal write quorum. Nodes are assigned a [`StorageState`] that define their authoritative-ness. +/// An authoritative node is one that has not lost data and is not empty. Empty in this context +/// means that it's guaranteed that it has never received data and was never a participant in +/// previous writes. A node is considered non-authoritative if it's in [`StorageState::DataLoss`] +/// which means it has lost data and it cannot be considered a reliable participant in some quorum +/// checks. That said, dataloss doesn't include "corruption". It only means that if a node responds +/// negatively to a read request that we cannot confidently assume that the data was never written. +/// Conversely, if the node responds with a record/data, we can use this data safely. +pub struct NodeSetChecker { + // This is a btree-map to leverage its great cache-locality on small sets like this one. + // this could also be Box<[(PlainNodeId, LocationScopeState)]> but btreemap is nicer to + // use. + scopes: BTreeMap>, + node_to_attr: HashMap, + /// Mapping between node-id and its log-server storage state. Note that we keep all nodes even + /// unreadable ones here because they might become readable after a nodes configuration + /// refresh. That said. Node-ids that have been deleted in nodes-configuration will not appear + /// here. + node_to_storage_state: HashMap, } -impl<'a, Attribute> NodeSetChecker<'a, Attribute> { +impl NodeSetChecker { // Note that this doesn't track changes that happen to the storage-states after instantiation. // For a fresh view, rebuild this with a new nodes configuration. 
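// Worked example of the arithmetic implemented below (numbers assumed, mirroring the
// tests at the end of this file): with a flat nodeset {N1..N5} and replication
// {node: 3}, any 3 nodes form a legal write quorum, so an f-majority needs
// 5 - 3 + 1 = 3 nodes. With replication {zone: 2} over a nodeset spanning 3 non-empty
// zones, a write quorum needs nodes in at least 2 distinct zones, and an f-majority
// needs 3 - 2 + 1 = 2 complete zones. `check_write_quorum` must pass at every
// configured scope, while `check_fmajority` passes as soon as any single scope has
// an f-majority.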
pub fn new( nodeset: &NodeSet, nodes_config: &NodesConfiguration, - replication_property: &'a ReplicationProperty, - ) -> Self - where - Attribute: Default, - { - Self::with_factory(nodeset, nodes_config, replication_property, |_| { - Default::default() - }) - } - - pub fn with_factory( - nodeset: &NodeSet, - nodes_config: &NodesConfiguration, - replication_property: &'a ReplicationProperty, - attribute_factory: impl Fn(PlainNodeId) -> Attribute, + replication_property: &ReplicationProperty, ) -> Self { - let storage_states: HashMap<_, _> = nodeset - .iter() - .filter_map(|n| { - match nodes_config.get_log_server_storage_state(n) { - // storage states. Only include nodes that enable reads or above. - storage_state if !storage_state.empty() => Some((*n, storage_state)), - // node is not readable or doesn't exist. Treat as DISABLED - _ => None, - } - }) - .collect(); + let mut scope = NodeLocationScope::Root; + let mut scopes = BTreeMap::new(); + while let Some(current_scope) = scope.next_smaller_scope() { + // we are not interested in the scope if replication property doesn't + // specify a value for it. + if let Some(replication) = replication_property.copies_at_scope(current_scope) { + scopes.insert(current_scope, LocationScopeState::new(replication)); + } + scope = current_scope; + } - let node_attribute: HashMap<_, _> = storage_states - .keys() - .map(|node_id| (*node_id, attribute_factory(*node_id))) - .collect(); + // we must have at least node-level replication defined + assert!(!scopes.is_empty()); - Self { - node_attribute, - storage_states, - replication_property, + let storage_states = HashMap::with_capacity_and_hasher(nodeset.len(), Xxh3Builder::new()); + let node_attribute = HashMap::with_capacity_and_hasher(nodeset.len(), Xxh3Builder::new()); + + let mut checker = Self { + scopes, + node_to_attr: node_attribute, + node_to_storage_state: storage_states, + }; + + for node_id in nodeset { + if let Ok(config) = nodes_config.find_node_by_id(*node_id) { + checker.add_node( + *node_id, + config.log_server_config.storage_state, + &config.location, + ); + } } + + checker } - pub fn len(&self) -> usize { - self.node_attribute.len() + /// The number of authoritative nodes in this node set + pub fn count_authoritative_nodes(&self) -> u32 { + u32::try_from( + self.node_to_storage_state + .values() + .filter(|s| StorageState::is_authoritative(s)) + .count(), + ) + .expect("number of nodes in a cluster must fit in u32") } - pub fn is_empty(&self) -> bool { - self.node_attribute.is_empty() + /// The number of nodes in that nodeset that has matches the storage state predicate + pub fn count_nodes(&self, predicate: impl Fn(&StorageState) -> bool) -> u32 { + u32::try_from( + self.node_to_storage_state + .values() + .filter(|s| predicate(s)) + .count(), + ) + .expect("number of nodes in a cluster must fit in u32") } - /// resets all attributes for all nodes to this value - pub fn reset_with(&mut self, attribute: Attribute) + /// Checks if the set of nodes with attribute matching the predicate are in fact, all authoritative nodes. 
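// Illustration of how node locations map to failure domains (reusing the test helper
// defined at the bottom of this file; the exact domain-string rendering is an internal
// detail of `add_node`):
//
//   nodes_config.upsert_node(generate_logserver_node(1, StorageState::ReadWrite, "region1.az1"));
//
// With replication {region: 2, zone: 3}, N1 is counted under "region1" at the region
// scope, under "region1.az1" at the zone scope, and under its own node-id at the node
// scope. A node with an empty location string participates only at the node scope and
// is skipped (with a warning) at any larger scope required by the replication property.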
+ pub fn is_complete_set(&self, predicate: &Predicate) -> bool where - Attribute: Clone, + Predicate: Fn(&Attr) -> bool, { - for (_, v) in self.node_attribute.iter_mut() { - *v = attribute.clone(); - } + // count complete domains + let node_scope = self + .scopes + .get(&NodeLocationScope::Node) + .expect("node scope must be set"); + + // A shortcut to get the number of authoritative nodes that have this attribute matching + let (num_matching_auth_nodes, _) = node_scope.count_matching_domains(&predicate); + // did all authoritative nodes have this attribute? + num_matching_auth_nodes == self.count_authoritative_nodes() } - /// resets all attributes for all nodes with the default value of Attribute - pub fn reset_with_default(&mut self) - where - Attribute: Default, - { - for (_, v) in self.node_attribute.iter_mut() { - *v = Default::default(); - } + /// How many nodes have attributes set. Note that this doesn't count nodes with has not + /// received an attribute yet (set_attribute was not called on such nodes) + pub fn len(&self) -> usize { + self.node_to_attr.len() } - /// Set the attribute value of a node. Note that a node can only be - /// associated with one attribute value at a time, so if the node has an - /// existing attribute value, the value will be cleared. - /// - /// Returns the old attribute if it was set - pub fn set_attribute( - &mut self, - node_id: PlainNodeId, - attribute: Attribute, - ) -> Option { - // ignore if the node is not in the original nodeset - if self.storage_states.contains_key(&node_id) { - self.node_attribute.insert(node_id, attribute) - } else { - None - } + /// Returns true if no attributes were set to any nodes + pub fn is_empty(&self) -> bool { + self.node_to_attr.is_empty() } - pub fn set_attribute_on_each<'b>( + /// Sets this attribute value on every node of the input iterator + pub fn set_attribute_on_each( &mut self, - nodes: impl IntoIterator, - f: impl Fn() -> Attribute, + nodes: impl IntoIterator>, + attribute: Attr, ) { for node in nodes.into_iter() { // ignore if the node is not in the original nodeset - if self.storage_states.contains_key(node) { - self.node_attribute.insert(*node, f()); - } + self.set_attribute(node, attribute.clone()); } } - pub fn remove_attribute(&mut self, node_id: &PlainNodeId) -> Option { - self.node_attribute.remove(node_id) + /// sets all nodes in the node-set to this attribute + pub fn fill_with(&mut self, attribute: Attr) { + let node_ids = self.node_to_storage_state.keys().copied().collect_vec(); + self.set_attribute_on_each(node_ids, attribute); } - pub fn merge_attribute(&mut self, node_id: PlainNodeId, attribute: Attribute) + /// sets all attributes for all nodes with the default value of Attribute + pub fn fill_with_default(&mut self) where - Attribute: Merge, + Attr: Default, { - match self.node_attribute.entry(node_id) { - hash_map::Entry::Occupied(mut existing) => { - existing.get_mut().merge(attribute); - } - hash_map::Entry::Vacant(entry) => { - entry.insert(attribute); - } - } + self.fill_with(Default::default()); } - pub fn get_attribute(&mut self, node_id: &PlainNodeId) -> Option<&Attribute> { - self.node_attribute.get(node_id) + /// Set the attribute value of a node. Note that a node can only be + /// associated with one attribute value at a time, so if the node has an + /// existing attribute value, the value will be swapped with this value. 
+ pub fn set_attribute(&mut self, node_id: impl Into, attribute: Attr) { + let node_id = node_id.into(); + // ignore if the node is not in the original nodeset + let storage_state = self + .node_to_storage_state + .get(&node_id) + .copied() + .unwrap_or(StorageState::Disabled); + if !storage_state.should_read_from() { + return; + } + let old_attribute = self.node_to_attr.insert(node_id, attribute.clone()); + if let Some(old_attribute) = old_attribute { + if old_attribute == attribute { + // nothing to be done here. + return; + } + // update references for old attribute + self.rm_attr_internal(node_id, &old_attribute, storage_state); + } + self.add_attr_internal(node_id, attribute, storage_state); } - /// Check if nodes that match the predicate meet the write-quorum rules according to the - /// replication property. For instance, if replication property is set to {node: 3, zone: 2} - /// then this function will return `True` if nodes that match the predicate are spread across 2 - /// zones. - pub fn check_write_quorum(&self, predicate: Predicate) -> bool + /// Merges the input attributes with the already assigned attribute if set, otherwise, it'll + /// merge with the default value of this attribute type. + pub fn merge_attribute(&mut self, node_id: impl Into, attribute: Attr) where - Predicate: Fn(&Attribute) -> bool, + Attr: Merge + Default, { - let filtered = self.node_attribute.iter().filter(|(node_id, v)| { - predicate(v) - && self - .storage_states - .get(node_id) - .expect("node must be in node-set") - // only consider nodes that are writeable. - .can_write_to() - }); - // todo(asoli): Location-aware quorum check - filtered.count() >= self.replication_property.num_copies().into() + let node_id = node_id.into(); + let storage_state = self + .node_to_storage_state + .get(&node_id) + .copied() + .unwrap_or(StorageState::Disabled); + if !storage_state.should_read_from() { + return; + } + let existing = self.node_to_attr.get(&node_id); + match existing { + Some(existing) => { + let mut new = existing.clone(); + if new.merge(attribute) { + // only attempt to set if the merge resulting in a change + self.set_attribute(node_id, new); + } + } + None => { + let mut new = Attr::default(); + new.merge(attribute); + // we set unconditionally to ensure the attribute is actually assigned to the node + self.set_attribute(node_id, new); + } + } + } + + pub fn get_attribute(&mut self, node_id: &PlainNodeId) -> Option<&Attr> { + self.node_to_attr.get(node_id) } /// Does any node matches the predicate? - pub fn any(&self, predicate: impl Fn(&Attribute) -> bool) -> bool { - self.node_attribute.values().any(predicate) + pub fn any(&self, predicate: impl Fn(&Attr) -> bool) -> bool { + self.node_to_attr.values().any(predicate) } /// Do all nodes match the predicate? - pub fn all(&self, predicate: impl Fn(&Attribute) -> bool) -> bool { - self.node_attribute.values().all(predicate) + pub fn all(&self, predicate: impl Fn(&Attr) -> bool) -> bool { + self.node_to_attr.values().all(predicate) } // Does any node matches the predicate? pub fn filter( &self, - predicate: impl Fn(&Attribute) -> bool, - ) -> impl Iterator { - self.node_attribute + predicate: impl Fn(&Attr) -> bool, + ) -> impl Iterator { + self.node_to_attr .iter() .filter(move |(_, attribute)| predicate(attribute)) } - /// Check if enough nodes have certain values for the attribute so that that - /// set of nodes is an f-majority for at least one of the scope for which - /// there is a replication requirement. 
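// A minimal usage sketch (assumed setup: `nodeset` and `nodes_config` are built the
// same way as in the tests below, and `std::str::FromStr` is in scope):
let replication = ReplicationProperty::from_str("{zone: 2}").unwrap();
let mut checker: NodeSetChecker<bool> =
    NodeSetChecker::new(&nodeset, &nodes_config, &replication);
checker.fill_with(false); // give every readable node a starting value
checker.set_attribute_on_each([1, 4], true); // e.g. nodes that acknowledged a store
// true only if nodes 1 and 4 live in two distinct zones
let forms_write_quorum = checker.check_write_quorum(|stored| *stored);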
+ /// Check if nodes that match the predicate meet the write-quorum rules according to the + /// replication property. For instance, if replication property is set to {node: 3, zone: 2} + /// then this function will return `True` if nodes that match the predicate are spread across 2 + /// zones. + pub fn check_write_quorum(&self, predicate: Predicate) -> bool + where + Predicate: Fn(&Attr) -> bool, + { + // To check for write-quorum, the quorum-check needs to pass on *all* scopes. The check on + // every scope ensures we have enough domains to replicate a record according to the + // configured replication property _at_ the specified scope. + for (_scope, scope_state) in self.scopes.iter() { + if !scope_state.check_write_quorum(&predicate) { + // todo(asoli): change return type to encode failed-scope information so upper layers can + // log useful information about why write-quorum cannot be achieved. + return false; + } + } + true + } + + /// Checks if nodes with attribute that matches `predicate` form a legal f-majority. + /// + /// F-majority is possible when any f-majority is achieved on at least one of the scopes + /// for which there is a replication requirement. /// /// Two ways to form a mental model about this: /// 1) Nodes that match the predicate (storage-state considered) will form an f-majority. /// 2) Do we lose quorum-read availability if we lost all nodes that match the predicate? pub fn check_fmajority(&self, predicate: Predicate) -> FMajorityResult where - Predicate: Fn(&Attribute) -> bool, + Predicate: Fn(&Attr) -> bool, { - let filtered = self - .node_attribute - .iter() - .filter(|(_, v)| predicate(v)) - // `node_attribute` nodes must be in storage_states - .map(|(node_id, _)| self.storage_states.get(node_id).unwrap()); - - let mut authoritative = 0; - let mut non_authoritative = 0; - for state in filtered { - // at the moment, data-loss is the only non-authoritative state - if state.is_data_loss() { - non_authoritative += 1; - } else { - authoritative += 1; + // we have an f-majority if at least one scope has f-majority + let mut f_majority = false; + let mut sufficient_auth_domains = false; + // `for (scope, scope_state) in self.scopes.iter()` if you need the scope for debugging. + for scope_state in self.scopes.values() { + let candidate_domains = u32::try_from(scope_state.failure_domains.len()) + .expect("total domains must fit into u32"); + debug_assert!(scope_state.replication_factor > 0); + let f_majority_needed = + if candidate_domains >= u32::from(scope_state.replication_factor - 1) { + candidate_domains.saturating_sub(u32::from(scope_state.replication_factor)) + 1 + } else { + 0 + }; + + let (auth_domains, non_auth_domains) = scope_state.count_matching_domains(&predicate); + let total = auth_domains + non_auth_domains; + // Intentionally left as future reference for debugging + // let replication_at_scope = scope_state.replication_factor; + // println!( + // "[{scope}] replication_at_scope={replication_at_scope} f_majority_needed={f_majority_needed} num_non_empty={candidate_domains} auth_domains={} non_auth_domains={} fmajority? 
{}", + // auth_domains, non_auth_domains, + // total > 0 && auth_domains + non_auth_domains >= f_majority_needed + // ); + if total > 0 && auth_domains + non_auth_domains >= f_majority_needed { + f_majority = true; + sufficient_auth_domains = auth_domains >= f_majority_needed; + break; } } - if self.storage_states.len() < usize::from(self.replication_property.num_copies()) { - // short-circuit to avoid overflow on subtraction - return FMajorityResult::None; + if f_majority && sufficient_auth_domains { + FMajorityResult::Success + } else if f_majority { + // not enough nodes were fully-authoritative but overall we have f-majority + FMajorityResult::SuccessWithRisk + } else if self.is_complete_set(&predicate) { + FMajorityResult::BestEffort + } else { + FMajorityResult::None } + } - // todo(asoli): Location-aware quorum check - let fmajority_requires: usize = - self.storage_states.len() - usize::from(self.replication_property.num_copies()) + 1; + /// Should be called only at creation time, to populate the internal state of the checker. + /// Keep this as a private method to avoid misuse. + fn add_node( + &mut self, + node_id: PlainNodeId, + storage_state: StorageState, + location: &NodeLocation, + ) { + self.node_to_storage_state.insert(node_id, storage_state); + // we only consider nodes that might have data, everything else is ignored. + if !storage_state.should_read_from() { + return; + } - if non_authoritative + authoritative < fmajority_requires { - // not enough nodes to form an f-majority - return FMajorityResult::None; + for (scope, scope_state) in self.scopes.iter_mut() { + let domain_name = if *scope == NodeLocationScope::Node { + // We don't use `domain_string(*scope, Some(node_id))` to avoid allocating long + // strings at this scope (the node scope has the most number of domains) + node_id.to_string() + } else if location.is_scope_defined(*scope) { + location.domain_string(*scope, Some(node_id)) + } else { + // node doesn't have location information at this scope level + warn!( + "Node {} doesn't have location information at location scope {:?} although replication is configured at this scope", + node_id, scope + ); + continue; + }; + // add this node to the correct domain + let fd = scope_state + .failure_domains + .entry(domain_name.into()) + .or_insert_with(|| Box::new(FailureDomainState::default())); + fd.increment(); + scope_state.node_to_fd.insert(node_id, HashPtr::new(fd)); } + } - if non_authoritative > 0 { - // either BestEffort or SuccessWithRisk depends on how many authoritative nodes - if authoritative >= fmajority_requires { - return FMajorityResult::SuccessWithRisk; + fn rm_attr_internal(&mut self, node_id: PlainNodeId, attr: &Attr, storage_state: StorageState) { + // Apply this attribute on every scope + for scope_state in self.scopes.values_mut() { + let Some(fd) = scope_state.node_to_fd.get_mut(&node_id) else { + // the node's location isn't defined at this scope + continue; + }; + // this was an authoritative node, if the domain was complete on this attribute, removing + // this attribute from this node will render it incomplete. 
+ if storage_state.is_authoritative() + && fd.as_ref().is_domain_complete(attr) + && scope_state + .num_complete_domains + .get_mut(attr) + .is_some_and(|x| { + // it's not a complete domain any more, remove from the completed + // cache + *x -= 1; + *x == 0 + }) + { + scope_state.num_complete_domains.remove(attr); + } + + let Some(counters) = fd.as_mut_ref().per_attribute_counter.get_mut(attr) else { + continue; + }; + + counters.decrement(storage_state); + + // last node in this domain. + if counters.is_empty() { + let replication_set = scope_state.replication_fds.get_mut(attr); + let is_replication_set_empty = if let Some(replication_set) = replication_set { + replication_set.remove(fd); + replication_set.is_empty() + } else { + false + }; + if is_replication_set_empty { + // this domain won't be considered in the replication set for this attribute + // value. It has no nodes that matches this attribute. + scope_state.replication_fds.remove(attr); + } + fd.as_mut_ref().per_attribute_counter.remove(attr); + } + } + } + + fn add_attr_internal(&mut self, node_id: PlainNodeId, attr: Attr, storage_state: StorageState) { + // Apply this attribute on every scope + for scope_state in self.scopes.values_mut() { + let Some(fd) = scope_state.node_to_fd.get_mut(&node_id) else { + // the node's location isn't defined at this scope + continue; + }; + let counters = fd + .as_mut_ref() + .per_attribute_counter + .entry(attr.clone()) + .or_insert(Count::default()); + counters.increment(storage_state); + let counters_num_authoritative_nodes = counters.num_authoritative_nodes; + let counters_num_readable_nodes = counters.num_readable_nodes; + + // it's the first time we see this attribute on this scope, let's add the + // domain into the replication_fds. + // it's cheaper to do this check vs. always attempting to lookup+insert + if counters_num_readable_nodes == 1 { + let replicate_fds = scope_state + .replication_fds + .entry(attr.clone()) + .or_insert_with(|| HashSet::with_hasher(Xxh3Builder::new())); + replicate_fds.insert(fd.clone()); + } + + if storage_state.is_authoritative() { + debug_assert!(counters_num_authoritative_nodes <= counters_num_readable_nodes); + debug_assert!(counters_num_authoritative_nodes <= fd.as_ref().num_nodes); + if fd.as_ref().is_domain_complete(&attr) { + *scope_state + .num_complete_domains + .entry(attr.clone()) + .or_default() += 1; + } } - return FMajorityResult::BestEffort; } - FMajorityResult::Success } } -impl<'a, Attribute: Debug> Debug for NodeSetChecker<'a, Attribute> { +impl Debug for NodeSetChecker { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.node_attribute.fmt(f) + self.node_to_attr.fmt(f) } } -impl<'a, Attribute: Display> Display for NodeSetChecker<'a, Attribute> { +impl Display for NodeSetChecker { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "[")?; - for (node, attr) in self.node_attribute.iter() { + for (node, attr) in self.node_to_attr.iter() { write!(f, "{node} => {attr}, ")?; } write!(f, "]") } } -/// Possible results of f-majority checks for a subset of the NodeSet. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum FMajorityResult { - /// The subset of nodes neither satisfies the authoritative f-majority - /// property, nor does it contain all authoritative nodes. - /// - /// * Bad. No f-majority is possible. - None, - /// there are enough node with `DataLoss` that prevent us from - /// authoritatively deciding the replication state of records. 
Because of this, - /// the subset of nodes does not satisfy the authoritative f-majority property - /// However, the subset of nodes already contain all authoritative - /// nodes in the NodeSet. As a best effort, the subset of nodes is - /// considered to be non-authoritative f-majority. - /// * Bad but with chance of success - BestEffort, - /// the subset of nodes satisfy the authoritative f-majority property, and it - /// has _all_ authoritative nodes in the NodeSet. - /// - /// * Good - Success, - /// the subset of nodes satisfies the authoritative f-majority property, and - /// it has suffcient but _not_ all authoritative nodes in the - /// NodeSet. +/// Store the number of nodes for each attribute value in a failure domain +#[derive(Default, Debug)] +struct Count { + /// Total number of readable nodes for this attribute + /// read-write + read-only + data-loss + num_readable_nodes: u32, + /// Number of nodes in an authoritative storage state (didn't lose data and not empty) + /// read-write + read-only + num_authoritative_nodes: u32, +} + +impl Count { + /// No nodes with this attribute value in this domain + fn is_empty(&self) -> bool { + self.num_readable_nodes == 0 + } + + /// Number of nodes that are not authoritative + fn num_non_auth_nodes(&self) -> u32 { + self.num_readable_nodes - self.num_authoritative_nodes + } + + /// Increments the corresponding counters according to this storage state + fn increment(&mut self, storage_state: StorageState) { + if storage_state.should_read_from() { + self.num_readable_nodes += 1; + } + + if storage_state.is_authoritative() { + self.num_authoritative_nodes += 1; + } + } + + /// Increments the corresponding counters according to this storage state + fn decrement(&mut self, storage_state: StorageState) { + if self.num_readable_nodes == 0 { + return; + } + + if storage_state.should_read_from() { + self.num_readable_nodes -= 1; + } + if storage_state.is_authoritative() { + self.num_authoritative_nodes -= 1; + } + } +} + +#[derive(Debug)] +struct FailureDomainState { + /// number of nodes in this this domain + num_nodes: u32, + // for each attribute, count the number of nodes that have it set. + per_attribute_counter: HashMap, +} + +impl FailureDomainState { + fn increment(&mut self) { + self.num_nodes += 1; + } + + /// A domain is considered complete when it contains at least one authoritative node, + /// with all authoritative nodes holding this attribute and the rest of the nodes in + /// the same domain are known to be empty. /// - /// * Good - SuccessWithRisk, + /// An all-empty domain (a domain with no authoritative nodes) is always considered incomplete. + fn is_domain_complete(&self, attr: &Attr) -> bool { + self.per_attribute_counter + .get(attr) + .map(|counters| counters.num_authoritative_nodes > 0) + .unwrap_or(false) + } + + fn non_empty_nodes(&self) -> u32 { + self.num_nodes + } + + /// Is the entire domain empty? + fn is_empty(&self) -> bool { + self.num_nodes == 0 + } } -impl FMajorityResult { - pub fn passed(&self) -> bool { - matches!( - self, - FMajorityResult::Success | FMajorityResult::SuccessWithRisk - ) +impl Default for FailureDomainState { + fn default() -> Self { + Self { + num_nodes: 0, + per_attribute_counter: Default::default(), + } + } +} + +/// A wrapper type that to implement Hash and Eq on a raw pointer allocation, equality +/// is based on the underlying allocation pointer. 
+#[derive(Debug, Clone)] +struct HashPtr(*mut FailureDomainState); + +// **Safety**: HashPtr is safe to send as it's strictly private to NodeSetChecker and is tied to +// a heap-allocation that's also self-owned. +unsafe impl Send for HashPtr {} + +impl HashPtr { + fn as_ref(&self) -> &FailureDomainState { + // Safety: We are the only one who can create HashingPtr and we guarantee that the + // underlying allocation is valid. + unsafe { &*self.0 } + } + fn as_mut_ref(&mut self) -> &mut FailureDomainState { + // Safety: We are the only one who can create HashingPtr and we guarantee that the + // underlying allocation is valid. + unsafe { &mut *self.0 } + } +} + +impl Hash for HashPtr { + fn hash(&self, hasher: &mut H) { + self.0.hash(hasher) + } +} + +impl Eq for HashPtr {} +impl PartialEq for HashPtr { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} + +impl HashPtr { + fn new(value: &mut Box>) -> Self { + // Critical: We want the address of the underlying allocation, not the box itself so the + // double dereferencing is essential. + // + // We could have made new() accept &mut FailureDomainState but it would have made it + // easy to pass non-boxed references which will move. + let value: *mut _ = &mut **value; + Self(value) + } +} + +#[derive(Debug)] +struct LocationScopeState { + /// domains at that scope + failure_domains: HashMap>, Xxh3Builder>, + /// maps between node-id to the domain it belongs to for fast lookups + node_to_fd: HashMap, Xxh3Builder>, + /// replication factor at that scope + replication_factor: u8, + /// For each attribute, the number of domains at that scope that have all their nodes with the + /// attribute. + num_complete_domains: HashMap, + /// For each Attribute, set of the domains at that scope that have at least one node with the + /// attribute. Those are the failure domains considered for + /// replication/write-quorum/write-quorum. + replication_fds: HashMap, Xxh3Builder>>, +} + +impl LocationScopeState { + /// Checks write-quorum at this specific scope + fn check_write_quorum(&self, predicate: &Predicate) -> bool + where + Predicate: Fn(&Attr) -> bool, + { + // For each attribute, merge the set of FailureDomainState that have at least one node + // with the attribute here. We stop merging immediately after the set's size reaches + // the replication target. This makes `check_write_quorum` O(replication * num_domain_matches * num_scopes) in the + // worst case. + let mut matches = + HashSet::with_capacity_and_hasher(self.failure_domains.len(), Xxh3Builder::new()); + for (attribute, domains) in self.replication_fds.iter() { + if predicate(attribute) { + for domain in domains { + matches.insert(domain); + // we have sufficient domains that match the predicate to satisfy replication + if matches.len() >= usize::from(self.replication_factor) { + return true; + } + } + } + } + false + } + + /// Counts the number of domains within this location scope that has + /// authoritative nodes with the attribute matching the input predicate and the domains with + /// non-authoritative nodes with the attribute matching the input. 
This excludes domains that + /// has no matches or domains that are all empty + /// Returns (authoritative, non_authoritative) + fn count_matching_domains(&self, predicate: &Predicate) -> (u32, u32) + where + Predicate: Fn(&Attr) -> bool, + { + // For all failure domains within this scope; check if it's a complete domain given the + // input predicate + let mut num_completed_auth_domains = 0; + let mut num_completed_non_auth_domains = 0; + for domain in self.failure_domains.values() { + let mut num_auth_nodes = 0; + let mut num_non_auth_nodes = 0; + if domain.is_empty() { + num_completed_non_auth_domains += 1; + continue; + } + + for (attr, counters) in domain.per_attribute_counter.iter() { + if predicate(attr) { + num_auth_nodes += counters.num_authoritative_nodes; + num_non_auth_nodes += counters.num_non_auth_nodes(); + } + } + + if num_auth_nodes > 0 && num_auth_nodes >= domain.non_empty_nodes() { + num_completed_auth_domains += 1; + } else if num_non_auth_nodes > 0 && num_non_auth_nodes >= domain.non_empty_nodes() { + num_completed_non_auth_domains += 1; + } + } + (num_completed_auth_domains, num_completed_non_auth_domains) + } +} + +impl LocationScopeState { + fn new(replication: u8) -> Self { + Self { + replication_factor: replication, + failure_domains: HashMap::default(), + node_to_fd: HashMap::default(), + num_complete_domains: HashMap::default(), + replication_fds: HashMap::default(), + } } } #[cfg(test)] mod tests { + use std::str::FromStr; + use super::*; use googletest::prelude::*; - use restate_types::Version; - - use crate::providers::replicated_loglet::test_util::{ - generate_logserver_node, generate_logserver_nodes_config, + use restate_types::nodes_config::{ + LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState, }; + use restate_types::{GenerationalNodeId, PlainNodeId, Version}; + + fn generate_logserver_node( + id: impl Into, + storage_state: StorageState, + location: &str, + ) -> NodeConfig { + let id: PlainNodeId = id.into(); + NodeConfig::new( + format!("node-{id}"), + GenerationalNodeId::new(id.into(), 1), + location.parse().unwrap(), + format!("unix:/tmp/my_socket-{id}").parse().unwrap(), + Role::LogServer.into(), + LogServerConfig { storage_state }, + ) + } #[test] - fn test_replication_checker_basics() -> Result<()> { + fn nodeset_checker_basics() -> Result<()> { + const NUM_NODES: u32 = 10; + const LOCATION: &str = ""; + // all_authoritative, all flat structure (no location) + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); // all_authoritative - let nodes_config = generate_logserver_nodes_config(10, StorageState::ReadWrite); + for i in 1..=NUM_NODES { + nodes_config.upsert_node(generate_logserver_node( + i, + StorageState::ReadWrite, + LOCATION, + )); + } let nodeset: NodeSet = (1..=5).collect(); + // flat node-level replication let replication = ReplicationProperty::new(3.try_into().unwrap()); let mut checker: NodeSetChecker = NodeSetChecker::new(&nodeset, &nodes_config, &replication); // all nodes in the nodeset are authoritative - assert_that!(checker.len(), eq(5)); + assert_that!(checker.count_nodes(StorageState::is_authoritative), eq(5)); // all nodes are false by default. Can't establish write quorum. assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); - checker.set_attribute_on_each( - &[ - PlainNodeId::new(1), - PlainNodeId::new(2), - PlainNodeId::new(4), - ], - || true, - ); - // all nodes are false by default. Can't establish write-quorum. 
+ checker.set_attribute_on_each([1, 2, 4], true); assert_that!(checker.check_write_quorum(|attr| *attr), eq(true)); // 2 nodes are false in this node-set, not enough for write-quorum @@ -370,7 +849,7 @@ mod tests { eq(FMajorityResult::Success) ); - // we only have 2 nodes with false, impossible to achieve fmajority. + // we only have 2 nodes with false, fmajority not possible assert_that!( checker.check_fmajority(|attr| !(*attr)), eq(FMajorityResult::None) @@ -379,76 +858,201 @@ mod tests { } #[test] - fn test_replication_checker_mixed() -> Result<()> { + fn nodeset_checker_mixed() -> Result<()> { + const LOCATION: &str = ""; + // still flat let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); - nodes_config.upsert_node(generate_logserver_node(1, StorageState::Disabled)); - nodes_config.upsert_node(generate_logserver_node(2, StorageState::ReadWrite)); - nodes_config.upsert_node(generate_logserver_node(3, StorageState::ReadOnly)); - nodes_config.upsert_node(generate_logserver_node(4, StorageState::ReadWrite)); - nodes_config.upsert_node(generate_logserver_node(5, StorageState::DataLoss)); - nodes_config.upsert_node(generate_logserver_node(6, StorageState::DataLoss)); + nodes_config.upsert_node(generate_logserver_node(1, StorageState::Disabled, LOCATION)); + nodes_config.upsert_node(generate_logserver_node( + 2, + StorageState::ReadWrite, + LOCATION, + )); + nodes_config.upsert_node(generate_logserver_node(3, StorageState::ReadOnly, LOCATION)); + nodes_config.upsert_node(generate_logserver_node( + 4, + StorageState::ReadWrite, + LOCATION, + )); + nodes_config.upsert_node(generate_logserver_node(5, StorageState::DataLoss, LOCATION)); + nodes_config.upsert_node(generate_logserver_node(6, StorageState::DataLoss, LOCATION)); // effective will be [2-6] because 1 is disabled (authoritatively drained) let nodeset: NodeSet = (1..=6).collect(); + // flat node-level replication let replication = ReplicationProperty::new(3.try_into().unwrap()); let mut checker: NodeSetChecker = NodeSetChecker::new(&nodeset, &nodes_config, &replication); - // 1 is removed - assert_that!(checker.len(), eq(5)); - - checker.set_attribute_on_each( - &[ - // validates that we actually ignore this - PlainNodeId::new(1), - PlainNodeId::new(2), - PlainNodeId::new(3), - PlainNodeId::new(4), - PlainNodeId::new(5), - ], - || true, - ); - // we cannot write on nodes 3, 5. This should fail the write quorum check because we only have - // 2 nodes that pass the predicate *and* are writeable (2, 4) and we need 3 for replication. - assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); + // len only returns nodes with attribute "set" on them + assert_that!(checker.len(), eq(0)); + assert_that!(checker.count_authoritative_nodes(), eq(3)); + + checker.set_attribute_on_each([1, 2, 3, 4], true); + // 1 is ignored as it's disabled. + assert_that!(checker.len(), eq(3)); + // Despite that node 3 is ReadOnly, check_write_quorum doesn't care about this, it doesn't + // answer the question of whether a record "can" be replicated or not, but if a record has + // been already "replicated" on those nodes, do those node form a legal write-quorum or + // not. + // + // The caller should only set the attribute on nodes with `StorageState::can_write_to() == + // true` if they want to restrict this check. + assert_that!(checker.check_write_quorum(|attr| *attr), eq(true)); // do we have f-majority? 
// [nodeset] 2 3 4 5 6 - // [predicate] x x x x x + // [predicate] x x x - - // [storage-state] RW RO RW DL DL // // We need 3 nodes of authoritative nodes for successful f-majority. Yes, we have them (2, 3, 4). - // But some nodes are non-authoritative, so we should observe - // FMajorityResult::SuccessWithRisk + // FMajorityResult::Success assert_that!( checker.check_fmajority(|attr| *attr), - eq(FMajorityResult::SuccessWithRisk) + eq(FMajorityResult::Success) ); + checker.set_attribute(3, false); // Can we lose Node 3? No. - checker.set_attribute(PlainNodeId::new(3), false); + // [nodeset] 2 3 4 5 6 + // [predicate] x - x - - + // [storage-state] RW RO RW DL DL + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::None) + ); + assert!(!checker.check_fmajority(|attr| *attr).passed()); + // What if we have 6? + // [nodeset] 2 3 4 5 6 + // [predicate] x - x - x + // [storage-state] RW RO RW DL DL + checker.set_attribute(6, true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::SuccessWithRisk) + ); + + Ok(()) + } + + #[test] + fn nodeset_checker_non_authoritative() -> Result<()> { + const LOCATION: &str = ""; + // still flat + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + nodes_config.upsert_node(generate_logserver_node(1, StorageState::Disabled, LOCATION)); + nodes_config.upsert_node(generate_logserver_node( + 2, + StorageState::ReadWrite, + LOCATION, + )); + nodes_config.upsert_node(generate_logserver_node(3, StorageState::ReadOnly, LOCATION)); + nodes_config.upsert_node(generate_logserver_node(4, StorageState::DataLoss, LOCATION)); + nodes_config.upsert_node(generate_logserver_node(5, StorageState::DataLoss, LOCATION)); + nodes_config.upsert_node(generate_logserver_node(6, StorageState::DataLoss, LOCATION)); + // effective will be 5 nodes [2-6] because 1 is disabled (authoritatively drained) + let nodeset: NodeSet = (1..=6).collect(); + // flat node-level replication + let replication = ReplicationProperty::new(3.try_into().unwrap()); + let mut checker: NodeSetChecker = + NodeSetChecker::new(&nodeset, &nodes_config, &replication); + + checker.set_attribute_on_each([2, 3], true); + // do we have f-majority? + // [nodeset] 2 3 4 5 6 + // [predicate] x x - - - + // [storage-state] RW RO DL DL DL + // + // We need 3 nodes of authoritative nodes for successful f-majority. No, but that's the + // widest majority for authoritative nodes. assert_that!( checker.check_fmajority(|attr| *attr), eq(FMajorityResult::BestEffort) ); + + checker.set_attribute(5, true); + + // do we have f-majority? + // [nodeset] 2 3 4 5 6 + // [predicate] x x - x - + // [storage-state] RW RO DL DL DL + // + // We need 3 nodes of authoritative nodes for a risk-free f-majority. + // But here, we only have 2,3 authoritative and 5 has the attribute but not authoritative + let f_majority = checker.check_fmajority(|attr| *attr); + assert_that!(f_majority, eq(FMajorityResult::SuccessWithRisk)); + assert_that!(f_majority.is_authoritative_complete(), eq(false)); + assert_that!(f_majority.passed(), eq(true)); + + // Can we lose Node 3? No. + checker.set_attribute(3, false); + + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::None) + ); assert!(!checker.check_fmajority(|attr| *attr).passed()); + checker.fill_with_default(); + checker.set_attribute_on_each([5, 6], true); + // do we have f-majority? 
+ // [nodeset] 2 3 4 5 6 + // [predicate] - - - x 6 + // [storage-state] RW RO DL DL DL + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::None) + ); + checker.set_attribute(4, true); + + // do we have f-majority? + // [nodeset] 2 3 4 5 6 + // [predicate] - - x x x + // [storage-state] RW RO DL DL DL + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::SuccessWithRisk) + ); + + checker.set_attribute(2, true); + checker.set_attribute(3, true); + // do we have f-majority? + // [nodeset] 2 3 4 5 6 + // [predicate] x x x x x + // [storage-state] RW RO DL DL DL + // + // Why is still with risk? because we don't have enough authoritative nodes that match the + // predicate. We need 3. It's also not BestEffort because the total number of matches meets + // the majority requirements. + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::SuccessWithRisk) + ); + Ok(()) } #[test] - fn test_replication_single_copy_single_node() -> Result<()> { - let nodes_config = generate_logserver_nodes_config(1, StorageState::ReadWrite); + fn nodeset_checker_single_copy_single_node() -> Result<()> { + const NUM_NODES: u32 = 1; + const LOCATION: &str = ""; + // all_authoritative, all flat structure (no location) + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + // all_authoritative + for i in 1..=NUM_NODES { + nodes_config.upsert_node(generate_logserver_node( + i, + StorageState::ReadWrite, + LOCATION, + )); + } let replication = ReplicationProperty::new(1.try_into().unwrap()); - let mut checker: NodeSetChecker = NodeSetChecker::new( - &NodeSet::from_single(PlainNodeId::new(1)), - &nodes_config, - &replication, - ); - assert_that!(checker.len(), eq(1)); - checker.set_attribute(PlainNodeId::new(1), true); + let mut checker: NodeSetChecker = + NodeSetChecker::new(&NodeSet::from_single(1), &nodes_config, &replication); + assert_that!(checker.count_authoritative_nodes(), eq(1)); + checker.set_attribute(1, true); assert_that!(checker.check_write_quorum(|attr| *attr), eq(true)); assert_that!( @@ -460,8 +1064,19 @@ mod tests { } #[test] - fn test_dont_panic_on_replication_factor_exceeding_nodeset_size() { - let nodes_config = generate_logserver_nodes_config(3, StorageState::ReadWrite); + fn nodeset_checker_dont_panic_on_replication_factor_exceeding_nodeset_size() { + const NUM_NODES: u32 = 3; + const LOCATION: &str = ""; + // all_authoritative, all flat structure (no location) + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + // all_authoritative + for i in 1..=NUM_NODES { + nodes_config.upsert_node(generate_logserver_node( + i, + StorageState::ReadWrite, + LOCATION, + )); + } let nodeset: NodeSet = (1..=3).collect(); let replication = ReplicationProperty::new(5.try_into().unwrap()); @@ -477,4 +1092,507 @@ mod tests { eq(FMajorityResult::None) ); } + + // Failure-domain-aware tests + #[test] + fn nodeset_checker_fd_basics() -> Result<()> { + // 3 nodes in each zone, and 3 zones in total in two regions + // additionally, we have 1 node without any region assignment + // total = 10 nodes + // region1 + // .az1 [N1, N2, N3] + // .az2 [N4, N5, N6] + // region2 + // .az1 [N7, N8, N9] + // - [N10] N11 - in Provisioning + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + // all authoritative + // region1.az1 + for i in 1..=3 { + nodes_config.upsert_node(generate_logserver_node( + i, + StorageState::ReadWrite, + "region1.az1", + 
)); + } + // region1.az2 + for i in 4..=6 { + nodes_config.upsert_node(generate_logserver_node( + i, + StorageState::ReadWrite, + "region1.az2", + )); + } + // region2.az1 + for i in 7..=9 { + nodes_config.upsert_node(generate_logserver_node( + i, + StorageState::ReadWrite, + "region2.az1", + )); + } + // N10 + nodes_config.upsert_node(generate_logserver_node(10, StorageState::ReadWrite, "")); + // N11 + nodes_config.upsert_node(generate_logserver_node(11, StorageState::Provisioning, "")); + + // # Scenario 1 + // - Nodeset with all nodes = 10 nodes + // - Replication is {node: 3} + let nodeset: NodeSet = (1..=11).collect(); + let replication = ReplicationProperty::new(3.try_into().unwrap()); + let mut checker: NodeSetChecker = + NodeSetChecker::new(&nodeset, &nodes_config, &replication); + // all nodes in the nodeset are authoritative + assert_that!(checker.count_nodes(StorageState::is_authoritative), eq(10)); + // all nodes are false by default. Can't establish write quorum. + assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); + checker.set_attribute_on_each([1, 2, 10], true); + checker.set_attribute(10, false); + // trying 3 instead + checker.set_attribute(3, true); + assert_that!(checker.check_write_quorum(|attr| *attr), eq(true)); + + // We need 8 nodes (anywhere) for f-majority, current [1,2,3] are marked. + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::None) + ); + + // total 7 nodes marked + checker.set_attribute_on_each([4, 5, 7, 8], true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::None) + ); + checker.set_attribute(9, true); + // now we have the required 8 nodes + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + + // # Scenario 2 + // - Nodeset with all nodes = 10 nodes + // - Replication is {zone: 2} + let nodeset: NodeSet = (1..=11).collect(); + let replication = ReplicationProperty::from_str("{zone: 2}").unwrap(); + let mut checker: NodeSetChecker = + NodeSetChecker::new(&nodeset, &nodes_config, &replication); + + assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); + // two nodes but on the same zone, can't write. + checker.set_attribute_on_each([1, 2], true); + assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); + checker.fill_with_default(); + // N10 is not on any zone, so we can only consider its domain on the node scope but not + // on the zone. + checker.set_attribute_on_each([1, 10], true); + assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); + checker.fill_with_default(); + // finally, two nodes across two zones = SUCCESS + checker.set_attribute_on_each([1, 4], true); + assert_that!(checker.check_write_quorum(|attr| *attr), eq(true)); + + // F-majority checks. + checker.fill_with_default(); + // region1.az1 is marked. Not enough for f-majority + checker.set_attribute_on_each([1, 2, 3], true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::None) + ); + + // most of region1.az2 is marked. Not enough for f-majority + checker.set_attribute_on_each([4, 5], true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::None) + ); + + // now all of region1.az2 is marked. 
this is Enough for f-majority + checker.set_attribute_on_each([6], true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + + // Another pattern, marking 2 nodes on every zone and the entirety of region1.az2 + checker.fill_with_default(); + checker.set_attribute_on_each([1, 2, 4, 5, 6, 7, 8], true); + // region1 + // .az1 [N1(x), N2(x), N3] + // .az2 [N4(x), N5(x), N6(x)] + // region2 + // .az1 [N7(x), N8(x), N9] + // - [N10] + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::None) + ); + // Would N3 gets us to f-majority? (region1.az1, region1.az2 are all marked) + checker.set_attribute_on_each([3], true); + // region1 + // .az1 [N1(x), N2(x), N3(x)] + // .az2 [N4(x), N5(x), N6(x)] + // region2 + // .az1 [N7(x), N8(x), N9] + // - [N10] + // Yes because the remaining N9 and N10 cannot form write-quorum. (N10 is not in a zone) + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + // now that N10 is marked + checker.set_attribute_on_each([10], true); + // region1 + // .az1 [N1(x), N2(x), N3(x)] + // .az2 [N4(x), N5(x), N6(x)] + // region2 + // .az1 [N7(x), N8(x), N9] + // - [N10(x)] + // We have f-majority + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + + Ok(()) + } + + #[test] + fn nodeset_checker_fd_non_authoritative_nodeset() -> Result<()> { + // Did the nodeset become non-authoritative? (many nodes in DataLoss state) + // 3 nodes in each zone, and 3 zones in total in two regions + // additionally, we have 1 node without any region assignment + // The issue here is that we have region1.az1 + N5 in DL which is enough to make the entire nodeset + // non-authoritative; meaning that we cannot achieve better than + // `FMajority::SuccessWithRisk` and when marking only authoritative_nodes, we can't achieve + // f-majority (BestEffort case) + // In other words, all authoritative_nodes < f_majority + // total = 10 nodes - DL == DataLoss + // replication = {zone: 2} + // region1 + // .az1 [N1(DL), N2(DL), N3(DL)] + // .az2 [N4, N5(DL), N6] + // region2 + // .az1 [N7, N8, N9] + // - [N10] N11 - in Provisioning + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + // region1.az1 + nodes_config.upsert_node(generate_logserver_node( + 1, + StorageState::DataLoss, + "region1.az1", + )); + nodes_config.upsert_node(generate_logserver_node( + 2, + StorageState::DataLoss, + "region1.az1", + )); + nodes_config.upsert_node(generate_logserver_node( + 3, + StorageState::DataLoss, + "region1.az1", + )); + // region1.az2 + nodes_config.upsert_node(generate_logserver_node( + 4, + StorageState::ReadWrite, + "region1.az2", + )); + nodes_config.upsert_node(generate_logserver_node( + 5, + StorageState::DataLoss, + "region1.az2", + )); + nodes_config.upsert_node(generate_logserver_node( + 6, + StorageState::ReadWrite, + "region1.az2", + )); + // region2.az1 + for i in 7..=9 { + nodes_config.upsert_node(generate_logserver_node( + i, + StorageState::ReadWrite, + "region2.az1", + )); + } + // N10 + nodes_config.upsert_node(generate_logserver_node(10, StorageState::ReadWrite, "")); + // N11 + nodes_config.upsert_node(generate_logserver_node(11, StorageState::Provisioning, "")); + + let nodeset: NodeSet = (1..=11).collect(); + let replication = ReplicationProperty::from_str("{zone: 2}").unwrap(); + let mut checker: NodeSetChecker = + NodeSetChecker::new(&nodeset, &nodes_config, &replication); + + // marked every possible node + 
+        checker.set_attribute_on_each([4, 6, 7, 8, 9, 10], true);
+        assert_that!(
+            checker.check_fmajority(|attr| *attr),
+            eq(FMajorityResult::BestEffort)
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn nodeset_checker_fd_three_levels() -> Result<()> {
+        // total = 11 nodes across 3 regions.
+        // replication = {region: 2, zone: 3}
+        // region1
+        //   .az1 [N1, N2, N3, N4]
+        //   .az2 [N5, N6, N7]
+        // region2
+        //   .az1 [N8, N9, N10]
+        // region3
+        //   .az1 [N11]
+        let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
+        // region1.az1
+        for i in 1..=4 {
+            nodes_config.upsert_node(generate_logserver_node(
+                i,
+                StorageState::ReadWrite,
+                "region1.az1",
+            ));
+        }
+        // region1.az2
+        for i in 5..=7 {
+            nodes_config.upsert_node(generate_logserver_node(
+                i,
+                StorageState::ReadWrite,
+                "region1.az2",
+            ));
+        }
+        // region2.az1
+        for i in 8..=10 {
+            nodes_config.upsert_node(generate_logserver_node(
+                i,
+                StorageState::ReadWrite,
+                "region2.az1",
+            ));
+        }
+        // region3.az1
+        nodes_config.upsert_node(generate_logserver_node(
+            11,
+            StorageState::ReadWrite,
+            "region3.az1",
+        ));
+
+        let nodeset: NodeSet = (1..=11).collect();
+        let replication = ReplicationProperty::from_str("{region: 2, zone: 3}").unwrap();
+        assert_that!(
+            replication.copies_at_scope(NodeLocationScope::Node),
+            some(eq(3))
+        );
+        assert_that!(
+            replication.copies_at_scope(NodeLocationScope::Zone),
+            some(eq(3))
+        );
+        assert_that!(
+            replication.copies_at_scope(NodeLocationScope::Region),
+            some(eq(2))
+        );
+        let mut checker: NodeSetChecker<bool> =
+            NodeSetChecker::new(&nodeset, &nodes_config, &replication);
+
+        // # Write-quorum checks
+        // Three nodes in two regions, but across only 2 zones
+        // region1
+        //   .az1 [N1, N2, N3, N4]
+        //   .az2 [N5, N6, N7]
+        // region2
+        //   .az1 [N8, N9, N10]
+        // region3
+        //   .az1 [N11]
+        // No, can't do.
+        checker.set_attribute_on_each([1, 2, 8], true);
+        assert_that!(checker.check_write_quorum(|attr| *attr), eq(false));
+        // replacing N2 with N5 should fix it
+        checker.set_attribute(2, false);
+        checker.set_attribute(5, true);
+        assert_that!(checker.check_write_quorum(|attr| *attr), eq(true));
+
+        // # F-majority checks
+        checker.fill_with_default();
+        // losing 1 region is okay (marking everything else)
+        checker.set_attribute_on_each([8, 9, 10, 11], true);
+        assert_that!(
+            checker.check_fmajority(|attr| *attr),
+            eq(FMajorityResult::Success)
+        );
+
+        // we can't afford to lose _any_ other node.
+        checker.set_attribute_on_each([10], false);
+        assert_that!(
+            checker.check_fmajority(|attr| *attr),
+            eq(FMajorityResult::None)
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn nodeset_checker_fd_empty_domains() -> Result<()> {
+        // total = 12 nodes across 4 regions (region4 contains only a Disabled node).
+        // replication = {region: 2, zone: 3}
+        // region1
+        //   .az1 [N1, N2(D), N3(D), N4(D)]
+        //   .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled
+        // region2
+        //   .az1 [N8, N9, N10]
+        // region3
+        //   .az1 [N11]
+        // region4 -- region is Disabled
+        //   .az1 [N12(D)]
+        let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
+        // region1.az1
+        nodes_config.upsert_node(generate_logserver_node(
+            1,
+            StorageState::ReadWrite,
+            "region1.az1",
+        ));
+        nodes_config.upsert_node(generate_logserver_node(
+            2,
+            StorageState::Provisioning,
+            "region1.az1",
+        ));
+        nodes_config.upsert_node(generate_logserver_node(
+            3,
+            StorageState::Disabled,
+            "region1.az1",
+        ));
+        nodes_config.upsert_node(generate_logserver_node(
+            4,
+            StorageState::Disabled,
+            "region1.az1",
+        ));
+        // region1.az2
+        for i in 5..=7 {
+            nodes_config.upsert_node(generate_logserver_node(
+                i,
+                StorageState::Disabled,
+                "region1.az2",
+            ));
+        }
+        // region2.az1
+        for i in 8..=10 {
+            nodes_config.upsert_node(generate_logserver_node(
+                i,
+                StorageState::ReadWrite,
+                "region2.az1",
+            ));
+        }
+        // region3.az1
+        nodes_config.upsert_node(generate_logserver_node(
+            11,
+            StorageState::ReadWrite,
+            "region3.az1",
+        ));
+        // region4.az1
+        nodes_config.upsert_node(generate_logserver_node(
+            12,
+            StorageState::Disabled,
+            "region4.az1",
+        ));
+
+        let nodeset: NodeSet = (1..=12).collect();
+        let replication = ReplicationProperty::from_str("{region: 2, zone: 3}").unwrap();
+        let mut checker: NodeSetChecker<bool> =
+            NodeSetChecker::new(&nodeset, &nodes_config, &replication);
+
+        // # Write-quorum checks
+        // Can we write to N2, N5, N11? No.
+        // region1
+        //   .az1 [N1, N2(D), N3(D), N4(D)]
+        //   .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled
+        // region2
+        //   .az1 [N8, N9, N10]
+        // region3
+        //   .az1 [N11]
+        // region4
+        //   .az1 [N12(D)]
+        checker.set_attribute_on_each([2, 5, 11], true);
+        assert_that!(checker.check_write_quorum(|attr| *attr), eq(false));
+        // What if we add N1? We still wouldn't have enough zones.
+        checker.set_attribute_on_each([1], true);
+        assert_that!(checker.check_write_quorum(|attr| *attr), eq(false));
+        // but adding any node from region2 should fix it
+        checker.set_attribute_on_each([10], true);
+        // region1
+        //   .az1 [N1(x), N2(D), N3(D), N4(D)]
+        //   .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled
+        // region2
+        //   .az1 [N8, N9, N10(x)]
+        // region3
+        //   .az1 [N11(x)]
+        // region4
+        //   .az1 [N12(D)]
+        assert_that!(checker.check_write_quorum(|attr| *attr), eq(true));
+
+        let mut checker: NodeSetChecker<bool> =
+            NodeSetChecker::new(&nodeset, &nodes_config, &replication);
+        assert_that!(
+            checker.check_fmajority(|attr| *attr),
+            eq(FMajorityResult::None)
+        );
+
+        // # F-majority checks
+        // non_empty_regions = 3, non_empty_zones = 3
+        // region_f_majority = 3-2+1 = 2, zone_f_majority = 3-3+1 = 1
+        // In other words, we have f-majority if we cover one full non-empty zone, or any 2
+        // non-empty regions.
+        //
+        // region1
+        //   .az1 [N1, N2(D), N3(D), N4(D)]
+        //   .az2 [N5(D), N6(D), N7(D)] - nodes are Disabled
+        // region2
+        //   .az1 [N8, N9, N10]
+        // region3
+        //   .az1 [N11]
+        // region4
+        //   .az1 [N12(D)]
+
+        // N10 on region2.az1 is not sufficient since the other nodes in the same zone are not marked
+        checker.set_attribute_on_each([10], true);
+        assert_that!(
+            checker.check_fmajority(|attr| *attr),
+            eq(FMajorityResult::None)
+        );
+        // now let's add the rest of the nodes in this zone
+        checker.set_attribute_on_each([8, 9], true);
+        // Yay!
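+        // region2.az1 (N8, N9, N10) is now fully marked; with zone_f_majority = 1, covering one
+        // complete non-empty zone is enough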
+ assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + + // Another more stringent example is region1.az1. Only N1 should be sufficient for + // f-majority in this setup. + checker.fill_with_default(); + checker.set_attribute_on_each([1], true); + // Flexible quorums FTW. + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + // same for N11 + checker.fill_with_default(); + checker.set_attribute_on_each([11], true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + + // but not for N9 + checker.fill_with_default(); + checker.set_attribute_on_each([9], true); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::None) + ); + Ok(()) + } } diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs b/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs index 1a547793a..aba4efa3b 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs @@ -7,6 +7,7 @@ // As of the Change Date specified in that file, in accordance with // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. + mod checker; pub mod spread_selector; diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs b/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs index 86d772990..232ad757b 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs @@ -104,7 +104,7 @@ impl SpreadSelector { // validate that we can have write quorum with this spread let mut checker = NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property); - checker.set_attribute_on_each(&selected, || true); + checker.set_attribute_on_each(selected.iter().copied(), true); if !checker.check_write_quorum(|attr| *attr) { return Err(SpreadSelectorError::InsufficientWriteableNodes); } @@ -153,7 +153,7 @@ impl SpreadSelector { // validate that we can have write quorum with this spread let mut checker = NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property); - checker.set_attribute_on_each(&selected, || true); + checker.set_attribute_on_each(selected.iter().copied(), true); if !checker.check_write_quorum(|attr| *attr) { return Err(SpreadSelectorError::InsufficientWriteableNodes); } diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs index 193b52059..411f39706 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs @@ -415,7 +415,7 @@ impl SequencerAppender { } } -#[derive(Default, Debug)] +#[derive(Default, Debug, Eq, PartialEq, Hash, Clone)] struct NodeAttributes { committed: bool, sealed: bool, diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs index 49398117c..a1fc3bae8 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs @@ -66,7 +66,7 @@ impl CheckSealTask { &networking.metadata().nodes_config_ref(), ); - let mut nodeset_checker = 
NodeSetChecker::<'_, NodeTailStatus>::new(
+        let mut nodeset_checker = NodeSetChecker::<NodeTailStatus>::new(
             &effective_nodeset,
             &networking.metadata().nodes_config_ref(),
             &my_params.replication,
@@ -80,6 +80,7 @@ impl CheckSealTask {
             effective_nodeset = %effective_nodeset,
             "Checking seal status for loglet",
         );
+
         loop {
             if nodeset_checker
                 .check_fmajority(NodeTailStatus::is_known_unsealed)
diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs
index 794febb0f..aafdaae76 100644
--- a/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs
@@ -207,7 +207,7 @@ impl Digests {
             self.spread_selector.replication_property(),
         );
         // record is already replicated on those nodes
-        replication_checker.set_attribute_on_each(known_copies, || true);
+        replication_checker.set_attribute_on_each(known_copies.iter().copied(), true);
 
         let payloads = vec![record].into();
 
@@ -306,7 +306,7 @@ impl Digests {
             nodes_config,
             self.spread_selector.replication_property(),
         );
-        checker.set_attribute_on_each(&self.known_nodes, || true);
+        checker.set_attribute_on_each(self.known_nodes.iter().copied(), true);
         checker.check_write_quorum(|known| *known)
     }
 
@@ -324,8 +324,7 @@ impl Digests {
         );
         // walk backwards
         while let Some((offset, nodes)) = range.next_back() {
-            checker.reset_with_default();
-            checker.set_attribute_on_each(nodes, || true);
+            checker.set_attribute_on_each(nodes.iter().copied(), true);
             if checker.check_write_quorum(|known| *known) {
                 // this offset is good, advance to the next one
                 self.update_start_offset(offset.next());
diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs
index ced39e8a7..76a177f19 100644
--- a/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs
@@ -200,7 +200,7 @@ impl FindTailTask {
         // We'll only refresh our view of the effective nodeset if we retry the find-tail
         // procedure.
-        let mut nodeset_checker = NodeSetChecker::<'_, NodeTailStatus>::new(
+        let mut nodeset_checker = NodeSetChecker::<NodeTailStatus>::new(
             &effective_nodeset,
             &self.networking.metadata().nodes_config_ref(),
             &self.my_params.replication,
diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/get_trim_point.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/get_trim_point.rs
index 4e8d525a7..65912daac 100644
--- a/crates/bifrost/src/providers/replicated_loglet/tasks/get_trim_point.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/tasks/get_trim_point.rs
@@ -117,7 +117,7 @@ impl<'a> GetTrimPointTask<'a> {
             });
         }
 
-        let mut nodeset_checker = NodeSetChecker::<'_, Option<LogletOffset>>::new(
+        let mut nodeset_checker = NodeSetChecker::<Option<LogletOffset>>::new(
             &effective_nodeset,
             &networking.metadata().nodes_config_ref(),
             &self.my_params.replication,
diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs
index 7abbbf971..bd2c8be1c 100644
--- a/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs
@@ -29,7 +29,7 @@ pub use trim::*;
 use restate_types::logs::LogletOffset;
 use restate_types::Merge;
 
-#[derive(Debug, Default, PartialEq, Eq, derive_more::Display)]
+#[derive(Debug, Clone, Hash, Default, PartialEq, Eq, derive_more::Display)]
 enum NodeTailStatus {
     #[default]
     #[display("Unknown")]
diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs
index f79a28ee6..b1e3b13f3 100644
--- a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs
@@ -63,7 +63,7 @@ impl SealTask {
         let (tx, mut rx) = mpsc::unbounded_channel();
 
-        let mut nodeset_checker = NodeSetChecker::<'_, bool>::new(
+        let mut nodeset_checker = NodeSetChecker::<bool>::new(
             &effective_nodeset,
             &networking.metadata().nodes_config_ref(),
             &self.my_params.replication,
diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs
index 979ac3381..b0ccca8e0 100644
--- a/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs
@@ -90,7 +90,7 @@ impl<'a> TrimTask<'a> {
             return Ok(());
         }
 
-        let mut nodeset_checker = NodeSetChecker::<'_, bool>::new(
+        let mut nodeset_checker = NodeSetChecker::<bool>::new(
             &effective_nodeset,
             &networking.metadata().nodes_config_ref(),
             &self.my_params.replication,
diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs
index d2c4059cc..cfc28e313 100644
--- a/crates/types/src/nodes_config.rs
+++ b/crates/types/src/nodes_config.rs
@@ -229,6 +229,15 @@ impl NodesConfiguration {
         })
     }
 
+    /// Iterate over nodes with a given role
+    pub fn iter_role(&self, role: Role) -> impl Iterator<Item = (PlainNodeId, &NodeConfig)> {
+        self.nodes.iter().filter_map(move |(k, v)| match v {
+            MaybeNode::Node(node) if node.has_role(role) => Some((*k, node)),
+            _ => None,
+        })
+    }
+
+    /// Iterate over all non-tombstone nodes
     pub fn iter(&self) -> impl Iterator<Item = (PlainNodeId, &NodeConfig)> {
         self.nodes.iter().filter_map(|(k, v)| {
             if let MaybeNode::Node(node) = v {
@@ -295,7 +304,8 @@ pub enum StorageState {
     /// can write to: yes
     ReadWrite,
     /// Node detected that some/all of its local storage has been deleted and it cannot be used
-    /// as authoritative source for quorum-dependent queries.
+    /// as authoritative source for quorum-dependent queries. Some data might have been
+    /// permanently lost.
     ///
     /// should read from: yes (non-quorum reads)
     /// can write to: no
@@ -319,6 +329,14 @@ impl StorageState {
         }
     }
 
+    pub fn is_authoritative(&self) -> bool {
+        use StorageState::*;
+        match self {
+            Provisioning | Disabled | DataLoss => false,
+            ReadOnly | ReadWrite => true,
+        }
+    }
+
     /// Empty nodes are automatically excluded from node sets.
     pub fn empty(&self) -> bool {
         matches!(self, StorageState::Provisioning | StorageState::Disabled)
diff --git a/crates/types/src/replicated_loglet/params.rs b/crates/types/src/replicated_loglet/params.rs
index f987dbe5e..706a93cdb 100644
--- a/crates/types/src/replicated_loglet/params.rs
+++ b/crates/types/src/replicated_loglet/params.rs
@@ -62,9 +62,9 @@ impl NodeSet {
         Self(HashSet::new())
     }
 
-    pub fn from_single(node: PlainNodeId) -> Self {
+    pub fn from_single(node: impl Into<PlainNodeId>) -> Self {
         let mut set = HashSet::new();
-        set.insert(node);
+        set.insert(node.into());
         Self(set)
     }
diff --git a/crates/types/src/replicated_loglet/replication_property.rs b/crates/types/src/replicated_loglet/replication_property.rs
index aa75ad01f..8bc2cd862 100644
--- a/crates/types/src/replicated_loglet/replication_property.rs
+++ b/crates/types/src/replicated_loglet/replication_property.rs
@@ -18,6 +18,8 @@ use anyhow::Context;
 use enum_map::Enum;
 use regex::Regex;
 
+use crate::locality::NodeLocationScope;
+
 static REPLICATION_PROPERTY_PATTERN: LazyLock<Regex> = LazyLock::new(|| {
     Regex::new(
         r"^(?i)\{\s*(?(?:node|zone|region)\s*:\s*\d+(?:\s*,\s*(?:node|zone|region)\s*:\s*\d+)*)\s*}$",
     )
@@ -69,6 +71,27 @@ impl LocationScope {
     }
 }
 
+impl From<NodeLocationScope> for LocationScope {
+    fn from(scope: NodeLocationScope) -> Self {
+        match scope {
+            NodeLocationScope::Node => Self::Node,
+            NodeLocationScope::Zone => Self::Zone,
+            NodeLocationScope::Region => Self::Region,
+            NodeLocationScope::Root => panic!("Root is not a valid location scope"),
+        }
+    }
+}
+
+impl From<LocationScope> for NodeLocationScope {
+    fn from(scope: LocationScope) -> Self {
+        match scope {
+            LocationScope::Node => Self::Node,
+            LocationScope::Zone => Self::Zone,
+            LocationScope::Region => Self::Region,
+        }
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 #[error("{0}")]
 pub struct ReplicationPropertyError(String);
@@ -119,6 +142,7 @@ impl ReplicationProperty {
         Ok(self)
     }
 
+    /// Total number of copies required to satisfy the replication property
     pub fn num_copies(&self) -> u8 {
         *self
             .0
@@ -127,20 +151,26 @@ impl ReplicationProperty {
             .1
     }
 
-    pub fn at_scope_or_greater(&self, scope: LocationScope) -> Option<(&LocationScope, &u8)> {
-        self.0.range(scope..).next()
-    }
-
-    pub fn at_smallest_scope(&self) -> (&LocationScope, &u8) {
-        self.0
-            .first_key_value()
-            .expect("must have at least one scope")
+    /// How many copies are required at this location scope.
+    /// Returns `None` if no copies are defined at the given scope.
+    /// For instance, `{zone: 2, node: 3}` replication will return `None` at region scope.
+    ///
+    /// Note that a value is always returned at the node-level scope.
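+    ///
+    /// For example, with `{region: 2, zone: 3}` this returns `Some(2)` at `Region` scope,
+    /// `Some(3)` at `Zone` scope, and `Some(3)` at `Node` scope (the node-level value falls
+    /// back to `num_copies()`).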
+    pub fn copies_at_scope(&self, scope: impl Into<LocationScope>) -> Option<u8> {
+        let scope = scope.into();
+        if scope == LocationScope::MIN {
+            Some(self.num_copies())
+        } else {
+            self.0.get(&scope).copied()
+        }
     }
 
-    pub fn at_greatest_scope(&self) -> (&LocationScope, &u8) {
-        self.0
+    pub fn greatest_defined_scope(&self) -> LocationScope {
+        *self
+            .0
             .last_key_value()
             .expect("must have at least one scope")
+            .0
     }
 }
@@ -238,25 +268,26 @@ fn test_replication_property() -> Result<()> {
         let mut r = ReplicationProperty::new(NonZeroU8::new(4).unwrap());
         assert_that!(r.num_copies(), eq(4));
-        assert_that!(r.at_greatest_scope(), eq((&LocationScope::Node, &4)));
-        assert_that!(
-            r.at_scope_or_greater(LocationScope::Node),
-            some(eq((&LocationScope::Node, &4)))
-        );
+        assert_that!(r.greatest_defined_scope(), eq(LocationScope::Node));
+
+        assert_that!(r.copies_at_scope(LocationScope::Node), some(eq(4)));
+        assert_that!(r.copies_at_scope(LocationScope::Zone), none());
+        assert_that!(r.copies_at_scope(LocationScope::Region), none());
 
         r.set_scope(LocationScope::Region, NonZeroU8::new(2).unwrap())?;
         assert_that!(r.num_copies(), eq(4));
-        assert_that!(
-            r.at_scope_or_greater(LocationScope::Zone),
-            some(eq((&LocationScope::Region, &2)))
-        );
+
+        assert_that!(r.copies_at_scope(LocationScope::Node), some(eq(4)));
+        assert_that!(r.copies_at_scope(LocationScope::Zone), none());
+        assert_that!(r.copies_at_scope(LocationScope::Region), some(eq(2)));
 
         r.set_scope(LocationScope::Zone, NonZeroU8::new(2).unwrap())?;
         assert_that!(r.num_copies(), eq(4));
-        assert_that!(
-            r.at_scope_or_greater(LocationScope::Zone),
-            some(eq((&LocationScope::Zone, &2)))
-        );
+
+        assert_that!(r.copies_at_scope(LocationScope::Node), some(eq(4)));
+        assert_that!(r.copies_at_scope(LocationScope::Zone), some(eq(2)));
+        assert_that!(r.copies_at_scope(LocationScope::Region), some(eq(2)));
+
         Ok(())
     }
@@ -275,6 +306,12 @@
             .unwrap();
         assert_that!(r, ok(eq(expected)));
 
+        let r: ReplicationProperty = "{zone: 2}".parse().unwrap();
+        assert_that!(r.num_copies(), eq(2));
+
+        assert_that!(r.copies_at_scope(LocationScope::Node), some(eq(2)));
+        assert_that!(r.copies_at_scope(LocationScope::Zone), some(eq(2)));
+        assert_that!(r.copies_at_scope(LocationScope::Region), none());
         Ok(())
     }
 }
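Example usage (illustrative sketch, not part of the diff): the snippet below shows how the
location-aware checker introduced above is typically driven. It assumes it sits alongside the
tests in checker.rs so that `generate_logserver_node` and the test imports are in scope; the
node IDs, locations, and replication factor are made up for illustration, and the arithmetic in
the comments follows the f-majority formula used in the test comments.

    #[test]
    fn nodeset_checker_usage_sketch() -> Result<()> {
        // Hypothetical layout: one region, two zones, three log servers per zone.
        let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
        for i in 1..=3 {
            nodes_config.upsert_node(generate_logserver_node(
                i,
                StorageState::ReadWrite,
                "region1.az1",
            ));
        }
        for i in 4..=6 {
            nodes_config.upsert_node(generate_logserver_node(
                i,
                StorageState::ReadWrite,
                "region1.az2",
            ));
        }

        let nodeset: NodeSet = (1..=6).collect();
        let replication = ReplicationProperty::from_str("{zone: 2}").unwrap();

        // The attribute type is chosen by the caller; the tests use a plain bool ("is marked").
        let mut checker: NodeSetChecker<bool> =
            NodeSetChecker::new(&nodeset, &nodes_config, &replication);

        // Write quorum: one marked node in each of the two zones satisfies {zone: 2}.
        checker.set_attribute_on_each([1, 4], true);
        assert_that!(checker.check_write_quorum(|attr| *attr), eq(true));

        // F-majority: with 2 non-empty zones and zone-replication 2, covering
        // 2 - 2 + 1 = 1 full zone is enough, and all nodes here are authoritative.
        checker.fill_with_default();
        checker.set_attribute_on_each([1, 2, 3], true);
        assert_that!(
            checker.check_fmajority(|attr| *attr),
            eq(FMajorityResult::Success)
        );
        Ok(())
    }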