Skip to content

Commit

Permalink
refactor: implement conflict pools in curp
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Mar 6, 2024
1 parent cd217d8 commit bca6be2
Show file tree
Hide file tree
Showing 4 changed files with 412 additions and 0 deletions.
114 changes: 114 additions & 0 deletions crates/curp/src/server/conflict/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#![allow(unused)]
#![allow(unreachable_pub)]

/// Speculative pool
pub(crate) mod spec_pool_new;

/// Uncommitted pool
pub(crate) mod uncommitted_pool;

use std::{ops::Deref, sync::Arc};

use crate::rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId};

// TODO: relpace `PoolEntry` with this
/// Entry stored in conflict pools
pub(super) enum SplitEntry<C> {
/// A command entry
Command(CommandEntry<C>),
/// A conf change entry
ConfChange(ConfChangeEntry),
}

impl<C> From<PoolEntry<C>> for SplitEntry<C> {
fn from(entry: PoolEntry<C>) -> Self {
match entry.inner {
PoolEntryInner::Command(c) => SplitEntry::Command(CommandEntry {
id: entry.id,
cmd: c,
}),
PoolEntryInner::ConfChange(c) => SplitEntry::ConfChange(ConfChangeEntry {
id: entry.id,
conf_change: c,
}),
}
}
}

/// Command entry type
#[derive(Debug)]
pub struct CommandEntry<C> {
/// The propose id
id: ProposeId,
/// The command
cmd: Arc<C>,
}

impl<C> Clone for CommandEntry<C> {
#[inline]
fn clone(&self) -> Self {
Self {
id: self.id,
cmd: Arc::clone(&self.cmd),
}
}
}

impl<C> Deref for CommandEntry<C> {
type Target = C;

#[inline]
fn deref(&self) -> &Self::Target {
&self.cmd
}
}

impl<C> AsRef<C> for CommandEntry<C> {
#[inline]
fn as_ref(&self) -> &C {
self.cmd.as_ref()
}
}

impl<C> From<CommandEntry<C>> for PoolEntry<C> {
fn from(entry: CommandEntry<C>) -> Self {
PoolEntry {
id: entry.id,
inner: PoolEntryInner::Command(entry.cmd),
}
}
}

impl<C> std::hash::Hash for CommandEntry<C> {
#[inline]
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}

impl<C> PartialEq for CommandEntry<C> {
#[inline]
fn eq(&self, other: &Self) -> bool {
self.id.eq(&other.id)
}
}

impl<C> Eq for CommandEntry<C> {}

/// Conf change entry type
#[derive(Clone, PartialEq)]
pub(super) struct ConfChangeEntry {
/// The propose id
id: ProposeId,
/// The conf change entry
conf_change: Vec<ConfChange>,
}

impl<C> From<ConfChangeEntry> for PoolEntry<C> {
fn from(entry: ConfChangeEntry) -> Self {
PoolEntry {
id: entry.id,
inner: PoolEntryInner::ConfChange(entry.conf_change),
}
}
}
129 changes: 129 additions & 0 deletions crates/curp/src/server/conflict/spec_pool_new.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use curp_external_api::conflict::SpeculativePool;

use crate::rpc::PoolEntry;

use super::{CommandEntry, ConfChangeEntry, SplitEntry};

/// A speculative pool object
pub type SpObject<C> = Box<dyn SpeculativePool<Entry = CommandEntry<C>> + Send + 'static>;

/// Union type of `SpeculativePool` objects
pub(crate) struct SpecPool<C> {
/// Command speculative pools
command_sps: Vec<SpObject<C>>,
/// Conf change speculative pool
conf_change_sp: ConfChangeSpecPool,
}

impl<C> SpecPool<C> {
/// Creates a new pool
pub(crate) fn new(command_sps: Vec<SpObject<C>>) -> Self {
Self {
command_sps,
conf_change_sp: ConfChangeSpecPool::default(),
}
}

/// Inserts an entry into the pool
pub(crate) fn insert(&mut self, entry: PoolEntry<C>) -> Option<PoolEntry<C>> {
if !self.conf_change_sp.is_empty() {
return Some(entry);
}

match SplitEntry::from(entry) {
SplitEntry::Command(c) => {
for csp in &mut self.command_sps {
if let Some(e) = csp.insert(c.clone()) {
return Some(e.into());
}
}
}
SplitEntry::ConfChange(c) => {
if !self
.command_sps
.iter()
.map(AsRef::as_ref)
.all(SpeculativePool::is_empty)
{
return Some(c.into());
}
let _ignore = self.conf_change_sp.insert(c);
}
}

None
}

// TODO: Use reference instead of clone
/// Removes an entry from the pool
pub(crate) fn remove(&mut self, entry: PoolEntry<C>) {
match SplitEntry::from(entry) {
SplitEntry::Command(c) => {
for csp in &mut self.command_sps {
csp.remove(c.clone());
}
}
SplitEntry::ConfChange(c) => {
self.conf_change_sp.remove(c);
}
}
}

/// Returns all entries in the pool
pub(crate) fn all(&self) -> Vec<PoolEntry<C>> {
let mut entries = Vec::new();
for csp in &self.command_sps {
entries.extend(csp.all().into_iter().map(Into::into));
}
entries.extend(self.conf_change_sp.all().into_iter().map(Into::into));
entries
}

/// Returns the number of entries in the pool
#[allow(clippy::arithmetic_side_effects)] // Pool sizes can't overflow a `usize`
pub(crate) fn len(&self) -> usize {
self.command_sps
.iter()
.fold(0, |sum, pool| sum + pool.len())
+ self.conf_change_sp.len()
}
}

/// Speculative pool for conf change entries
#[derive(Default)]
struct ConfChangeSpecPool {
/// Store current conf change
change: Option<ConfChangeEntry>,
}

impl SpeculativePool for ConfChangeSpecPool {
type Entry = ConfChangeEntry;

fn insert(&mut self, entry: Self::Entry) -> Option<Self::Entry> {
if self.change.is_some() {
return Some(entry);
}
self.change = Some(entry);
None
}

fn is_empty(&self) -> bool {
self.change.is_none()
}

fn remove(&mut self, _entry: Self::Entry) {
self.change = None;
}

fn all(&self) -> Vec<Self::Entry> {
self.change.clone().into_iter().collect()
}

fn clear(&mut self) {
self.change = None;
}

fn len(&self) -> usize {
self.change.iter().count()
}
}
Loading

0 comments on commit bca6be2

Please sign in to comment.