Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: enable new sp/ucp implementation #668

Merged
merged 6 commits into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
/// Get keys of the command
fn keys(&self) -> &[Self::K];

/// Returns `true` if the command is read-only
fn is_read_only(&self) -> bool;

/// Prepare the command
///
/// # Errors
Expand Down
7 changes: 7 additions & 0 deletions crates/curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ impl Command for TestCommand {
fn keys(&self) -> &[Self::K] {
&self.keys
}

fn is_read_only(&self) -> bool {
match self.cmd_type {
TestCommandType::Get => true,
TestCommandType::Put(_) => false,
}
}
}

impl ConflictCheck for TestCommand {
Expand Down
13 changes: 0 additions & 13 deletions crates/curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,19 +863,6 @@ impl<C> PoolEntry<C> {
}
}

impl<C> PoolEntry<C>
where
C: Command,
{
/// Check if the entry is conflict with the command
pub(crate) fn is_conflict_with_cmd(&self, c: &C) -> bool {
match self.inner {
PoolEntryInner::Command(ref cmd) => cmd.is_conflict(c),
PoolEntryInner::ConfChange(ref _conf_change) => true,
}
}
}

impl<C> ConflictCheck for PoolEntry<C>
where
C: ConflictCheck,
Expand Down
20 changes: 13 additions & 7 deletions crates/curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
cmd::{Command, CommandExecutor},
log_entry::{EntryData, LogEntry},
role_change::RoleChange,
rpc::ConfChangeType,
rpc::{ConfChangeType, PoolEntry},
server::cmd_worker::conflict_checked_mpmc::TaskType,
snapshot::{Snapshot, SnapshotMeta},
};
Expand Down Expand Up @@ -120,8 +120,10 @@ async fn worker_exe<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let er_ok = er.is_ok();
cb.write().insert_er(entry.propose_id, er);
if !er_ok {
sp.lock().remove(&entry.propose_id);
let _ig = ucp.lock().remove(&entry.propose_id);
sp.lock()
.remove(PoolEntry::new(entry.propose_id, Arc::clone(cmd)));
ucp.lock()
.remove(PoolEntry::new(entry.propose_id, Arc::clone(cmd)));
}
debug!(
"{id} cmd({}) is speculatively executed, exe status: {er_ok}",
Expand Down Expand Up @@ -157,8 +159,10 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let asr = ce.after_sync(cmd.as_ref(), entry.index, prepare).await;
let asr_ok = asr.is_ok();
cb.write().insert_asr(entry.propose_id, asr);
sp.lock().remove(&entry.propose_id);
let _ig = ucp.lock().remove(&entry.propose_id);
sp.lock()
.remove(PoolEntry::new(entry.propose_id, Arc::clone(cmd)));
ucp.lock()
.remove(PoolEntry::new(entry.propose_id, Arc::clone(cmd)));
debug!("{id} cmd({}) after sync is called", entry.propose_id);
asr_ok
}
Expand All @@ -184,8 +188,10 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let shutdown_self =
change.change_type() == ConfChangeType::Remove && change.node_id == id;
cb.write().insert_conf(entry.propose_id);
sp.lock().remove(&entry.propose_id);
let _ig = ucp.lock().remove(&entry.propose_id);
sp.lock()
.remove(PoolEntry::new(entry.propose_id, conf_change.clone()));
ucp.lock()
.remove(PoolEntry::new(entry.propose_id, conf_change.clone()));
if shutdown_self {
if let Some(maybe_new_leader) = curp.pick_new_leader() {
info!(
Expand Down
3 changes: 0 additions & 3 deletions crates/curp/src/server/conflict/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#![allow(unused)]
#![allow(unreachable_pub)]

/// Speculative pool
pub(crate) mod spec_pool_new;

Expand Down
2 changes: 1 addition & 1 deletion crates/curp/src/server/conflict/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn sp_should_returns_all_entries() {
for e in entries.clone() {
sp.insert(e);
}
/// conflict entries should not be inserted
// conflict entries should not be inserted
for e in entries.clone() {
assert!(sp.insert(e).is_some());
}
Expand Down
24 changes: 9 additions & 15 deletions crates/curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ use utils::{
use super::{
cmd_board::{CmdBoardRef, CommandBoard},
cmd_worker::{conflict_checked_mpmc, start_cmd_workers},
gc::{gc_cmd_board, gc_spec_pool},
conflict::spec_pool_new::{SpObject, SpeculativePool},
conflict::uncommitted_pool::{UcpObject, UncommittedPool},
gc::gc_cmd_board,
lease_manager::LeaseManager,
raw_curp::{AppendEntries, RawCurp, UncommittedPool, Vote},
spec_pool::{SpecPoolRef, SpeculativePool},
raw_curp::{AppendEntries, RawCurp, Vote},
storage::StorageApi,
};
use crate::{
Expand Down Expand Up @@ -59,8 +60,6 @@ use crate::{
pub(super) struct CurpNode<C: Command, RC: RoleChange> {
/// `RawCurp` state machine
curp: Arc<RawCurp<C, RC>>,
/// The speculative cmd pool, shared with executor
spec_pool: SpecPoolRef<C>,
/// Cmd watch board for tracking the cmd sync results
cmd_board: CmdBoardRef<C>,
/// CE event tx,
Expand Down Expand Up @@ -336,7 +335,7 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
) -> Result<FetchReadStateResponse, CurpError> {
self.check_cluster_version(req.cluster_version)?;
let cmd = req.cmd()?;
let state = self.curp.handle_fetch_read_state(&cmd);
let state = self.curp.handle_fetch_read_state(Arc::new(cmd));
Ok(FetchReadStateResponse::new(state))
}

Expand Down Expand Up @@ -616,6 +615,8 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
storage: Arc<DB<C>>,
task_manager: Arc<TaskManager>,
client_tls_config: Option<ClientTlsConfig>,
sps: Vec<SpObject<C>>,
ucps: Vec<UcpObject<C>>,
) -> Result<Self, CurpError> {
let sync_events = cluster_info
.peers_ids()
Expand All @@ -628,9 +629,7 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
.collect();
let (log_tx, log_rx) = mpsc::unbounded_channel();
let cmd_board = Arc::new(RwLock::new(CommandBoard::new()));
let spec_pool = Arc::new(Mutex::new(SpeculativePool::new()));
let lease_manager = Arc::new(RwLock::new(LeaseManager::new()));
let uncommitted_pool = Arc::new(Mutex::new(UncommittedPool::new()));
let last_applied = cmd_executor
.last_applied()
.map_err(|e| CurpError::internal(format!("get applied index error, {e}")))?;
Expand All @@ -645,9 +644,7 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
.cluster_info(Arc::clone(&cluster_info))
.is_leader(is_leader)
.cmd_board(Arc::clone(&cmd_board))
.spec_pool(Arc::clone(&spec_pool))
.lease_manager(lease_manager)
.uncommitted_pool(uncommitted_pool)
.cfg(Arc::clone(&curp_cfg))
.cmd_tx(Arc::clone(&ce_event_tx))
.sync_events(sync_events)
Expand All @@ -660,6 +657,8 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
.entries(entries)
.curp_storage(Arc::clone(&storage))
.client_tls_config(client_tls_config)
.spec_pool(Arc::new(Mutex::new(SpeculativePool::new(sps))))
.uncommitted_pool(Arc::new(Mutex::new(UncommittedPool::new(ucps))))
.build_raw_curp()
.map_err(|e| CurpError::internal(format!("build raw curp failed, {e}")))?,
);
Expand All @@ -671,15 +670,11 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
task_manager.spawn(TaskName::GcCmdBoard, |n| {
gc_cmd_board(Arc::clone(&cmd_board), curp_cfg.gc_interval, n)
});
task_manager.spawn(TaskName::GcSpecPool, |n| {
gc_spec_pool(Arc::clone(&spec_pool), curp_cfg.gc_interval, n)
});

Self::run_bg_tasks(Arc::clone(&curp), Arc::clone(&storage), log_rx);

Ok(Self {
curp,
spec_pool,
cmd_board,
ce_event_tx,
storage,
Expand Down Expand Up @@ -979,7 +974,6 @@ impl<C: Command, RC: RoleChange> Debug for CurpNode<C, RC> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CurpNode")
.field("raw_curp", &self.curp)
.field("spec_pool", &self.spec_pool)
.field("cmd_board", &self.cmd_board)
.finish()
}
Expand Down
109 changes: 7 additions & 102 deletions crates/curp/src/server/gc.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,10 @@
use std::{collections::HashSet, time::Duration};
use std::time::Duration;

use utils::{parking_lot_lock::MutexMap, task_manager::Listener};
use utils::task_manager::Listener;

use super::spec_pool::SpecPoolRef;
use crate::{cmd::Command, rpc::ProposeId, server::cmd_board::CmdBoardRef};
use crate::{cmd::Command, server::cmd_board::CmdBoardRef};

/// Cleanup spec pool
pub(super) async fn gc_spec_pool<C: Command>(
sp: SpecPoolRef<C>,
interval: Duration,
shutdown_listener: Listener,
) {
let mut last_check: HashSet<ProposeId> =
sp.map_lock(|sp_l| sp_l.pool.keys().copied().collect());
#[allow(clippy::arithmetic_side_effects, clippy::ignored_unit_patterns)]
// introduced by tokio select
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = shutdown_listener.wait() => break,
}
let mut sp = sp.lock();
sp.pool.retain(|k, _v| !last_check.contains(k));

last_check = sp.pool.keys().copied().collect();
}
}
// TODO: Speculative pool GC

/// Cleanup cmd board
pub(super) async fn gc_cmd_board<C: Command>(
Expand Down Expand Up @@ -78,21 +57,16 @@ pub(super) async fn gc_cmd_board<C: Command>(
mod tests {
use std::{sync::Arc, time::Duration};

use curp_test_utils::{
sleep_secs,
test_cmd::{TestCommand, TestCommandResult},
};
use parking_lot::{Mutex, RwLock};
use curp_test_utils::test_cmd::{TestCommand, TestCommandResult};
use parking_lot::RwLock;
use test_macros::abort_on_panic;
use utils::task_manager::{tasks::TaskName, TaskManager};

use super::*;
use crate::{
rpc::{PoolEntry, ProposeId},
rpc::ProposeId,
server::{
cmd_board::{CmdBoardRef, CommandBoard},
gc::gc_cmd_board,
spec_pool::{SpecPoolRef, SpeculativePool},
},
};

Expand Down Expand Up @@ -145,73 +119,4 @@ mod tests {
assert_eq!(*board.asr_buffer.get_index(0).unwrap().0, ProposeId(3, 3));
task_manager.shutdown(true).await;
}

#[tokio::test]
#[abort_on_panic]
async fn spec_gc_test() {
let task_manager = TaskManager::new();
let spec: SpecPoolRef<TestCommand> = Arc::new(Mutex::new(SpeculativePool::new()));
task_manager.spawn(TaskName::GcSpecPool, |n| {
gc_spec_pool(Arc::clone(&spec), Duration::from_millis(500), n)
});

tokio::time::sleep(Duration::from_millis(100)).await;
let cmd1 = Arc::new(TestCommand::default());
spec.lock()
.pool
.insert(ProposeId(0, 1), PoolEntry::new(ProposeId(0, 1), cmd1));

tokio::time::sleep(Duration::from_millis(100)).await;
let cmd2 = Arc::new(TestCommand::default());
spec.lock()
.pool
.insert(ProposeId(0, 2), PoolEntry::new(ProposeId(0, 2), cmd2));

// at 600ms
tokio::time::sleep(Duration::from_millis(400)).await;
let cmd3 = Arc::new(TestCommand::default());
spec.lock()
.pool
.insert(ProposeId(0, 3), PoolEntry::new(ProposeId(0, 3), cmd3));

// at 1100ms, the first two kv should be removed
tokio::time::sleep(Duration::from_millis(500)).await;
let spec = spec.lock();
assert_eq!(spec.pool.len(), 1);
assert!(spec.pool.contains_key(&ProposeId(0, 3)));
task_manager.shutdown(true).await;
}

// To verify #206 is fixed
#[tokio::test]
#[abort_on_panic]
async fn spec_gc_will_not_panic() {
let task_manager = TaskManager::new();
let spec: SpecPoolRef<TestCommand> = Arc::new(Mutex::new(SpeculativePool::new()));

let cmd1 = Arc::new(TestCommand::default());
spec.lock()
.pool
.insert(ProposeId(0, 1), PoolEntry::new(ProposeId(0, 1), cmd1));

tokio::time::sleep(Duration::from_millis(100)).await;
let cmd2 = Arc::new(TestCommand::default());
spec.lock()
.pool
.insert(ProposeId(0, 2), PoolEntry::new(ProposeId(0, 2), cmd2));

let cmd3 = Arc::new(TestCommand::default());
spec.lock()
.pool
.insert(ProposeId(0, 2), PoolEntry::new(ProposeId(0, 3), cmd3));

task_manager.spawn(TaskName::GcSpecPool, |n| {
gc_spec_pool(Arc::clone(&spec), Duration::from_millis(500), n)
});

spec.lock().remove(&ProposeId(0, 2));

sleep_secs(1).await;
task_manager.shutdown(true).await;
}
}
2 changes: 1 addition & 1 deletion crates/curp/src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl Metrics {
observer.observe_u64(&is_learner, u64::from(learner), &[]);
observer.observe_u64(&server_id, id, &[]);

let sp_size = curp.spec_pool().lock().pool.len();
let sp_size = curp.spec_pool().lock().len();
observer.observe_u64(&sp_total, sp_size.numeric_cast(), &[]);

let client_ids = curp.lease_manager().read().expiry_queue.len();
Expand Down
Loading
Loading