-
Notifications
You must be signed in to change notification settings - Fork 76
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: implement conflict pools in curp
Signed-off-by: bsbds <[email protected]>
- Loading branch information
Showing
4 changed files
with
412 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
#![allow(unused)] | ||
#![allow(unreachable_pub)] | ||
|
||
/// Speculative pool | ||
pub(crate) mod spec_pool_new; | ||
|
||
/// Uncommitted pool | ||
pub(crate) mod uncommitted_pool; | ||
|
||
use std::{ops::Deref, sync::Arc}; | ||
|
||
use crate::rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId}; | ||
|
||
// TODO: relpace `PoolEntry` with this | ||
/// Entry stored in conflict pools | ||
pub(super) enum SplitEntry<C> { | ||
/// A command entry | ||
Command(CommandEntry<C>), | ||
/// A conf change entry | ||
ConfChange(ConfChangeEntry), | ||
} | ||
|
||
impl<C> From<PoolEntry<C>> for SplitEntry<C> { | ||
fn from(entry: PoolEntry<C>) -> Self { | ||
match entry.inner { | ||
PoolEntryInner::Command(c) => SplitEntry::Command(CommandEntry { | ||
id: entry.id, | ||
cmd: c, | ||
}), | ||
PoolEntryInner::ConfChange(c) => SplitEntry::ConfChange(ConfChangeEntry { | ||
id: entry.id, | ||
conf_change: c, | ||
}), | ||
} | ||
} | ||
} | ||
|
||
/// Command entry type | ||
#[derive(Debug)] | ||
pub struct CommandEntry<C> { | ||
/// The propose id | ||
id: ProposeId, | ||
/// The command | ||
cmd: Arc<C>, | ||
} | ||
|
||
impl<C> Clone for CommandEntry<C> { | ||
#[inline] | ||
fn clone(&self) -> Self { | ||
Self { | ||
id: self.id, | ||
cmd: Arc::clone(&self.cmd), | ||
} | ||
} | ||
} | ||
|
||
impl<C> Deref for CommandEntry<C> { | ||
type Target = C; | ||
|
||
#[inline] | ||
fn deref(&self) -> &Self::Target { | ||
&self.cmd | ||
} | ||
} | ||
|
||
impl<C> AsRef<C> for CommandEntry<C> { | ||
#[inline] | ||
fn as_ref(&self) -> &C { | ||
self.cmd.as_ref() | ||
} | ||
} | ||
|
||
impl<C> From<CommandEntry<C>> for PoolEntry<C> { | ||
fn from(entry: CommandEntry<C>) -> Self { | ||
PoolEntry { | ||
id: entry.id, | ||
inner: PoolEntryInner::Command(entry.cmd), | ||
} | ||
} | ||
} | ||
|
||
impl<C> std::hash::Hash for CommandEntry<C> { | ||
#[inline] | ||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) { | ||
self.id.hash(state); | ||
} | ||
} | ||
|
||
impl<C> PartialEq for CommandEntry<C> { | ||
#[inline] | ||
fn eq(&self, other: &Self) -> bool { | ||
self.id.eq(&other.id) | ||
} | ||
} | ||
|
||
impl<C> Eq for CommandEntry<C> {} | ||
|
||
/// Conf change entry type | ||
#[derive(Clone, PartialEq)] | ||
pub(super) struct ConfChangeEntry { | ||
/// The propose id | ||
id: ProposeId, | ||
/// The conf change entry | ||
conf_change: Vec<ConfChange>, | ||
} | ||
|
||
impl<C> From<ConfChangeEntry> for PoolEntry<C> { | ||
fn from(entry: ConfChangeEntry) -> Self { | ||
PoolEntry { | ||
id: entry.id, | ||
inner: PoolEntryInner::ConfChange(entry.conf_change), | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
use curp_external_api::conflict::SpeculativePool; | ||
|
||
use crate::rpc::PoolEntry; | ||
|
||
use super::{CommandEntry, ConfChangeEntry, SplitEntry}; | ||
|
||
/// A speculative pool object | ||
pub type SpObject<C> = Box<dyn SpeculativePool<Entry = CommandEntry<C>> + Send + 'static>; | ||
|
||
/// Union type of `SpeculativePool` objects | ||
pub(crate) struct SpecPool<C> { | ||
/// Command speculative pools | ||
command_sps: Vec<SpObject<C>>, | ||
/// Conf change speculative pool | ||
conf_change_sp: ConfChangeSpecPool, | ||
} | ||
|
||
impl<C> SpecPool<C> { | ||
/// Creates a new pool | ||
pub(crate) fn new(command_sps: Vec<SpObject<C>>) -> Self { | ||
Self { | ||
command_sps, | ||
conf_change_sp: ConfChangeSpecPool::default(), | ||
} | ||
} | ||
|
||
/// Inserts an entry into the pool | ||
pub(crate) fn insert(&mut self, entry: PoolEntry<C>) -> Option<PoolEntry<C>> { | ||
if !self.conf_change_sp.is_empty() { | ||
return Some(entry); | ||
} | ||
|
||
match SplitEntry::from(entry) { | ||
SplitEntry::Command(c) => { | ||
for csp in &mut self.command_sps { | ||
if let Some(e) = csp.insert(c.clone()) { | ||
return Some(e.into()); | ||
} | ||
} | ||
} | ||
SplitEntry::ConfChange(c) => { | ||
if !self | ||
.command_sps | ||
.iter() | ||
.map(AsRef::as_ref) | ||
.all(SpeculativePool::is_empty) | ||
{ | ||
return Some(c.into()); | ||
} | ||
let _ignore = self.conf_change_sp.insert(c); | ||
} | ||
} | ||
|
||
None | ||
} | ||
|
||
// TODO: Use reference instead of clone | ||
/// Removes an entry from the pool | ||
pub(crate) fn remove(&mut self, entry: PoolEntry<C>) { | ||
match SplitEntry::from(entry) { | ||
SplitEntry::Command(c) => { | ||
for csp in &mut self.command_sps { | ||
csp.remove(c.clone()); | ||
} | ||
} | ||
SplitEntry::ConfChange(c) => { | ||
self.conf_change_sp.remove(c); | ||
} | ||
} | ||
} | ||
|
||
/// Returns all entries in the pool | ||
pub(crate) fn all(&self) -> Vec<PoolEntry<C>> { | ||
let mut entries = Vec::new(); | ||
for csp in &self.command_sps { | ||
entries.extend(csp.all().into_iter().map(Into::into)); | ||
} | ||
entries.extend(self.conf_change_sp.all().into_iter().map(Into::into)); | ||
entries | ||
} | ||
|
||
/// Returns the number of entries in the pool | ||
#[allow(clippy::arithmetic_side_effects)] // Pool sizes can't overflow a `usize` | ||
pub(crate) fn len(&self) -> usize { | ||
self.command_sps | ||
.iter() | ||
.fold(0, |sum, pool| sum + pool.len()) | ||
+ self.conf_change_sp.len() | ||
} | ||
} | ||
|
||
/// Speculative pool for conf change entries | ||
#[derive(Default)] | ||
struct ConfChangeSpecPool { | ||
/// Store current conf change | ||
change: Option<ConfChangeEntry>, | ||
} | ||
|
||
impl SpeculativePool for ConfChangeSpecPool { | ||
type Entry = ConfChangeEntry; | ||
|
||
fn insert(&mut self, entry: Self::Entry) -> Option<Self::Entry> { | ||
if self.change.is_some() { | ||
return Some(entry); | ||
} | ||
self.change = Some(entry); | ||
None | ||
} | ||
|
||
fn is_empty(&self) -> bool { | ||
self.change.is_none() | ||
} | ||
|
||
fn remove(&mut self, _entry: Self::Entry) { | ||
self.change = None; | ||
} | ||
|
||
fn all(&self) -> Vec<Self::Entry> { | ||
self.change.clone().into_iter().collect() | ||
} | ||
|
||
fn clear(&mut self) { | ||
self.change = None; | ||
} | ||
|
||
fn len(&self) -> usize { | ||
self.change.iter().count() | ||
} | ||
} |
Oops, something went wrong.