Skip to content

Commit

Permalink
refactor: refactor shutdown test
Browse files Browse the repository at this point in the history
add retry count for client
fix curp group recovery

Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Sep 8, 2023
1 parent c41a25a commit 6e4769f
Show file tree
Hide file tree
Showing 39 changed files with 558 additions and 308 deletions.
4 changes: 2 additions & 2 deletions USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ auth_public_key = '/etc/xline/public_key.pem'
auth_private_key = '/etc/xline/private_key.pem'
```

For tuning and development purpose, the cluster section provides two subsections, curp_cfg, and client_timeout, with the following definitions and default values.
For tuning and development purpose, the cluster section provides two subsections, curp_cfg, and client_config, with the following definitions and default values.

```toml
[cluster.curp_config]
Expand All @@ -53,7 +53,7 @@ candidate_timeout_ticks = 2 # if a candidate cannot win an election, it will
# after `candidate_timeout_ticks` ticks. Its default value is 2


[cluster.client_timeout]
[cluster.client_config]
propose_timeout = '1s' # client propose timeout
wait_synced_timeout = '2s' # client wait synced timeout
retry_timeout = '50ms' # the rpc retry interval, of which the default is 50ms
Expand Down
5 changes: 3 additions & 2 deletions benchmark/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::{
time::{Duration, Instant},
};
use tracing::debug;
use utils::config::ClientTimeout;
use utils::config::ClientConfig;
use xline::client::{kv_types::PutRequest, Client};

use crate::{args::Commands, Benchmark};
Expand Down Expand Up @@ -162,10 +162,11 @@ impl CommandRunner {
let client = Client::new(
self.args.endpoints.clone(),
self.args.use_curp,
ClientTimeout::new(
ClientConfig::new(
Duration::from_secs(10),
Duration::from_secs(5),
Duration::from_millis(250),
3,
),
)
.await?;
Expand Down
3 changes: 3 additions & 0 deletions curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ where
prepare_res: C::PR,
) -> Result<C::ASR, C::Error>;

/// Set the index of the last log entry that has been successfully applied to the command executor
fn set_last_applied(&self, index: LogIndex) -> Result<(), C::Error>;

/// Index of the last log entry that has been successfully applied to the command executor
fn last_applied(&self) -> Result<LogIndex, C::Error>;

Expand Down
2 changes: 0 additions & 2 deletions curp-external-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,3 @@ pub type LogIndex = u64;
pub mod cmd;
/// The command to be executed
pub mod role_change;
/// Snapshot
pub mod snapshot;
12 changes: 0 additions & 12 deletions curp-external-api/src/snapshot.rs

This file was deleted.

1 change: 1 addition & 0 deletions curp-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod test_cmd;

pub const TEST_TABLE: &str = "test";
pub const REVISION_TABLE: &str = "revision";
pub const META_TABLE: &str = "meta";

#[derive(Default, Debug)]
pub struct TestRoleChange {
Expand Down
99 changes: 76 additions & 23 deletions curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{
};

use async_trait::async_trait;
use clippy_utilities::NumericCast;
use curp_external_api::{
cmd::{Command, CommandExecutor, ConflictCheck, PbCodec, ProposeId},
LogIndex,
Expand All @@ -20,8 +19,12 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{sync::mpsc, time::sleep};
use tracing::debug;
use utils::config::StorageConfig;

use crate::{REVISION_TABLE, TEST_TABLE};
use crate::{META_TABLE, REVISION_TABLE, TEST_TABLE};

pub(crate) const APPLIED_INDEX_KEY: &str = "applied_index";
pub(crate) const LAST_REVISION_KEY: &str = "last_revision";

static NEXT_ID: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(1));

Expand Down Expand Up @@ -234,7 +237,6 @@ impl PbCodec for TestCommand {
#[derive(Debug, Clone)]
pub struct TestCE {
server_id: String,
last_applied: Arc<AtomicU64>,
revision: Arc<AtomicI64>,
pub store: Arc<Engine>,
exe_sender: mpsc::UnboundedSender<(TestCommand, TestCommandResult)>,
Expand All @@ -249,7 +251,16 @@ impl CommandExecutor<TestCommand> for TestCE {
_index: LogIndex,
) -> Result<<TestCommand as Command>::PR, <TestCommand as Command>::Error> {
let rev = if let TestCommandType::Put(_) = cmd.cmd_type {
self.revision.fetch_add(1, Ordering::Relaxed)
let rev = self.revision.fetch_add(1, Ordering::Relaxed);
let wr_ops = vec![WriteOperation::new_put(
META_TABLE,
LAST_REVISION_KEY.into(),
rev.to_le_bytes().to_vec(),
)];
self.store
.write_batch(wr_ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
rev
} else {
-1
};
Expand Down Expand Up @@ -315,6 +326,11 @@ impl CommandExecutor<TestCommand> for TestCE {
self.after_sync_sender
.send((cmd.clone(), index))
.expect("failed to send after sync msg");
let mut wr_ops = vec![WriteOperation::new_put(
META_TABLE,
APPLIED_INDEX_KEY.into(),
index.to_le_bytes().to_vec(),
)];
if let TestCommandType::Put(v) = cmd.cmd_type {
debug!(
"cmd {:?}-{} revision is {}",
Expand All @@ -328,19 +344,22 @@ impl CommandExecutor<TestCommand> for TestCE {
.iter()
.map(|k| k.to_be_bytes().to_vec())
.collect_vec();
let wr_ops = keys
.clone()
.into_iter()
.map(|key| WriteOperation::new_put(TEST_TABLE, key, value.clone()))
.chain(keys.into_iter().map(|key| {
WriteOperation::new_put(REVISION_TABLE, key, revision.to_be_bytes().to_vec())
}))
.collect();
wr_ops.extend(
keys.clone()
.into_iter()
.map(|key| WriteOperation::new_put(TEST_TABLE, key, value.clone()))
.chain(keys.into_iter().map(|key| {
WriteOperation::new_put(
REVISION_TABLE,
key,
revision.to_be_bytes().to_vec(),
)
})),
);
self.store
.write_batch(wr_ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
}
self.last_applied.store(index, Ordering::Relaxed);
debug!(
"{} after sync cmd({:?} - {}), index: {index}",
self.server_id,
Expand All @@ -350,8 +369,27 @@ impl CommandExecutor<TestCommand> for TestCE {
Ok(index.into())
}

fn set_last_applied(&self, index: LogIndex) -> Result<(), <TestCommand as Command>::Error> {
let ops = vec![WriteOperation::new_put(
META_TABLE,
APPLIED_INDEX_KEY.into(),
index.to_le_bytes().to_vec(),
)];
self.store
.write_batch(ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
Ok(())
}

fn last_applied(&self) -> Result<LogIndex, ExecuteError> {
Ok(self.last_applied.load(Ordering::Relaxed))
let Some(index) = self
.store
.get(META_TABLE, APPLIED_INDEX_KEY)
.map_err(|e| ExecuteError(e.to_string()))? else {
return Ok(0);
};
let index = LogIndex::from_le_bytes(index.as_slice().try_into().unwrap());
Ok(index)
}

async fn snapshot(&self) -> Result<Snapshot, <TestCommand as Command>::Error> {
Expand All @@ -365,15 +403,20 @@ impl CommandExecutor<TestCommand> for TestCE {
snapshot: Option<(Snapshot, LogIndex)>,
) -> Result<(), <TestCommand as Command>::Error> {
let Some((mut snapshot, index)) = snapshot else {
self.last_applied.store(0, Ordering::Relaxed);
let ops = vec![WriteOperation::new_delete_range(TEST_TABLE, &[], &[0xff])];
let ops = vec![WriteOperation::new_delete_range(TEST_TABLE, &[], &[0xff]),WriteOperation::new_delete(META_TABLE, APPLIED_INDEX_KEY.as_ref())];
self.store
.write_batch(ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
return Ok(());
};
self.last_applied
.store(index.numeric_cast(), Ordering::Relaxed);
let ops = vec![WriteOperation::new_put(
META_TABLE,
APPLIED_INDEX_KEY.into(),
index.to_le_bytes().to_vec(),
)];
self.store
.write_batch(ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
snapshot.rewind().unwrap();
self.store
.apply_snapshot(snapshot, &[TEST_TABLE])
Expand All @@ -388,14 +431,24 @@ impl TestCE {
server_id: String,
exe_sender: mpsc::UnboundedSender<(TestCommand, TestCommandResult)>,
after_sync_sender: mpsc::UnboundedSender<(TestCommand, LogIndex)>,
storage_cfg: StorageConfig,
) -> Self {
let engine_type = match storage_cfg {
StorageConfig::Memory => EngineType::Memory,
StorageConfig::RocksDB(path) => EngineType::Rocks(path),
_ => unreachable!("Not supported storage type"),
};
let store =
Arc::new(Engine::new(engine_type, &[TEST_TABLE, REVISION_TABLE, META_TABLE]).unwrap());
let rev = store
.get(META_TABLE, LAST_REVISION_KEY)
.unwrap()
.map(|r| i64::from_le_bytes(r.as_slice().try_into().unwrap()))
.unwrap_or(0);
Self {
server_id,
last_applied: Arc::new(AtomicU64::new(0)),
revision: Arc::new(AtomicI64::new(1)),
store: Arc::new(
Engine::new(EngineType::Memory, &[TEST_TABLE, REVISION_TABLE]).unwrap(),
),
revision: Arc::new(AtomicI64::new(rev + 1)),
store,
exe_sender,
after_sync_sender,
}
Expand Down
1 change: 1 addition & 0 deletions curp/proto/error.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ message ProposeError {
string encode_error = 4;
google.protobuf.Empty not_leader = 5;
google.protobuf.Empty shutdown = 6;
google.protobuf.Empty timeout = 7;
}
}

Expand Down
2 changes: 1 addition & 1 deletion curp/proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ service Protocol {
rpc WaitSynced(commandpb.WaitSyncedRequest)
returns (commandpb.WaitSyncedResponse);
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse);
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse);
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse);
rpc Vote(VoteRequest) returns (VoteResponse);
rpc InstallSnapshot(stream InstallSnapshotRequest)
returns (InstallSnapshotResponse);
Expand Down
Loading

0 comments on commit 6e4769f

Please sign in to comment.