Skip to content

Commit

Permalink
squash!: format the code
Browse files Browse the repository at this point in the history
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Apr 18, 2024
1 parent 6f85e73 commit b1744f2
Show file tree
Hide file tree
Showing 20 changed files with 93 additions and 62 deletions.
8 changes: 6 additions & 2 deletions crates/curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ impl CommandExecutor<TestCommand> for TestCE {
let Some(index) = self
.store
.get(META_TABLE, APPLIED_INDEX_KEY)
.map_err(|e| ExecuteError(e.to_string()))? else {
.map_err(|e| ExecuteError(e.to_string()))?
else {
return Ok(0);
};
let index = LogIndex::from_le_bytes(index.as_slice().try_into().unwrap());
Expand All @@ -379,7 +380,10 @@ impl CommandExecutor<TestCommand> for TestCE {
snapshot: Option<(Snapshot, LogIndex)>,
) -> Result<(), <TestCommand as Command>::Error> {
let Some((mut snapshot, index)) = snapshot else {
let ops = vec![WriteOperation::new_delete_range(TEST_TABLE, &[], &[0xff]),WriteOperation::new_delete(META_TABLE, APPLIED_INDEX_KEY.as_ref())];
let ops = vec![
WriteOperation::new_delete_range(TEST_TABLE, &[], &[0xff]),
WriteOperation::new_delete(META_TABLE, APPLIED_INDEX_KEY.as_ref()),
];
self.store
.write_batch(ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
Expand Down
4 changes: 2 additions & 2 deletions crates/curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let success = match entry.entry_data {
EntryData::Command(ref cmd) => {
let Some(prepare) = prepare else {
unreachable!("prepare should always be Some(_) when entry is a command");
};
unreachable!("prepare should always be Some(_) when entry is a command");
};
let asr = ce.after_sync(cmd.as_ref(), entry.index, prepare).await;
let asr_ok = asr.is_ok();
cb.write().insert_asr(entry.propose_id, asr);
Expand Down
3 changes: 1 addition & 2 deletions crates/curp/src/server/conflict/spec_pool_new.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use curp_external_api::conflict::{ConflictPoolOp, SpeculativePoolOp};

use crate::rpc::PoolEntry;

use super::{CommandEntry, ConfChangeEntry, ConflictPoolEntry};
use crate::rpc::PoolEntry;

/// A speculative pool object
pub type SpObject<C> = Box<dyn SpeculativePoolOp<Entry = CommandEntry<C>> + Send + 'static>;
Expand Down
3 changes: 1 addition & 2 deletions crates/curp/src/server/conflict/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ use std::{cmp::Ordering, sync::Arc};

use curp_external_api::conflict::{ConflictPoolOp, SpeculativePoolOp, UncommittedPoolOp};

use super::{spec_pool_new::SpeculativePool, CommandEntry};
use crate::{
rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId},
server::conflict::uncommitted_pool::UncommittedPool,
};

use super::{spec_pool_new::SpeculativePool, CommandEntry};

#[derive(Debug, Default)]
struct TestSp {
entries: Vec<CommandEntry<i32>>,
Expand Down
3 changes: 1 addition & 2 deletions crates/curp/src/server/conflict/uncommitted_pool.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use curp_external_api::conflict::{ConflictPoolOp, UncommittedPoolOp};

use crate::rpc::PoolEntry;

use super::{CommandEntry, ConfChangeEntry, ConflictPoolEntry};
use crate::rpc::PoolEntry;

/// An uncommitted pool object
pub type UcpObject<C> = Box<dyn UncommittedPoolOp<Entry = CommandEntry<C>> + Send + 'static>;
Expand Down
6 changes: 5 additions & 1 deletion crates/curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,11 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
break;
}
let Some(event) = remove_events.remove(&change.node_id) else {
unreachable!("({:?}) shutdown_event of removed follower ({:x}) should exist", curp.id(), change.node_id);
unreachable!(
"({:?}) shutdown_event of removed follower ({:x}) should exist",
curp.id(),
change.node_id
);
};
event.notify(1);
}
Expand Down
5 changes: 4 additions & 1 deletion crates/curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,10 @@ impl<C: Command, RC: RoleChange> RawCurp<C, RC> {
};

let Some(next_index) = self.lst.get_next_index(follower_id) else {
warn!("follower {} is not found, it maybe has been removed", follower_id);
warn!(
"follower {} is not found, it maybe has been removed",
follower_id
);
return None;
};
let log_r = self.log.read();
Expand Down
3 changes: 1 addition & 2 deletions crates/engine/src/memory_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ use parking_lot::RwLock;
use tokio::io::AsyncWriteExt;
use tokio_util::io::read_buf;

pub(super) use self::transaction::MemoryTransaction;
use crate::{
api::{engine_api::StorageEngine, snapshot_api::SnapshotApi},
error::EngineError,
WriteOperation,
};

pub(super) use self::transaction::MemoryTransaction;

/// A helper type to store the key-value pairs for the `MemoryEngine`
type MemoryTable = HashMap<Vec<u8>, Vec<u8>>;

Expand Down
22 changes: 14 additions & 8 deletions crates/engine/src/rocksdb_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ use std::{
use bytes::{Buf, Bytes, BytesMut};
use clippy_utilities::{NumericCast, OverflowArithmetic};
use rocksdb::{
Direction, Error as RocksError, IteratorMode, OptimisticTransactionDB, Options, SstFileWriter, ErrorKind as RocksErrorKind,
Direction, Error as RocksError, ErrorKind as RocksErrorKind, IteratorMode,
OptimisticTransactionDB, Options, SstFileWriter,
};
use serde::{Deserialize, Serialize};
use tokio::{fs::File, io::AsyncWriteExt};
use tokio_util::io::read_buf;
use tracing::warn;

pub(super) use self::transaction::RocksTransaction;
use crate::{
api::{engine_api::StorageEngine, snapshot_api::SnapshotApi},
error::EngineError,
WriteOperation,
};

pub(super) use self::transaction::RocksTransaction;

/// Install snapshot chunk size: 64KB
const SNAPSHOT_CHUNK_SIZE: usize = 64 * 1024;

Expand Down Expand Up @@ -246,13 +246,15 @@ impl StorageEngine for RocksEngine {
}
}
match transaction.commit() {
Ok(_) => {
Ok(()) => {
_ = self
.size
.fetch_add(size.numeric_cast(), std::sync::atomic::Ordering::Relaxed);
.size
.fetch_add(size.numeric_cast(), std::sync::atomic::Ordering::Relaxed);
return Ok(());
}
Err(err) if matches!(err.kind(), RocksErrorKind::Busy | RocksErrorKind::TryAgain) => {
Err(err)
if matches!(err.kind(), RocksErrorKind::Busy | RocksErrorKind::TryAgain) =>
{
if retry_count > max_retry_count {
warn!("Oops, txn commit retry count reach the max_retry_count: {max_retry_count}");
return Err(EngineError::UnderlyingError(err.to_string()));
Expand Down Expand Up @@ -501,7 +503,11 @@ impl RocksSnapshot {

/// path of current file
fn current_file_path(&self, tmp: bool) -> PathBuf {
let Some(current_filename) = self.snap_files.get(self.snap_file_idx).map(|sf| &sf.filename) else {
let Some(current_filename) = self
.snap_files
.get(self.snap_file_idx)
.map(|sf| &sf.filename)
else {
unreachable!("this method must be called when self.file_index < self.snap_files.len()")
};
let filename = if tmp {
Expand Down
2 changes: 1 addition & 1 deletion crates/utils/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl TaskManager {
let mut queue = Self::root_tasks_queue(&tasks);
state.store(1, Ordering::Release);
while let Some(v) = queue.pop_front() {
let Some((_name,mut task)) = tasks.remove(&v) else {
let Some((_name, mut task)) = tasks.remove(&v) else {
continue;
};
task.notifier.notify_waiters();
Expand Down
6 changes: 5 additions & 1 deletion crates/xline-client/src/clients/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,11 @@ impl AuthClient {
.await??;
cmd_res.into_inner()
} else {
let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd,self.token.as_ref(),false).await?? else {
let (cmd_res, Some(sync_res)) = self
.curp_client
.propose(&cmd, self.token.as_ref(), false)
.await??
else {
unreachable!("sync_res is always Some when use_fast_path is false");
};
let mut res_wrapper = cmd_res.into_inner();
Expand Down
6 changes: 5 additions & 1 deletion crates/xline-client/src/clients/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ impl KvClient {
pub async fn txn(&self, request: TxnRequest) -> Result<TxnResponse> {
let request = RequestWrapper::from(xlineapi::TxnRequest::from(request));
let cmd = Command::new(request.keys(), request);
let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd,self.token.as_ref(), false).await?? else {
let (cmd_res, Some(sync_res)) = self
.curp_client
.propose(&cmd, self.token.as_ref(), false)
.await??
else {
unreachable!("sync_res is always Some when use_fast_path is false");
};
let mut res_wrapper = cmd_res.into_inner();
Expand Down
2 changes: 1 addition & 1 deletion crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ where
let cmd_size = size_estimate::cmd_size(cmd.request());
if self.persistent.estimated_file_size().overflow_add(cmd_size) > self.quota {
let Ok(file_size) = self.persistent.file_size() else {
return false
return false;
};
if file_size.overflow_add(cmd_size) > self.quota {
warn!(
Expand Down
7 changes: 5 additions & 2 deletions crates/xline/src/storage/auth_store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,11 @@ where
if (req.role != ROOT_ROLE) && role.is_err() {
return Err(ExecuteError::RoleNotFound(req.role.clone()));
}
let Err(idx) = user.roles.binary_search(&req.role) else {
return Err(ExecuteError::UserAlreadyHasRole(req.user.clone(), req.role.clone()));
let Err(idx) = user.roles.binary_search(&req.role) else {
return Err(ExecuteError::UserAlreadyHasRole(
req.user.clone(),
req.role.clone(),
));
};
user.roles.insert(idx, req.role.clone());
if let Ok(role) = role {
Expand Down
7 changes: 2 additions & 5 deletions crates/xline/src/storage/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,8 @@ where

/// Get compact revision from db
fn get_compact_revision(&self, revision_key: &str) -> Result<Option<i64>, ExecuteError> {
let Some(revision_bytes)= self.inner
.db
.get_value(META_TABLE, revision_key)?
else {
return Ok(None);
let Some(revision_bytes) = self.inner.db.get_value(META_TABLE, revision_key)? else {
return Ok(None);
};
let bytes = revision_bytes.try_into().map_err(|e| {
ExecuteError::DbError(format!(
Expand Down
4 changes: 2 additions & 2 deletions crates/xline/src/storage/lease_store/lease_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl LeaseCollection {
pub(crate) fn attach(&self, lease_id: i64, key: Vec<u8>) -> Result<(), ExecuteError> {
let mut inner = self.inner.write();
let Some(lease) = inner.lease_map.get_mut(&lease_id) else {
return Err(ExecuteError::LeaseNotFound(lease_id));
return Err(ExecuteError::LeaseNotFound(lease_id));
};
lease.insert_key(key.clone());
let _ignore = inner.item_map.insert(key, lease_id);
Expand All @@ -96,7 +96,7 @@ impl LeaseCollection {
pub(crate) fn detach(&self, lease_id: i64, key: &[u8]) -> Result<(), ExecuteError> {
let mut inner = self.inner.write();
let Some(lease) = inner.lease_map.get_mut(&lease_id) else {
return Err(ExecuteError::LeaseNotFound(lease_id));
return Err(ExecuteError::LeaseNotFound(lease_id));
};
lease.remove_key(key);
let _ignore = inner.item_map.remove(key);
Expand Down
8 changes: 6 additions & 2 deletions crates/xline/src/storage/lease_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,17 @@ mod test {
let _ignore4 = exe_and_sync_req(&lease_store, &req4, -1).await?;
let resp_1 = exe_and_sync_req(&lease_store, &req6, -1).await?;

let ResponseWrapper::LeaseLeasesResponse(leases_1) = resp_1 else { panic!("wrong response type: {resp_1:?}"); };
let ResponseWrapper::LeaseLeasesResponse(leases_1) = resp_1 else {
panic!("wrong response type: {resp_1:?}");
};
assert_eq!(leases_1.leases[0].id, 3);
assert_eq!(leases_1.leases[1].id, 4);

let _ignore5 = exe_and_sync_req(&lease_store, &req5, -1).await?;
let resp_2 = exe_and_sync_req(&lease_store, &req6, -1).await?;
let ResponseWrapper::LeaseLeasesResponse(leases_2) = resp_2 else { panic!("wrong response type: {resp_2:?}"); };
let ResponseWrapper::LeaseLeasesResponse(leases_2) = resp_2 else {
panic!("wrong response type: {resp_2:?}");
};
assert_eq!(leases_2.leases[0].id, 4);

Ok(())
Expand Down
33 changes: 20 additions & 13 deletions crates/xline/src/utils/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,19 +235,26 @@ impl From<ServerArgs> for XlineServerConfig {

let storage = StorageConfig::new(engine, args.quota.unwrap_or_else(default_quota));
let Ok(curp_config) = CurpConfigBuilder::default()
.heartbeat_interval(args.heartbeat_interval
.unwrap_or_else(default_heartbeat_interval))
.wait_synced_timeout(args.server_wait_synced_timeout
.unwrap_or_else(default_server_wait_synced_timeout))
.rpc_timeout(args.rpc_timeout.unwrap_or_else(default_rpc_timeout))
.batch_timeout(args.batch_timeout.unwrap_or_else(default_batch_timeout))
.batch_max_size(args.batch_max_size.unwrap_or_else(default_batch_max_size))
.follower_timeout_ticks(args.follower_timeout_ticks)
.candidate_timeout_ticks(args.candidate_timeout_ticks)
.engine_cfg(curp_engine)
.gc_interval(args.gc_interval.unwrap_or_else(default_gc_interval))
.cmd_workers(args.cmd_workers)
.build() else { panic!("failed to create curp config") };
.heartbeat_interval(
args.heartbeat_interval
.unwrap_or_else(default_heartbeat_interval),
)
.wait_synced_timeout(
args.server_wait_synced_timeout
.unwrap_or_else(default_server_wait_synced_timeout),
)
.rpc_timeout(args.rpc_timeout.unwrap_or_else(default_rpc_timeout))
.batch_timeout(args.batch_timeout.unwrap_or_else(default_batch_timeout))
.batch_max_size(args.batch_max_size.unwrap_or_else(default_batch_max_size))
.follower_timeout_ticks(args.follower_timeout_ticks)
.candidate_timeout_ticks(args.candidate_timeout_ticks)
.engine_cfg(curp_engine)
.gc_interval(args.gc_interval.unwrap_or_else(default_gc_interval))
.cmd_workers(args.cmd_workers)
.build()
else {
panic!("failed to create curp config")
};
let client_config = ClientConfig::new(
args.client_wait_synced_timeout
.unwrap_or_else(default_client_wait_synced_timeout),
Expand Down
18 changes: 9 additions & 9 deletions crates/xlinectl/src/command/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ async fn exec_interactive(client: &mut Client, matches: &ArgMatches) -> Result<(
let mut line = String::new();
let _n = io::stdin().read_line(&mut line)?;
let Some(args) = shlex::split(&line) else {
failed!(line);
};
failed!(line);
};

if args.len() < 2 {
failed!(line);
Expand All @@ -115,8 +115,8 @@ async fn exec_interactive(client: &mut Client, matches: &ArgMatches) -> Result<(
match args.next().unwrap() {
"watch" => {
let Some(key) = args.next() else {
failed!(line);
};
failed!(line);
};
let request = req_builder(key, args.next());
let (new_watcher, mut stream) = client.watch_client().watch(request).await?;
watcher = Some(new_watcher);
Expand All @@ -129,13 +129,13 @@ async fn exec_interactive(client: &mut Client, matches: &ArgMatches) -> Result<(
}
"cancel" => {
let Some(watcher) = watcher.as_mut() else {
eprintln!("No currently active watch");
continue;
};
eprintln!("No currently active watch");
continue;
};
let cancel_result = if let Some(id_str) = args.next() {
let Ok(id) = id_str.parse() else {
failed!(line);
};
failed!(line);
};
watcher.cancel_by_id(id)
} else {
watcher.cancel()
Expand Down
5 changes: 2 additions & 3 deletions crates/xlinectl/src/utils/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ pub(crate) fn parse_user(matches: &ArgMatches) -> Result<Option<(String, String)
Ok(Some((user, passwd)))
} else {
let Some(password) = password_opt else {
bail!("Password not set in `--user`, please set it in `--password`"
.to_owned());
};
bail!("Password not set in `--user`, please set it in `--password`".to_owned());
};
Ok(Some((user, password.clone())))
}
} else {
Expand Down

0 comments on commit b1744f2

Please sign in to comment.