Skip to content

Commit

Permalink
refactor: check conf change before insert it into logs
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree authored and mergify[bot] committed Oct 13, 2023
1 parent 817ecd9 commit 6102c7b
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 74 deletions.
2 changes: 1 addition & 1 deletion curp/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ mod tests {
let node1_url = node1.self_addrs();
assert!(!peers.contains_key(&node1_id));
assert_eq!(peers.len(), 2);
assert_eq!(node1.voters_len(), peers.len() + 1); // TODO fix test
assert_eq!(node1.voters_len(), peers.len() + 1);

let peer_urls = peers.values().collect::<Vec<_>>();

Expand Down
128 changes: 65 additions & 63 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,16 +314,9 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
Err(ConfChangeError::new_propose(ProposeError::NotLeader)),
);
}
if let Some(change) = conf_change.changes().first() {
if let ConfChangeType::Promote = change.change_type() {
let learner_index = self.lst.get_match_index(change.node_id);
let leader_index = self.log.read().last_log_index();
if leader_index.overflow_sub(learner_index) > MAX_PROMOTE_GAP {
return (info, Err(ConfChangeError::LearnerNotCatchUp(())));
}
}
if let Err(e) = self.check_new_config(conf_change.changes()) {
return (info, Err(e));
}

let pool_entry = PoolEntry::from(conf_change.clone());
let mut conflict = self.insert_sp(pool_entry.clone());
conflict |= self.insert_ucp(pool_entry);
Expand All @@ -343,10 +336,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
}
Err(e) => return (info, Err(ConfChangeError::new_propose(e.into()))),
};
let (addrs, name, is_learner) = match self.apply_conf_change(changes) {
Ok(fallback_info) => fallback_info,
Err(e) => return (info, Err(e)),
};
let (addrs, name, is_learner) = self.apply_conf_change(changes);
let _ig = log_w.fallback_contexts.insert(
entry.index,
FallbackContext::new(Arc::clone(&entry), addrs, name, is_learner),
Expand Down Expand Up @@ -422,8 +412,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
let EntryData::ConfChange(ref cc) = e.entry_data else {
unreachable!("cc_entry should be conf change entry");
};
let (addrs, name, is_learner) = self.apply_conf_change(cc.changes().to_owned())
.unwrap_or_else(|_e| unreachable!("apply_conf_change should succeed, because the check of conf change already passed on leader"));
let (addrs, name, is_learner) = self.apply_conf_change(cc.changes().to_owned());
let _ig = log_w.fallback_contexts.insert(
e.index,
FallbackContext::new(Arc::clone(&e), addrs, name, is_learner),
Expand Down Expand Up @@ -927,17 +916,75 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
self.lst.check_all(|f| f.match_index == leader_commit_index)
}

/// Check if the new config is valid
pub(super) fn check_new_config(&self, changes: &[ConfChange]) -> Result<(), ConfChangeError> {
assert_eq!(changes.len(), 1, "Joint consensus is not supported yet");
let Some(conf_change) = changes.iter().next() else {
unreachable!("conf change is empty");
};
let mut statuses_ids = self
.lst
.get_all_statuses()
.keys()
.copied()
.chain([self.id()])
.collect::<HashSet<_>>();
let mut config = self.cst.map_lock(|cst_l| cst_l.config.clone());
let node_id = conf_change.node_id;
match conf_change.change_type() {
ConfChangeType::Add => {
if !statuses_ids.insert(node_id) || !config.insert(node_id, false) {
return Err(ConfChangeError::NodeAlreadyExists(()));
}
}
ConfChangeType::Remove => {
if !statuses_ids.remove(&node_id) || !config.remove(node_id) {
return Err(ConfChangeError::NodeNotExists(()));
}
}
ConfChangeType::Update => {
if statuses_ids.get(&node_id).is_none() || !config.contains(node_id) {
return Err(ConfChangeError::NodeNotExists(()));
}
}
ConfChangeType::AddLearner => {
if !statuses_ids.insert(node_id) || !config.insert(node_id, true) {
return Err(ConfChangeError::NodeAlreadyExists(()));
}
}
ConfChangeType::Promote => {
if statuses_ids.get(&node_id).is_none() || !config.contains(node_id) {
return Err(ConfChangeError::NodeNotExists(()));
}
let learner_index = self.lst.get_match_index(node_id);
let leader_index = self.log.read().last_log_index();
if leader_index.overflow_sub(learner_index) > MAX_PROMOTE_GAP {
return Err(ConfChangeError::LearnerNotCatchUp(()));
}
}
}
let mut all_nodes = HashSet::new();
all_nodes.extend(config.voters());
all_nodes.extend(&config.learners);
if statuses_ids.len() < 3
|| all_nodes != statuses_ids
|| !config.voters().is_disjoint(&config.learners)
{
return Err(ConfChangeError::InvalidConfig(()));
}
Ok(())
}

/// Apply conf changes and return true if self node is removed
pub(super) fn apply_conf_change(
&self,
changes: Vec<ConfChange>,
) -> Result<(Vec<String>, String, bool), ConfChangeError> {
) -> (Vec<String>, String, bool) {
assert_eq!(changes.len(), 1, "Joint consensus is not supported yet");
let Some(conf_change) = changes.into_iter().next() else {
unreachable!("conf change is empty");
};
self.check_new_config(&conf_change)?;
Ok(self.switch_config(conf_change))
self.switch_config(conf_change)
}

/// Fallback conf change
Expand Down Expand Up @@ -1273,51 +1320,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
self.ctx.ucp.lock().clear();
}

/// Check if the new config is valid
fn check_new_config(&self, conf_change: &ConfChange) -> Result<(), ConfChangeError> {
let mut statuses_ids = self
.lst
.get_all_statuses()
.keys()
.copied()
.chain([self.id()])
.collect::<HashSet<_>>();
let mut config = self.cst.map_lock(|cst_l| cst_l.config.clone());
let node_id = conf_change.node_id;
match conf_change.change_type() {
ConfChangeType::Add => {
if !statuses_ids.insert(node_id) || !config.insert(node_id, false) {
return Err(ConfChangeError::NodeAlreadyExists(()));
}
}
ConfChangeType::Remove => {
if !statuses_ids.remove(&node_id) || !config.remove(node_id) {
return Err(ConfChangeError::NodeNotExists(()));
}
}
ConfChangeType::Update | ConfChangeType::Promote => {
if statuses_ids.get(&node_id).is_none() || !config.contains(node_id) {
return Err(ConfChangeError::NodeNotExists(()));
}
}
ConfChangeType::AddLearner => {
if !statuses_ids.insert(node_id) || !config.insert(node_id, true) {
return Err(ConfChangeError::NodeAlreadyExists(()));
}
}
}
let mut all_nodes = HashSet::new();
all_nodes.extend(config.voters());
all_nodes.extend(&config.learners);
if statuses_ids.len() < 3
|| all_nodes != statuses_ids
|| !config.voters().is_disjoint(&config.learners)
{
return Err(ConfChangeError::InvalidConfig(()));
}
Ok(())
}

/// Switch to a new config and return old member infos for fallback
fn switch_config(&self, conf_change: ConfChange) -> (Vec<String>, String, bool) {
let node_id = conf_change.node_id;
Expand Down
23 changes: 13 additions & 10 deletions curp/src/server/raw_curp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,8 @@ fn add_node_should_add_new_node_to_curp() {
};
let old_cluster = curp.cluster().clone();
let changes = vec![ConfChange::add(1, vec!["http://127.0.0.1:4567".to_owned()])];
let infos = curp.apply_conf_change(changes.clone()).unwrap();
assert!(curp.check_new_config(&changes).is_ok());
let infos = curp.apply_conf_change(changes.clone());
assert!(curp.contains(1));
curp.fallback_conf_change(changes, infos.0, infos.1, infos.2);
let cluster_after_fallback = curp.cluster();
Expand All @@ -776,11 +777,13 @@ fn add_learner_node_and_promote_should_success() {
1,
vec!["http://127.0.0.1:4567".to_owned()],
)];
assert!(curp.apply_conf_change(changes).is_ok());
assert!(curp.check_new_config(&changes).is_ok());
curp.apply_conf_change(changes);
assert!(curp.check_learner(1, true));

let changes = vec![ConfChange::promote(1)];
assert!(curp.apply_conf_change(changes).is_ok());
assert!(curp.check_new_config(&changes).is_ok());
curp.apply_conf_change(changes);
assert!(curp.check_learner(1, false));
}

Expand All @@ -796,7 +799,7 @@ fn add_exists_node_should_return_node_already_exists_error() {
exists_node_id,
vec!["http://127.0.0.1:4567".to_owned()],
)];
let resp = curp.apply_conf_change(changes);
let resp = curp.check_new_config(&changes);
let error_match = matches!(resp, Err(ConfChangeError::NodeAlreadyExists(())));
assert!(error_match);
}
Expand All @@ -811,8 +814,8 @@ fn remove_node_should_remove_node_from_curp() {
let old_cluster = curp.cluster().clone();
let follower_id = curp.cluster().get_id_by_name("S1").unwrap();
let changes = vec![ConfChange::remove(follower_id)];
let resp = curp.apply_conf_change(changes.clone());
let infos = resp.unwrap();
assert!(curp.check_new_config(&changes).is_ok());
let infos = curp.apply_conf_change(changes.clone());
assert_eq!(infos, (vec!["S1".to_owned()], "S1".to_owned(), false));
assert!(!curp.contains(follower_id));
curp.fallback_conf_change(changes, infos.0, infos.1, infos.2);
Expand All @@ -836,7 +839,7 @@ fn remove_non_exists_node_should_return_node_not_exists_error() {
Arc::new(RawCurp::new_test(5, exe_tx, mock_role_change()))
};
let changes = vec![ConfChange::remove(1)];
let resp = curp.apply_conf_change(changes);
let resp = curp.check_new_config(&changes);
assert!(matches!(resp, Err(ConfChangeError::NodeNotExists(()))));
}

Expand All @@ -849,7 +852,7 @@ fn remove_node_should_return_invalid_config_error_when_nodes_count_less_than_3()
};
let follower_id = curp.cluster().get_id_by_name("S1").unwrap();
let changes = vec![ConfChange::remove(follower_id)];
let resp = curp.apply_conf_change(changes);
let resp = curp.check_new_config(&changes);
assert!(matches!(resp, Err(ConfChangeError::InvalidConfig(()))));
}

Expand All @@ -876,8 +879,8 @@ fn update_node_should_update_the_address_of_node() {
follower_id,
vec!["http://127.0.0.1:4567".to_owned()],
)];
let resp = curp.apply_conf_change(changes.clone());
let infos = resp.unwrap();
assert!(curp.check_new_config(&changes).is_ok());
let infos = curp.apply_conf_change(changes.clone());
assert_eq!(infos, (vec!["S1".to_owned()], String::new(), false));
assert_eq!(
curp.cluster().addrs(follower_id),
Expand Down

0 comments on commit 6102c7b

Please sign in to comment.