From bca6be2330fb48c25751d11fe7090a89f46bb9b4 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 29 Feb 2024 16:17:28 +0800 Subject: [PATCH] refactor: implement conflict pools in curp Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/conflict/mod.rs | 114 ++++++++++++ .../curp/src/server/conflict/spec_pool_new.rs | 129 ++++++++++++++ .../src/server/conflict/uncommitted_pool.rs | 166 ++++++++++++++++++ crates/curp/src/server/mod.rs | 3 + 4 files changed, 412 insertions(+) create mode 100644 crates/curp/src/server/conflict/mod.rs create mode 100644 crates/curp/src/server/conflict/spec_pool_new.rs create mode 100644 crates/curp/src/server/conflict/uncommitted_pool.rs diff --git a/crates/curp/src/server/conflict/mod.rs b/crates/curp/src/server/conflict/mod.rs new file mode 100644 index 000000000..ce6563032 --- /dev/null +++ b/crates/curp/src/server/conflict/mod.rs @@ -0,0 +1,114 @@ +#![allow(unused)] +#![allow(unreachable_pub)] + +/// Speculative pool +pub(crate) mod spec_pool_new; + +/// Uncommitted pool +pub(crate) mod uncommitted_pool; + +use std::{ops::Deref, sync::Arc}; + +use crate::rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId}; + +// TODO: relpace `PoolEntry` with this +/// Entry stored in conflict pools +pub(super) enum SplitEntry { + /// A command entry + Command(CommandEntry), + /// A conf change entry + ConfChange(ConfChangeEntry), +} + +impl From> for SplitEntry { + fn from(entry: PoolEntry) -> Self { + match entry.inner { + PoolEntryInner::Command(c) => SplitEntry::Command(CommandEntry { + id: entry.id, + cmd: c, + }), + PoolEntryInner::ConfChange(c) => SplitEntry::ConfChange(ConfChangeEntry { + id: entry.id, + conf_change: c, + }), + } + } +} + +/// Command entry type +#[derive(Debug)] +pub struct CommandEntry { + /// The propose id + id: ProposeId, + /// The command + cmd: Arc, +} + +impl Clone for CommandEntry { + #[inline] + fn clone(&self) -> Self { + Self { + id: self.id, + cmd: Arc::clone(&self.cmd), + } + } +} + +impl Deref for CommandEntry { + type Target = C; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.cmd + } +} + +impl AsRef for CommandEntry { + #[inline] + fn as_ref(&self) -> &C { + self.cmd.as_ref() + } +} + +impl From> for PoolEntry { + fn from(entry: CommandEntry) -> Self { + PoolEntry { + id: entry.id, + inner: PoolEntryInner::Command(entry.cmd), + } + } +} + +impl std::hash::Hash for CommandEntry { + #[inline] + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} + +impl PartialEq for CommandEntry { + #[inline] + fn eq(&self, other: &Self) -> bool { + self.id.eq(&other.id) + } +} + +impl Eq for CommandEntry {} + +/// Conf change entry type +#[derive(Clone, PartialEq)] +pub(super) struct ConfChangeEntry { + /// The propose id + id: ProposeId, + /// The conf change entry + conf_change: Vec, +} + +impl From for PoolEntry { + fn from(entry: ConfChangeEntry) -> Self { + PoolEntry { + id: entry.id, + inner: PoolEntryInner::ConfChange(entry.conf_change), + } + } +} diff --git a/crates/curp/src/server/conflict/spec_pool_new.rs b/crates/curp/src/server/conflict/spec_pool_new.rs new file mode 100644 index 000000000..a968a7c47 --- /dev/null +++ b/crates/curp/src/server/conflict/spec_pool_new.rs @@ -0,0 +1,129 @@ +use curp_external_api::conflict::SpeculativePool; + +use crate::rpc::PoolEntry; + +use super::{CommandEntry, ConfChangeEntry, SplitEntry}; + +/// A speculative pool object +pub type SpObject = Box> + Send + 'static>; + +/// Union type of `SpeculativePool` objects +pub(crate) struct SpecPool { + /// Command speculative pools + command_sps: Vec>, + /// Conf change speculative pool + conf_change_sp: ConfChangeSpecPool, +} + +impl SpecPool { + /// Creates a new pool + pub(crate) fn new(command_sps: Vec>) -> Self { + Self { + command_sps, + conf_change_sp: ConfChangeSpecPool::default(), + } + } + + /// Inserts an entry into the pool + pub(crate) fn insert(&mut self, entry: PoolEntry) -> Option> { + if !self.conf_change_sp.is_empty() { + return Some(entry); + } + + match SplitEntry::from(entry) { + SplitEntry::Command(c) => { + for csp in &mut self.command_sps { + if let Some(e) = csp.insert(c.clone()) { + return Some(e.into()); + } + } + } + SplitEntry::ConfChange(c) => { + if !self + .command_sps + .iter() + .map(AsRef::as_ref) + .all(SpeculativePool::is_empty) + { + return Some(c.into()); + } + let _ignore = self.conf_change_sp.insert(c); + } + } + + None + } + + // TODO: Use reference instead of clone + /// Removes an entry from the pool + pub(crate) fn remove(&mut self, entry: PoolEntry) { + match SplitEntry::from(entry) { + SplitEntry::Command(c) => { + for csp in &mut self.command_sps { + csp.remove(c.clone()); + } + } + SplitEntry::ConfChange(c) => { + self.conf_change_sp.remove(c); + } + } + } + + /// Returns all entries in the pool + pub(crate) fn all(&self) -> Vec> { + let mut entries = Vec::new(); + for csp in &self.command_sps { + entries.extend(csp.all().into_iter().map(Into::into)); + } + entries.extend(self.conf_change_sp.all().into_iter().map(Into::into)); + entries + } + + /// Returns the number of entries in the pool + #[allow(clippy::arithmetic_side_effects)] // Pool sizes can't overflow a `usize` + pub(crate) fn len(&self) -> usize { + self.command_sps + .iter() + .fold(0, |sum, pool| sum + pool.len()) + + self.conf_change_sp.len() + } +} + +/// Speculative pool for conf change entries +#[derive(Default)] +struct ConfChangeSpecPool { + /// Store current conf change + change: Option, +} + +impl SpeculativePool for ConfChangeSpecPool { + type Entry = ConfChangeEntry; + + fn insert(&mut self, entry: Self::Entry) -> Option { + if self.change.is_some() { + return Some(entry); + } + self.change = Some(entry); + None + } + + fn is_empty(&self) -> bool { + self.change.is_none() + } + + fn remove(&mut self, _entry: Self::Entry) { + self.change = None; + } + + fn all(&self) -> Vec { + self.change.clone().into_iter().collect() + } + + fn clear(&mut self) { + self.change = None; + } + + fn len(&self) -> usize { + self.change.iter().count() + } +} diff --git a/crates/curp/src/server/conflict/uncommitted_pool.rs b/crates/curp/src/server/conflict/uncommitted_pool.rs new file mode 100644 index 000000000..69c946de9 --- /dev/null +++ b/crates/curp/src/server/conflict/uncommitted_pool.rs @@ -0,0 +1,166 @@ +use curp_external_api::conflict::UncommittedPool; + +use crate::rpc::PoolEntry; + +use super::{CommandEntry, ConfChangeEntry, SplitEntry}; + +/// A ucpeculative pool object +pub type UcpObject = Box> + Send + 'static>; + +/// Union type of `UncommittedPool` objects +pub(crate) struct UncomPool { + /// Command uncommitted pools + command_ucps: Vec>, + /// Conf change uncommitted pools + conf_change_ucp: ConfChangeUncomPool, +} + +impl UncomPool { + /// Creates a new `UncomPool` + pub(crate) fn new(command_ucps: Vec>) -> Self { + Self { + command_ucps, + conf_change_ucp: ConfChangeUncomPool::default(), + } + } + + /// Insert an entry into the pool + pub(crate) fn insert(&mut self, entry: PoolEntry) -> bool { + let mut conflict = false; + + conflict |= !self.conf_change_ucp.is_empty(); + + match SplitEntry::from(entry) { + SplitEntry::Command(c) => { + for cucp in &mut self.command_ucps { + conflict |= cucp.insert(c.clone()); + } + } + SplitEntry::ConfChange(c) => { + let _ignore = self.conf_change_ucp.insert(c); + conflict |= !self + .command_ucps + .iter() + .map(AsRef::as_ref) + .all(UncommittedPool::is_empty); + } + } + + conflict + } + + /// Removes an entry from the pool + pub(crate) fn remove(&mut self, entry: PoolEntry) { + match SplitEntry::from(entry) { + SplitEntry::Command(c) => { + for cucp in &mut self.command_ucps { + cucp.remove(c.clone()); + } + } + SplitEntry::ConfChange(c) => { + self.conf_change_ucp.remove(c); + } + } + } + + /// Returns all entries in the pool that conflict with the given entry + pub(crate) fn all_conflict(&self, entry: PoolEntry) -> Vec> { + match SplitEntry::from(entry) { + // A command entry conflict with other conflict entries plus all conf change entries + SplitEntry::Command(ref c) => self + .conf_change_ucp + .all() + .into_iter() + .map(Into::into) + .chain( + self.command_ucps + .iter() + .flat_map(|p| p.all_conflict(c)) + .map(Into::into), + ) + .collect(), + // A conf change entry conflict with all other entries + SplitEntry::ConfChange(_) => self + .conf_change_ucp + .all() + .into_iter() + .map(Into::into) + .chain( + self.command_ucps + .iter() + .map(AsRef::as_ref) + .flat_map(UncommittedPool::all) + .map(Into::into), + ) + .collect(), + } + } + + #[cfg(test)] + /// Gets all entries in the pool + pub(crate) fn all(&self) -> Vec> { + let mut entries = Vec::new(); + for csp in &self.command_ucps { + entries.extend(csp.all().into_iter().map(Into::into)); + } + entries.extend(self.conf_change_ucp.all().into_iter().map(Into::into)); + entries + } + + #[cfg(test)] + /// Returns `true` if the pool is empty + pub(crate) fn is_empty(&self) -> bool { + self.command_ucps.iter().all(|ucp| ucp.is_empty()) && self.conf_change_ucp.is_empty() + } + + /// Clears all entries in the pool + pub(crate) fn clear(&mut self) { + for ucp in &mut self.command_ucps { + ucp.clear(); + } + self.conf_change_ucp.clear(); + } +} + +/// Conf change uncommitted pool +#[derive(Default)] +struct ConfChangeUncomPool { + /// entry count + conf_changes: Vec, +} + +impl UncommittedPool for ConfChangeUncomPool { + type Entry = ConfChangeEntry; + + fn insert(&mut self, entry: Self::Entry) -> bool { + let conflict = !self.conf_changes.is_empty(); + self.conf_changes.push(entry); + conflict + } + + fn is_empty(&self) -> bool { + self.conf_changes.is_empty() + } + + fn remove(&mut self, entry: Self::Entry) { + if let Some(pos) = self.conf_changes.iter().position(|x| *x == entry) { + let _ignore = self.conf_changes.remove(pos); + } + } + + fn all_conflict(&self, _entry: &Self::Entry) -> Vec { + self.conf_changes.clone() + } + + fn all(&self) -> Vec { + self.conf_changes.clone() + } + + fn clear(&mut self) { + self.conf_changes.clear(); + } + + fn len(&self) -> usize { + self.conf_changes.len() + } +} diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs index 66470ceec..a9933cbe3 100644 --- a/crates/curp/src/server/mod.rs +++ b/crates/curp/src/server/mod.rs @@ -35,6 +35,9 @@ mod raw_curp; /// Command board is the buffer to store command execution result mod cmd_board; +/// Conflict pools +pub mod conflict; + /// Speculative pool mod spec_pool;