From 2fea60592a59cb5d406f9e7972ef7c17c40f0f7a Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Fri, 15 Mar 2024 16:38:57 +0800 Subject: [PATCH] test: add tests for curp conflict pools Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/rpc/mod.rs | 12 +- crates/curp/src/server/conflict/mod.rs | 19 +- crates/curp/src/server/conflict/tests.rs | 266 +++++++++++++++++++++++ 3 files changed, 277 insertions(+), 20 deletions(-) create mode 100644 crates/curp/src/server/conflict/tests.rs diff --git a/crates/curp/src/rpc/mod.rs b/crates/curp/src/rpc/mod.rs index bcfec2acd..30de9673d 100644 --- a/crates/curp/src/rpc/mod.rs +++ b/crates/curp/src/rpc/mod.rs @@ -835,6 +835,7 @@ impl From for tonic::Status { /// Entry of speculative pool #[derive(Debug, Clone, Serialize, Deserialize)] +#[cfg_attr(test, derive(PartialEq))] pub(crate) struct PoolEntry { /// Propose id pub(crate) id: ProposeId, @@ -844,6 +845,7 @@ pub(crate) struct PoolEntry { /// Inner entry of speculative pool #[derive(Debug, Clone, Serialize, Deserialize)] +#[cfg_attr(test, derive(PartialEq))] pub(crate) enum PoolEntryInner { /// Command entry Command(Arc), @@ -851,10 +853,7 @@ pub(crate) enum PoolEntryInner { ConfChange(Vec), } -impl PoolEntry -where - C: Command, -{ +impl PoolEntry { /// Create a new pool entry pub(crate) fn new(id: ProposeId, inner: impl Into>) -> Self { Self { @@ -862,7 +861,12 @@ where inner: inner.into(), } } +} +impl PoolEntry +where + C: Command, +{ /// Check if the entry is conflict with the command pub(crate) fn is_conflict_with_cmd(&self, c: &C) -> bool { match self.inner { diff --git a/crates/curp/src/server/conflict/mod.rs b/crates/curp/src/server/conflict/mod.rs index ce6563032..07176758b 100644 --- a/crates/curp/src/server/conflict/mod.rs +++ b/crates/curp/src/server/conflict/mod.rs @@ -7,6 +7,9 @@ pub(crate) mod spec_pool_new; /// Uncommitted pool pub(crate) mod uncommitted_pool; +#[cfg(test)] +mod tests; + use std::{ops::Deref, sync::Arc}; use crate::rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId}; @@ -79,22 +82,6 @@ impl From> for PoolEntry { } } -impl std::hash::Hash for CommandEntry { - #[inline] - fn hash(&self, state: &mut H) { - self.id.hash(state); - } -} - -impl PartialEq for CommandEntry { - #[inline] - fn eq(&self, other: &Self) -> bool { - self.id.eq(&other.id) - } -} - -impl Eq for CommandEntry {} - /// Conf change entry type #[derive(Clone, PartialEq)] pub(super) struct ConfChangeEntry { diff --git a/crates/curp/src/server/conflict/tests.rs b/crates/curp/src/server/conflict/tests.rs new file mode 100644 index 000000000..ddf04a493 --- /dev/null +++ b/crates/curp/src/server/conflict/tests.rs @@ -0,0 +1,266 @@ +use std::{cmp::Ordering, sync::Arc}; + +use curp_external_api::conflict::{SpeculativePoolOp, UncommittedPoolOp}; + +use crate::{ + rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId}, + server::conflict::uncommitted_pool::UncommittedPool, +}; + +use super::{spec_pool_new::SpeculativePool, CommandEntry}; + +#[derive(Debug, Default)] +struct TestSp { + entries: Vec>, +} + +impl SpeculativePoolOp for TestSp { + type Entry = CommandEntry; + + fn insert(&mut self, entry: Self::Entry) -> Option { + if self.entries.iter().any(|e| e.as_ref() == entry.as_ref()) { + return Some(entry); + } + self.entries.push(entry); + None + } + + fn len(&self) -> usize { + self.entries.len() + } + + fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + fn remove(&mut self, entry: Self::Entry) { + if let Some(pos) = self + .entries + .iter() + .position(|e| e.as_ref() == entry.as_ref()) + { + self.entries.remove(pos); + } + } + + fn all(&self) -> Vec { + self.entries.clone() + } + + fn clear(&mut self) { + self.entries.clear(); + } +} + +#[derive(Debug, Default)] +struct TestUcp { + entries: Vec>, +} + +impl UncommittedPoolOp for TestUcp { + type Entry = CommandEntry; + + fn insert(&mut self, entry: Self::Entry) -> bool { + let conflict = self.entries.iter().any(|e| e.as_ref() == entry.as_ref()); + self.entries.push(entry); + conflict + } + + fn all_conflict(&self, entry: &Self::Entry) -> Vec { + self.entries + .iter() + .filter_map(|e| (e.as_ref() == entry.as_ref()).then_some(e.clone())) + .collect() + } + + fn all(&self) -> Vec { + self.entries.clone() + } + + fn len(&self) -> usize { + self.entries.len() + } + + fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + fn remove(&mut self, entry: Self::Entry) { + if let Some(pos) = self + .entries + .iter() + .position(|e| e.as_ref() == entry.as_ref()) + { + self.entries.remove(pos); + } + } + + fn clear(&mut self) { + self.entries.clear(); + } +} + +impl Eq for PoolEntry {} + +impl PartialOrd for PoolEntry { + fn partial_cmp(&self, other: &Self) -> Option { + #[allow(clippy::pattern_type_mismatch)] + match (&self.inner, &other.inner) { + (PoolEntryInner::Command(a), PoolEntryInner::Command(b)) => a.partial_cmp(&b), + (PoolEntryInner::Command(_), PoolEntryInner::ConfChange(_)) => Some(Ordering::Less), + (PoolEntryInner::ConfChange(_), PoolEntryInner::Command(_)) => Some(Ordering::Greater), + (PoolEntryInner::ConfChange(a), PoolEntryInner::ConfChange(b)) => { + for (ae, be) in a.iter().zip(b.iter()) { + let ord = ae.change_type.cmp(&be.change_type).then( + ae.node_id + .cmp(&be.node_id) + .then(ae.address.cmp(&be.address)), + ); + if !matches!(ord, Ordering::Equal) { + return Some(ord); + } + } + if a.len() > b.len() { + return Some(Ordering::Greater); + } + return Some(Ordering::Less); + } + } + } +} + +impl Ord for PoolEntry { + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap() + } +} + +#[test] +fn conflict_should_be_detected_in_sp() { + let mut sp = SpeculativePool::new(vec![Box::new(TestSp::default())]); + let entry1 = PoolEntry::new(ProposeId::default(), Arc::new(0)); + let entry2 = PoolEntry::new(ProposeId::default(), Arc::new(1)); + assert!(sp.insert(entry1.clone()).is_none()); + assert!(sp.insert(entry2).is_none()); + assert!(sp.insert(entry1.clone()).is_some()); + sp.remove(entry1.clone()); + assert!(sp.insert(entry1).is_none()); +} + +#[test] +fn conf_change_should_conflict_should_all_entries_in_sp() { + let mut sp = SpeculativePool::new(vec![Box::new(TestSp::default())]); + let entry1 = PoolEntry::new(ProposeId::default(), Arc::new(0)); + let entry2 = PoolEntry::new(ProposeId::default(), Arc::new(1)); + let entry3 = PoolEntry::::new(ProposeId::default(), vec![ConfChange::default()]); + let entry4 = PoolEntry::::new( + ProposeId::default(), + vec![ConfChange { + change_type: 0, + node_id: 1, + address: vec![], + }], + ); + assert!(sp.insert(entry3.clone()).is_none()); + assert!(sp.insert(entry1.clone()).is_some()); + assert!(sp.insert(entry2.clone()).is_some()); + assert!(sp.insert(entry4).is_some()); + sp.remove(entry3.clone()); + assert!(sp.insert(entry1).is_none()); + assert!(sp.insert(entry3).is_some()); +} + +#[test] +fn sp_should_returns_all_entries() { + let mut sp = SpeculativePool::new(vec![Box::new(TestSp::default())]); + let entries: Vec<_> = (0..10) + .map(|i| PoolEntry::new(ProposeId::default(), Arc::new(i))) + .collect(); + for e in entries.clone() { + sp.insert(e); + } + /// conflict entries should not be inserted + for e in entries.clone() { + assert!(sp.insert(e).is_some()); + } + let results = sp.all(); + assert_eq!(entries, results); + assert_eq!(sp.len(), 10); +} + +#[test] +fn conflict_should_be_detected_in_ucp() { + let mut ucp = UncommittedPool::new(vec![Box::new(TestUcp::default())]); + let entry1 = PoolEntry::new(ProposeId::default(), Arc::new(0)); + let entry2 = PoolEntry::new(ProposeId::default(), Arc::new(1)); + assert!(!ucp.insert(entry1.clone())); + assert!(!ucp.insert(entry2)); + assert!(ucp.insert(entry1.clone())); + ucp.remove(entry1.clone()); + // Ucp allows conflict cmds to co-exist in the same pool. + // Therefore, we should still get `conflict=true` + assert!(ucp.insert(entry1.clone())); + ucp.remove(entry1.clone()); + ucp.remove(entry1.clone()); + assert!(!ucp.insert(entry1)); +} + +#[test] +fn conf_change_should_conflict_should_all_entries_in_ucp() { + let mut ucp = UncommittedPool::new(vec![Box::new(TestUcp::default())]); + let entry1 = PoolEntry::new(ProposeId::default(), Arc::new(0)); + let entry2 = PoolEntry::new(ProposeId::default(), Arc::new(1)); + let entry3 = PoolEntry::::new(ProposeId::default(), vec![ConfChange::default()]); + let entry4 = PoolEntry::::new( + ProposeId::default(), + vec![ConfChange { + change_type: 0, + node_id: 1, + address: vec![], + }], + ); + assert!(!ucp.insert(entry3.clone())); + assert!(ucp.insert(entry1.clone())); + assert!(ucp.insert(entry4.clone())); + ucp.remove(entry3.clone()); + ucp.remove(entry4.clone()); + assert!(!ucp.insert(entry2)); + assert!(ucp.insert(entry3)); +} + +#[test] +fn ucp_should_returns_all_entries() { + let mut ucp = UncommittedPool::new(vec![Box::new(TestUcp::default())]); + let entries: Vec<_> = (0..10) + .map(|i| PoolEntry::new(ProposeId::default(), Arc::new(i))) + .collect(); + for e in entries.clone() { + ucp.insert(e); + } + for e in entries.clone() { + assert!(ucp.insert(e)); + } + let results = ucp.all(); + + let expect: Vec<_> = entries.clone().into_iter().chain(entries).collect(); + assert_eq!(expect, results); +} + +#[test] +fn ucp_should_returns_all_conflict_entries() { + let mut ucp = UncommittedPool::new(vec![Box::new(TestUcp::default())]); + let entries: Vec<_> = (0..10) + .map(|i| PoolEntry::new(ProposeId::default(), Arc::new(i))) + .collect(); + for e in &entries { + ucp.insert(e.clone()); + ucp.insert(e.clone()); + } + let conf_change = PoolEntry::::new(ProposeId::default(), vec![ConfChange::default()]); + ucp.insert(conf_change.clone()); + for e in entries { + let mut all = ucp.all_conflict(e.clone()); + all.sort(); + assert_eq!(all, vec![e.clone(), e.clone(), conf_change.clone()]); + } +}