diff --git a/curp/src/members.rs b/curp/src/members.rs index 03720bae8..e0df97507 100644 --- a/curp/src/members.rs +++ b/curp/src/members.rs @@ -318,7 +318,7 @@ mod tests { let node1_url = node1.self_addrs(); assert!(!peers.contains_key(&node1_id)); assert_eq!(peers.len(), 2); - assert_eq!(node1.voters_len(), peers.len() + 1); // TODO fix test + assert_eq!(node1.voters_len(), peers.len() + 1); let peer_urls = peers.values().collect::>(); diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs index 1693f62da..7341f81e3 100644 --- a/curp/src/server/raw_curp/mod.rs +++ b/curp/src/server/raw_curp/mod.rs @@ -314,16 +314,9 @@ impl RawCurp { Err(ConfChangeError::new_propose(ProposeError::NotLeader)), ); } - if let Some(change) = conf_change.changes().first() { - if let ConfChangeType::Promote = change.change_type() { - let learner_index = self.lst.get_match_index(change.node_id); - let leader_index = self.log.read().last_log_index(); - if leader_index.overflow_sub(learner_index) > MAX_PROMOTE_GAP { - return (info, Err(ConfChangeError::LearnerNotCatchUp(()))); - } - } + if let Err(e) = self.check_new_config(conf_change.changes()) { + return (info, Err(e)); } - let pool_entry = PoolEntry::from(conf_change.clone()); let mut conflict = self.insert_sp(pool_entry.clone()); conflict |= self.insert_ucp(pool_entry); @@ -343,10 +336,7 @@ impl RawCurp { } Err(e) => return (info, Err(ConfChangeError::new_propose(e.into()))), }; - let (addrs, name, is_learner) = match self.apply_conf_change(changes) { - Ok(fallback_info) => fallback_info, - Err(e) => return (info, Err(e)), - }; + let (addrs, name, is_learner) = self.apply_conf_change(changes); let _ig = log_w.fallback_contexts.insert( entry.index, FallbackContext::new(Arc::clone(&entry), addrs, name, is_learner), @@ -422,8 +412,7 @@ impl RawCurp { let EntryData::ConfChange(ref cc) = e.entry_data else { unreachable!("cc_entry should be conf change entry"); }; - let (addrs, name, is_learner) = self.apply_conf_change(cc.changes().to_owned()) - .unwrap_or_else(|_e| unreachable!("apply_conf_change should succeed, because the check of conf change already passed on leader")); + let (addrs, name, is_learner) = self.apply_conf_change(cc.changes().to_owned()); let _ig = log_w.fallback_contexts.insert( e.index, FallbackContext::new(Arc::clone(&e), addrs, name, is_learner), @@ -927,17 +916,75 @@ impl RawCurp { self.lst.check_all(|f| f.match_index == leader_commit_index) } + /// Check if the new config is valid + pub(super) fn check_new_config(&self, changes: &[ConfChange]) -> Result<(), ConfChangeError> { + assert_eq!(changes.len(), 1, "Joint consensus is not supported yet"); + let Some(conf_change) = changes.iter().next() else { + unreachable!("conf change is empty"); + }; + let mut statuses_ids = self + .lst + .get_all_statuses() + .keys() + .copied() + .chain([self.id()]) + .collect::>(); + let mut config = self.cst.map_lock(|cst_l| cst_l.config.clone()); + let node_id = conf_change.node_id; + match conf_change.change_type() { + ConfChangeType::Add => { + if !statuses_ids.insert(node_id) || !config.insert(node_id, false) { + return Err(ConfChangeError::NodeAlreadyExists(())); + } + } + ConfChangeType::Remove => { + if !statuses_ids.remove(&node_id) || !config.remove(node_id) { + return Err(ConfChangeError::NodeNotExists(())); + } + } + ConfChangeType::Update => { + if statuses_ids.get(&node_id).is_none() || !config.contains(node_id) { + return Err(ConfChangeError::NodeNotExists(())); + } + } + ConfChangeType::AddLearner => { + if !statuses_ids.insert(node_id) || !config.insert(node_id, true) { + return Err(ConfChangeError::NodeAlreadyExists(())); + } + } + ConfChangeType::Promote => { + if statuses_ids.get(&node_id).is_none() || !config.contains(node_id) { + return Err(ConfChangeError::NodeNotExists(())); + } + let learner_index = self.lst.get_match_index(node_id); + let leader_index = self.log.read().last_log_index(); + if leader_index.overflow_sub(learner_index) > MAX_PROMOTE_GAP { + return Err(ConfChangeError::LearnerNotCatchUp(())); + } + } + } + let mut all_nodes = HashSet::new(); + all_nodes.extend(config.voters()); + all_nodes.extend(&config.learners); + if statuses_ids.len() < 3 + || all_nodes != statuses_ids + || !config.voters().is_disjoint(&config.learners) + { + return Err(ConfChangeError::InvalidConfig(())); + } + Ok(()) + } + /// Apply conf changes and return true if self node is removed pub(super) fn apply_conf_change( &self, changes: Vec, - ) -> Result<(Vec, String, bool), ConfChangeError> { + ) -> (Vec, String, bool) { assert_eq!(changes.len(), 1, "Joint consensus is not supported yet"); let Some(conf_change) = changes.into_iter().next() else { unreachable!("conf change is empty"); }; - self.check_new_config(&conf_change)?; - Ok(self.switch_config(conf_change)) + self.switch_config(conf_change) } /// Fallback conf change @@ -1273,51 +1320,6 @@ impl RawCurp { self.ctx.ucp.lock().clear(); } - /// Check if the new config is valid - fn check_new_config(&self, conf_change: &ConfChange) -> Result<(), ConfChangeError> { - let mut statuses_ids = self - .lst - .get_all_statuses() - .keys() - .copied() - .chain([self.id()]) - .collect::>(); - let mut config = self.cst.map_lock(|cst_l| cst_l.config.clone()); - let node_id = conf_change.node_id; - match conf_change.change_type() { - ConfChangeType::Add => { - if !statuses_ids.insert(node_id) || !config.insert(node_id, false) { - return Err(ConfChangeError::NodeAlreadyExists(())); - } - } - ConfChangeType::Remove => { - if !statuses_ids.remove(&node_id) || !config.remove(node_id) { - return Err(ConfChangeError::NodeNotExists(())); - } - } - ConfChangeType::Update | ConfChangeType::Promote => { - if statuses_ids.get(&node_id).is_none() || !config.contains(node_id) { - return Err(ConfChangeError::NodeNotExists(())); - } - } - ConfChangeType::AddLearner => { - if !statuses_ids.insert(node_id) || !config.insert(node_id, true) { - return Err(ConfChangeError::NodeAlreadyExists(())); - } - } - } - let mut all_nodes = HashSet::new(); - all_nodes.extend(config.voters()); - all_nodes.extend(&config.learners); - if statuses_ids.len() < 3 - || all_nodes != statuses_ids - || !config.voters().is_disjoint(&config.learners) - { - return Err(ConfChangeError::InvalidConfig(())); - } - Ok(()) - } - /// Switch to a new config and return old member infos for fallback fn switch_config(&self, conf_change: ConfChange) -> (Vec, String, bool) { let node_id = conf_change.node_id; diff --git a/curp/src/server/raw_curp/tests.rs b/curp/src/server/raw_curp/tests.rs index 0ec41a964..2b4e31ea3 100644 --- a/curp/src/server/raw_curp/tests.rs +++ b/curp/src/server/raw_curp/tests.rs @@ -750,7 +750,8 @@ fn add_node_should_add_new_node_to_curp() { }; let old_cluster = curp.cluster().clone(); let changes = vec![ConfChange::add(1, vec!["http://127.0.0.1:4567".to_owned()])]; - let infos = curp.apply_conf_change(changes.clone()).unwrap(); + assert!(curp.check_new_config(&changes).is_ok()); + let infos = curp.apply_conf_change(changes.clone()); assert!(curp.contains(1)); curp.fallback_conf_change(changes, infos.0, infos.1, infos.2); let cluster_after_fallback = curp.cluster(); @@ -776,11 +777,13 @@ fn add_learner_node_and_promote_should_success() { 1, vec!["http://127.0.0.1:4567".to_owned()], )]; - assert!(curp.apply_conf_change(changes).is_ok()); + assert!(curp.check_new_config(&changes).is_ok()); + curp.apply_conf_change(changes); assert!(curp.check_learner(1, true)); let changes = vec![ConfChange::promote(1)]; - assert!(curp.apply_conf_change(changes).is_ok()); + assert!(curp.check_new_config(&changes).is_ok()); + curp.apply_conf_change(changes); assert!(curp.check_learner(1, false)); } @@ -796,7 +799,7 @@ fn add_exists_node_should_return_node_already_exists_error() { exists_node_id, vec!["http://127.0.0.1:4567".to_owned()], )]; - let resp = curp.apply_conf_change(changes); + let resp = curp.check_new_config(&changes); let error_match = matches!(resp, Err(ConfChangeError::NodeAlreadyExists(()))); assert!(error_match); } @@ -811,8 +814,8 @@ fn remove_node_should_remove_node_from_curp() { let old_cluster = curp.cluster().clone(); let follower_id = curp.cluster().get_id_by_name("S1").unwrap(); let changes = vec![ConfChange::remove(follower_id)]; - let resp = curp.apply_conf_change(changes.clone()); - let infos = resp.unwrap(); + assert!(curp.check_new_config(&changes).is_ok()); + let infos = curp.apply_conf_change(changes.clone()); assert_eq!(infos, (vec!["S1".to_owned()], "S1".to_owned(), false)); assert!(!curp.contains(follower_id)); curp.fallback_conf_change(changes, infos.0, infos.1, infos.2); @@ -836,7 +839,7 @@ fn remove_non_exists_node_should_return_node_not_exists_error() { Arc::new(RawCurp::new_test(5, exe_tx, mock_role_change())) }; let changes = vec![ConfChange::remove(1)]; - let resp = curp.apply_conf_change(changes); + let resp = curp.check_new_config(&changes); assert!(matches!(resp, Err(ConfChangeError::NodeNotExists(())))); } @@ -849,7 +852,7 @@ fn remove_node_should_return_invalid_config_error_when_nodes_count_less_than_3() }; let follower_id = curp.cluster().get_id_by_name("S1").unwrap(); let changes = vec![ConfChange::remove(follower_id)]; - let resp = curp.apply_conf_change(changes); + let resp = curp.check_new_config(&changes); assert!(matches!(resp, Err(ConfChangeError::InvalidConfig(())))); } @@ -876,8 +879,8 @@ fn update_node_should_update_the_address_of_node() { follower_id, vec!["http://127.0.0.1:4567".to_owned()], )]; - let resp = curp.apply_conf_change(changes.clone()); - let infos = resp.unwrap(); + assert!(curp.check_new_config(&changes).is_ok()); + let infos = curp.apply_conf_change(changes.clone()); assert_eq!(infos, (vec!["S1".to_owned()], String::new(), false)); assert_eq!( curp.cluster().addrs(follower_id),