Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: New curp conflict pool #675

Merged
merged 2 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions crates/curp-external-api/src/conflict.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#![allow(clippy::module_name_repetitions)]

/// Common operations for conflict pools
pub trait ConflictPoolOp {
/// Entry of the pool
type Entry;

/// Removes a command from the pool
fn remove(&mut self, entry: Self::Entry);

/// Returns all commands in the pool
fn all(&self) -> Vec<Self::Entry>;

/// Clears all entries in the pool
fn clear(&mut self);

/// Returns the number of commands in the pool
fn len(&self) -> usize;

/// Checks if the pool contains some commands that will conflict with all other commands
fn is_empty(&self) -> bool;
}

/// Speculative pool operations
pub trait SpeculativePoolOp: ConflictPoolOp {
/// Inserts a command in to the pool
///
/// Returns the entry if a conflict is detected
fn insert_if_not_conflict(&mut self, entry: Self::Entry) -> Option<Self::Entry>;
}

/// Uncommitted pool operations
pub trait UncommittedPoolOp: ConflictPoolOp {
/// Inserts a command in to the pool
///
/// Returns `true` if a conflict is detected
fn insert(&mut self, entry: Self::Entry) -> bool;

/// Returns all commands in the pool that conflicts with the given command
fn all_conflict(&self, entry: &Self::Entry) -> Vec<Self::Entry>;
}
3 changes: 3 additions & 0 deletions crates/curp-external-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,6 @@ pub type InflightId = u64;
pub mod cmd;
/// The command to be executed
pub mod role_change;

/// Conflict trait
pub mod conflict;
12 changes: 8 additions & 4 deletions crates/curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ impl From<CurpError> for tonic::Status {

/// Entry of speculative pool
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub(crate) struct PoolEntry<C> {
/// Propose id
pub(crate) id: ProposeId,
Expand All @@ -844,25 +845,28 @@ pub(crate) struct PoolEntry<C> {

/// Inner entry of speculative pool
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum PoolEntryInner<C> {
/// Command entry
Command(Arc<C>),
/// ConfChange entry
ConfChange(Vec<ConfChange>),
}

impl<C> PoolEntry<C>
where
C: Command,
{
impl<C> PoolEntry<C> {
/// Create a new pool entry
pub(crate) fn new(id: ProposeId, inner: impl Into<PoolEntryInner<C>>) -> Self {
Self {
id,
inner: inner.into(),
}
}
}

impl<C> PoolEntry<C>
where
C: Command,
{
/// Check if the entry is conflict with the command
pub(crate) fn is_conflict_with_cmd(&self, c: &C) -> bool {
match self.inner {
Expand Down
101 changes: 101 additions & 0 deletions crates/curp/src/server/conflict/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#![allow(unused)]
#![allow(unreachable_pub)]

/// Speculative pool
pub(crate) mod spec_pool_new;

/// Uncommitted pool
pub(crate) mod uncommitted_pool;

#[cfg(test)]
mod tests;

use std::{ops::Deref, sync::Arc};

use crate::rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId};

// TODO: relpace `PoolEntry` with this
/// Entry stored in conflict pools
pub(super) enum ConflictPoolEntry<C> {
/// A command entry
Command(CommandEntry<C>),
/// A conf change entry
ConfChange(ConfChangeEntry),
}

impl<C> From<PoolEntry<C>> for ConflictPoolEntry<C> {
fn from(entry: PoolEntry<C>) -> Self {
match entry.inner {
PoolEntryInner::Command(c) => ConflictPoolEntry::Command(CommandEntry {
id: entry.id,
cmd: c,
}),
PoolEntryInner::ConfChange(c) => ConflictPoolEntry::ConfChange(ConfChangeEntry {
id: entry.id,
conf_change: c,
}),
}
}
}

/// Command entry type
#[derive(Debug)]
pub struct CommandEntry<C> {
/// The propose id
id: ProposeId,
/// The command
cmd: Arc<C>,
}

impl<C> Clone for CommandEntry<C> {
#[inline]
fn clone(&self) -> Self {
Self {
id: self.id,
cmd: Arc::clone(&self.cmd),
}
}
}

impl<C> Deref for CommandEntry<C> {
type Target = C;

#[inline]
fn deref(&self) -> &Self::Target {
&self.cmd
}
}

impl<C> AsRef<C> for CommandEntry<C> {
#[inline]
fn as_ref(&self) -> &C {
self.cmd.as_ref()
}
}

impl<C> From<CommandEntry<C>> for PoolEntry<C> {
fn from(entry: CommandEntry<C>) -> Self {
PoolEntry {
id: entry.id,
inner: PoolEntryInner::Command(entry.cmd),
}
}
}

/// Conf change entry type
#[derive(Clone, PartialEq)]
pub(super) struct ConfChangeEntry {
/// The propose id
id: ProposeId,
/// The conf change entry
conf_change: Vec<ConfChange>,
}

impl<C> From<ConfChangeEntry> for PoolEntry<C> {
fn from(entry: ConfChangeEntry) -> Self {
PoolEntry {
id: entry.id,
inner: PoolEntryInner::ConfChange(entry.conf_change),
}
}
}
131 changes: 131 additions & 0 deletions crates/curp/src/server/conflict/spec_pool_new.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use curp_external_api::conflict::{ConflictPoolOp, SpeculativePoolOp};

use crate::rpc::PoolEntry;

use super::{CommandEntry, ConfChangeEntry, ConflictPoolEntry};

/// A speculative pool object
pub type SpObject<C> = Box<dyn SpeculativePoolOp<Entry = CommandEntry<C>> + Send + 'static>;

/// Union type of `SpeculativePool` objects
pub(crate) struct SpeculativePool<C> {
/// Command speculative pools
command_sps: Vec<SpObject<C>>,
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved
/// Conf change speculative pool
conf_change_sp: ConfChangeSp,
}

impl<C> SpeculativePool<C> {
/// Creates a new pool
pub(crate) fn new(command_sps: Vec<SpObject<C>>) -> Self {
Self {
command_sps,
conf_change_sp: ConfChangeSp::default(),
}
}

/// Inserts an entry into the pool
pub(crate) fn insert(&mut self, entry: PoolEntry<C>) -> Option<PoolEntry<C>> {
if !self.conf_change_sp.is_empty() {
return Some(entry);
}

match ConflictPoolEntry::from(entry) {
ConflictPoolEntry::Command(c) => {
for csp in &mut self.command_sps {
if let Some(e) = csp.insert_if_not_conflict(c.clone()) {
return Some(e.into());
}
}
}
ConflictPoolEntry::ConfChange(c) => {
if !self
.command_sps
.iter()
.map(AsRef::as_ref)
.all(ConflictPoolOp::is_empty)
{
return Some(c.into());
}
let _ignore = self.conf_change_sp.insert_if_not_conflict(c);
}
}

None
}

// TODO: Use reference instead of clone
/// Removes an entry from the pool
pub(crate) fn remove(&mut self, entry: PoolEntry<C>) {
match ConflictPoolEntry::from(entry) {
ConflictPoolEntry::Command(c) => {
for csp in &mut self.command_sps {
csp.remove(c.clone());
}
}
ConflictPoolEntry::ConfChange(c) => {
self.conf_change_sp.remove(c);
}
}
}

/// Returns all entries in the pool
pub(crate) fn all(&self) -> Vec<PoolEntry<C>> {
let mut entries = Vec::new();
for csp in &self.command_sps {
entries.extend(csp.all().into_iter().map(Into::into));
}
entries.extend(self.conf_change_sp.all().into_iter().map(Into::into));
entries
}

/// Returns the number of entries in the pool
#[allow(clippy::arithmetic_side_effects)] // Pool sizes can't overflow a `usize`
pub(crate) fn len(&self) -> usize {
self.command_sps
.iter()
.fold(0, |sum, pool| sum + pool.len())
+ self.conf_change_sp.len()
}
}

/// Speculative pool for conf change entries
#[derive(Default)]
struct ConfChangeSp {
/// Store current conf change
change: Option<ConfChangeEntry>,
}

impl ConflictPoolOp for ConfChangeSp {
type Entry = ConfChangeEntry;

fn is_empty(&self) -> bool {
self.change.is_none()
}

fn remove(&mut self, _entry: Self::Entry) {
self.change = None;
}

fn all(&self) -> Vec<Self::Entry> {
self.change.clone().into_iter().collect()
}

fn clear(&mut self) {
self.change = None;
}

fn len(&self) -> usize {
self.change.iter().count()
}
}

impl SpeculativePoolOp for ConfChangeSp {
fn insert_if_not_conflict(&mut self, entry: Self::Entry) -> Option<Self::Entry> {
if self.change.is_some() {
return Some(entry);
}
self.change = Some(entry);
None
}
}
Loading
Loading