Skip to content

Commit

Permalink
refactor: enable new sp/ucp implementation
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Apr 28, 2024
1 parent f48a96a commit 2ceee54
Show file tree
Hide file tree
Showing 15 changed files with 209 additions and 304 deletions.
7 changes: 7 additions & 0 deletions crates/curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ impl Command for TestCommand {
/// Returns a borrowed view of the keys held in this command's `keys` field.
fn keys(&self) -> &[Self::K] {
&self.keys
}

/// Reports whether this command only reads state.
///
/// `Get` commands are read-only; `Put` commands are not.
fn is_read_only(&self) -> bool {
    // Kept as an exhaustive match so that adding a new command type forces
    // an explicit decision here.
    match &self.cmd_type {
        TestCommandType::Put(_) => false,
        TestCommandType::Get => true,
    }
}
}

impl ConflictCheck for TestCommand {
Expand Down
13 changes: 0 additions & 13 deletions crates/curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,19 +863,6 @@ impl<C> PoolEntry<C> {
}
}

impl<C> PoolEntry<C>
where
    C: Command,
{
    /// Returns `true` if this pool entry conflicts with the command `c`.
    pub(crate) fn is_conflict_with_cmd(&self, c: &C) -> bool {
        match &self.inner {
            // A pooled command defers to the command's own conflict check.
            PoolEntryInner::Command(pooled) => pooled.is_conflict(c),
            // A configuration change is treated as conflicting with every command.
            PoolEntryInner::ConfChange(_) => true,
        }
    }
}

impl<C> ConflictCheck for PoolEntry<C>
where
C: ConflictCheck,
Expand Down
32 changes: 23 additions & 9 deletions crates/curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
cmd::{Command, CommandExecutor},
log_entry::{EntryData, LogEntry},
role_change::RoleChange,
rpc::ConfChangeType,
rpc::{ConfChangeType, PoolEntry},
server::cmd_worker::conflict_checked_mpmc::TaskType,
snapshot::{Snapshot, SnapshotMeta},
};
Expand Down Expand Up @@ -108,7 +108,11 @@ async fn worker_exe<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
ce: &CE,
curp: &RawCurp<C, RC>,
) -> bool {
let (cb, sp, ucp) = (curp.cmd_board(), curp.spec_pool(), curp.uncommitted_pool());
let (cb, sp, ucp) = (
curp.cmd_board(),
curp.new_spec_pool(),
curp.new_uncommitted_pool(),
);
let id = curp.id();
let success = match entry.entry_data {
EntryData::Command(ref cmd) => {
Expand All @@ -120,8 +124,10 @@ async fn worker_exe<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let er_ok = er.is_ok();
cb.write().insert_er(entry.propose_id, er);
if !er_ok {
sp.lock().remove(&entry.propose_id);
let _ig = ucp.lock().remove(&entry.propose_id);
sp.lock()
.remove(PoolEntry::new(entry.propose_id, Arc::clone(cmd)));
ucp.lock()
.remove(PoolEntry::new(entry.propose_id, Arc::clone(cmd)));
}
debug!(
"{id} cmd({}) is speculatively executed, exe status: {er_ok}",
Expand All @@ -147,7 +153,11 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
ce: &CE,
curp: &RawCurp<C, RC>,
) -> bool {
let (cb, sp, ucp) = (curp.cmd_board(), curp.spec_pool(), curp.uncommitted_pool());
let (cb, sp, ucp) = (
curp.cmd_board(),
curp.new_spec_pool(),
curp.new_uncommitted_pool(),
);
let id = curp.id();
let success = match entry.entry_data {
EntryData::Command(ref cmd) => {
Expand All @@ -157,8 +167,10 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let asr = ce.after_sync(cmd.as_ref(), entry.index, prepare).await;
let asr_ok = asr.is_ok();
cb.write().insert_asr(entry.propose_id, asr);
sp.lock().remove(&entry.propose_id);
let _ig = ucp.lock().remove(&entry.propose_id);
sp.lock()
.remove(PoolEntry::new(entry.propose_id, Arc::clone(cmd)));
ucp.lock()
.remove(PoolEntry::new(entry.propose_id, Arc::clone(cmd)));
debug!("{id} cmd({}) after sync is called", entry.propose_id);
asr_ok
}
Expand All @@ -184,8 +196,10 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let shutdown_self =
change.change_type() == ConfChangeType::Remove && change.node_id == id;
cb.write().insert_conf(entry.propose_id);
sp.lock().remove(&entry.propose_id);
let _ig = ucp.lock().remove(&entry.propose_id);
sp.lock()
.remove(PoolEntry::new(entry.propose_id, conf_change.clone()));
ucp.lock()
.remove(PoolEntry::new(entry.propose_id, conf_change.clone()));
if shutdown_self {
if let Some(maybe_new_leader) = curp.pick_new_leader() {
info!(
Expand Down
3 changes: 0 additions & 3 deletions crates/curp/src/server/conflict/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#![allow(unused)]
#![allow(unreachable_pub)]

/// Speculative pool
pub(crate) mod spec_pool_new;

Expand Down
24 changes: 9 additions & 15 deletions crates/curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ use utils::{
use super::{
cmd_board::{CmdBoardRef, CommandBoard},
cmd_worker::{conflict_checked_mpmc, start_cmd_workers},
gc::{gc_cmd_board, gc_spec_pool},
conflict::spec_pool_new::{SpObject, SpeculativePool},
conflict::uncommitted_pool::{UcpObject, UncommittedPool},
gc::gc_cmd_board,
lease_manager::LeaseManager,
raw_curp::{AppendEntries, RawCurp, UncommittedPool, Vote},
spec_pool::{SpecPoolRef, SpeculativePool},
raw_curp::{AppendEntries, RawCurp, Vote},
storage::StorageApi,
};
use crate::{
Expand Down Expand Up @@ -59,8 +60,6 @@ use crate::{
pub(super) struct CurpNode<C: Command, RC: RoleChange> {
/// `RawCurp` state machine
curp: Arc<RawCurp<C, RC>>,
/// The speculative cmd pool, shared with executor
spec_pool: SpecPoolRef<C>,
/// Cmd watch board for tracking the cmd sync results
cmd_board: CmdBoardRef<C>,
/// CE event tx,
Expand Down Expand Up @@ -336,7 +335,7 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
) -> Result<FetchReadStateResponse, CurpError> {
self.check_cluster_version(req.cluster_version)?;
let cmd = req.cmd()?;
let state = self.curp.handle_fetch_read_state(&cmd);
let state = self.curp.handle_fetch_read_state(Arc::new(cmd));
Ok(FetchReadStateResponse::new(state))
}

Expand Down Expand Up @@ -616,6 +615,8 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
storage: Arc<DB<C>>,
task_manager: Arc<TaskManager>,
client_tls_config: Option<ClientTlsConfig>,
sps: Vec<SpObject<C>>,
ucps: Vec<UcpObject<C>>,
) -> Result<Self, CurpError> {
let sync_events = cluster_info
.peers_ids()
Expand All @@ -628,9 +629,7 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
.collect();
let (log_tx, log_rx) = mpsc::unbounded_channel();
let cmd_board = Arc::new(RwLock::new(CommandBoard::new()));
let spec_pool = Arc::new(Mutex::new(SpeculativePool::new()));
let lease_manager = Arc::new(RwLock::new(LeaseManager::new()));
let uncommitted_pool = Arc::new(Mutex::new(UncommittedPool::new()));
let last_applied = cmd_executor
.last_applied()
.map_err(|e| CurpError::internal(format!("get applied index error, {e}")))?;
Expand All @@ -645,9 +644,7 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
.cluster_info(Arc::clone(&cluster_info))
.is_leader(is_leader)
.cmd_board(Arc::clone(&cmd_board))
.spec_pool(Arc::clone(&spec_pool))
.lease_manager(lease_manager)
.uncommitted_pool(uncommitted_pool)
.cfg(Arc::clone(&curp_cfg))
.cmd_tx(Arc::clone(&ce_event_tx))
.sync_events(sync_events)
Expand All @@ -660,6 +657,8 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
.entries(entries)
.curp_storage(Arc::clone(&storage))
.client_tls_config(client_tls_config)
.new_sp(Arc::new(Mutex::new(SpeculativePool::new(sps))))
.new_ucp(Arc::new(Mutex::new(UncommittedPool::new(ucps))))
.build_raw_curp()
.map_err(|e| CurpError::internal(format!("build raw curp failed, {e}")))?,
);
Expand All @@ -671,15 +670,11 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
task_manager.spawn(TaskName::GcCmdBoard, |n| {
gc_cmd_board(Arc::clone(&cmd_board), curp_cfg.gc_interval, n)
});
task_manager.spawn(TaskName::GcSpecPool, |n| {
gc_spec_pool(Arc::clone(&spec_pool), curp_cfg.gc_interval, n)
});

Self::run_bg_tasks(Arc::clone(&curp), Arc::clone(&storage), log_rx);

Ok(Self {
curp,
spec_pool,
cmd_board,
ce_event_tx,
storage,
Expand Down Expand Up @@ -979,7 +974,6 @@ impl<C: Command, RC: RoleChange> Debug for CurpNode<C, RC> {
/// Formats the node for debug output as a `CurpNode` struct, showing only the
/// `raw_curp`, `spec_pool` and `cmd_board` fields.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CurpNode")
.field("raw_curp", &self.curp)
.field("spec_pool", &self.spec_pool)
.field("cmd_board", &self.cmd_board)
.finish()
}
Expand Down
109 changes: 7 additions & 102 deletions crates/curp/src/server/gc.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,10 @@
use std::{collections::HashSet, time::Duration};
use std::time::Duration;

use utils::{parking_lot_lock::MutexMap, task_manager::Listener};
use utils::task_manager::Listener;

use super::spec_pool::SpecPoolRef;
use crate::{cmd::Command, rpc::ProposeId, server::cmd_board::CmdBoardRef};
use crate::{cmd::Command, server::cmd_board::CmdBoardRef};

/// Periodically cleans up the speculative pool.
///
/// After each `interval`, every entry whose `ProposeId` was already present at
/// the previous check is removed, so an entry survives at most one check.
/// Entries present when the task starts are removed at the first check.
/// The loop exits as soon as `shutdown_listener` fires.
pub(super) async fn gc_spec_pool<C: Command>(
sp: SpecPoolRef<C>,
interval: Duration,
shutdown_listener: Listener,
) {
// Snapshot of the ids currently in the pool; these are the candidates for
// removal at the first check.
let mut last_check: HashSet<ProposeId> =
sp.map_lock(|sp_l| sp_l.pool.keys().copied().collect());
#[allow(clippy::arithmetic_side_effects, clippy::ignored_unit_patterns)]
// introduced by tokio select
loop {
// Wait for the next GC tick, or stop when shutdown is signalled.
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = shutdown_listener.wait() => break,
}
let mut sp = sp.lock();
// Drop everything that was already in the pool at the previous check.
sp.pool.retain(|k, _v| !last_check.contains(k));

// Whatever remains becomes the candidate set for the next check.
last_check = sp.pool.keys().copied().collect();
}
}
// TODO: implement garbage collection for the speculative pool

/// Cleanup cmd board
pub(super) async fn gc_cmd_board<C: Command>(
Expand Down Expand Up @@ -78,21 +57,16 @@ pub(super) async fn gc_cmd_board<C: Command>(
mod tests {
use std::{sync::Arc, time::Duration};

use curp_test_utils::{
sleep_secs,
test_cmd::{TestCommand, TestCommandResult},
};
use parking_lot::{Mutex, RwLock};
use curp_test_utils::test_cmd::{TestCommand, TestCommandResult};
use parking_lot::RwLock;
use test_macros::abort_on_panic;
use utils::task_manager::{tasks::TaskName, TaskManager};

use super::*;
use crate::{
rpc::{PoolEntry, ProposeId},
rpc::ProposeId,
server::{
cmd_board::{CmdBoardRef, CommandBoard},
gc::gc_cmd_board,
spec_pool::{SpecPoolRef, SpeculativePool},
},
};

Expand Down Expand Up @@ -145,73 +119,4 @@ mod tests {
assert_eq!(*board.asr_buffer.get_index(0).unwrap().0, ProposeId(3, 3));
task_manager.shutdown(true).await;
}

#[tokio::test]
#[abort_on_panic]
async fn spec_gc_test() {
    // Entries already present at one GC check must be evicted by the next one.
    let task_manager = TaskManager::new();
    let spec: SpecPoolRef<TestCommand> = Arc::new(Mutex::new(SpeculativePool::new()));
    task_manager.spawn(TaskName::GcSpecPool, |n| {
        gc_spec_pool(Arc::clone(&spec), Duration::from_millis(500), n)
    });

    // (pause before the insert in ms, propose sequence number): the inserts
    // land at roughly 100ms, 200ms and 600ms after the GC task is spawned.
    for (pause_ms, seq) in [(100, 1), (100, 2), (400, 3)] {
        tokio::time::sleep(Duration::from_millis(pause_ms)).await;
        let cmd = Arc::new(TestCommand::default());
        spec.lock()
            .pool
            .insert(ProposeId(0, seq), PoolEntry::new(ProposeId(0, seq), cmd));
    }

    // at 1100ms, the first two kv should be removed
    tokio::time::sleep(Duration::from_millis(500)).await;
    let pool_guard = spec.lock();
    assert_eq!(pool_guard.pool.len(), 1);
    assert!(pool_guard.pool.contains_key(&ProposeId(0, 3)));
    task_manager.shutdown(true).await;
}

// To verify #206 is fixed
// The pool is populated before the GC task is spawned, and an entry is removed
// right after spawning it. The test makes no assertions: it passes as long as
// nothing panics before the task manager shuts down.
#[tokio::test]
#[abort_on_panic]
async fn spec_gc_will_not_panic() {
let task_manager = TaskManager::new();
let spec: SpecPoolRef<TestCommand> = Arc::new(Mutex::new(SpeculativePool::new()));

let cmd1 = Arc::new(TestCommand::default());
spec.lock()
.pool
.insert(ProposeId(0, 1), PoolEntry::new(ProposeId(0, 1), cmd1));

tokio::time::sleep(Duration::from_millis(100)).await;
let cmd2 = Arc::new(TestCommand::default());
spec.lock()
.pool
.insert(ProposeId(0, 2), PoolEntry::new(ProposeId(0, 2), cmd2));

// Inserted under the same key (0, 2) as the previous entry, while the entry
// itself is built with ProposeId(0, 3).
// NOTE(review): presumably this key/entry mismatch is deliberate to reproduce
// the #206 scenario; confirm before "fixing" it.
let cmd3 = Arc::new(TestCommand::default());
spec.lock()
.pool
.insert(ProposeId(0, 2), PoolEntry::new(ProposeId(0, 3), cmd3));

task_manager.spawn(TaskName::GcSpecPool, |n| {
gc_spec_pool(Arc::clone(&spec), Duration::from_millis(500), n)
});

// Remove an entry after the GC task has been spawned.
spec.lock().remove(&ProposeId(0, 2));

// Give the GC task time to run at least one 500ms cycle before shutting down.
sleep_secs(1).await;
task_manager.shutdown(true).await;
}
}
2 changes: 1 addition & 1 deletion crates/curp/src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl Metrics {
observer.observe_u64(&is_learner, u64::from(learner), &[]);
observer.observe_u64(&server_id, id, &[]);

let sp_size = curp.spec_pool().lock().pool.len();
let sp_size = curp.new_spec_pool().lock().len();
observer.observe_u64(&sp_total, sp_size.numeric_cast(), &[]);

let client_ids = curp.lease_manager().read().expiry_queue.len();
Expand Down
8 changes: 5 additions & 3 deletions crates/curp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use utils::{config::CurpConfig, task_manager::TaskManager, tracing::Extract};

use self::curp_node::CurpNode;
pub use self::raw_curp::RawCurp;
pub use self::{conflict::spec_pool_new::SpObject, conflict::uncommitted_pool::UcpObject};
use crate::{
cmd::{Command, CommandExecutor},
members::{ClusterInfo, ServerId},
Expand Down Expand Up @@ -38,9 +39,6 @@ mod cmd_board;
/// Conflict pools
pub mod conflict;

/// Speculative pool
mod spec_pool;

/// Background garbage collection for Curp server
mod gc;

Expand Down Expand Up @@ -245,6 +243,8 @@ impl<C: Command, RC: RoleChange> Rpc<C, RC> {
storage: Arc<DB<C>>,
task_manager: Arc<TaskManager>,
client_tls_config: Option<ClientTlsConfig>,
sps: Vec<SpObject<C>>,
ucps: Vec<UcpObject<C>>,
) -> Self {
#[allow(clippy::panic)]
let curp_node = match CurpNode::new(
Expand All @@ -257,6 +257,8 @@ impl<C: Command, RC: RoleChange> Rpc<C, RC> {
storage,
task_manager,
client_tls_config,
sps,
ucps,
)
.await
{
Expand Down
Loading

0 comments on commit 2ceee54

Please sign in to comment.