From b1744f2dc09ac99d8e1d0bc436b28c3a6de6c57e Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Fri, 12 Apr 2024 14:05:37 +0800 Subject: [PATCH] squash!: format the code Signed-off-by: Phoeniix Zhao --- crates/curp-test-utils/src/test_cmd.rs | 8 +++-- crates/curp/src/server/cmd_worker/mod.rs | 4 +-- .../curp/src/server/conflict/spec_pool_new.rs | 3 +- crates/curp/src/server/conflict/tests.rs | 3 +- .../src/server/conflict/uncommitted_pool.rs | 3 +- crates/curp/src/server/curp_node.rs | 6 +++- crates/curp/src/server/raw_curp/mod.rs | 5 ++- crates/engine/src/memory_engine/mod.rs | 3 +- crates/engine/src/rocksdb_engine/mod.rs | 22 ++++++++----- crates/utils/src/task_manager/mod.rs | 2 +- crates/xline-client/src/clients/auth.rs | 6 +++- crates/xline-client/src/clients/kv.rs | 6 +++- crates/xline/src/server/command.rs | 2 +- crates/xline/src/storage/auth_store/store.rs | 7 ++-- crates/xline/src/storage/kv_store.rs | 7 ++-- .../storage/lease_store/lease_collection.rs | 4 +-- crates/xline/src/storage/lease_store/mod.rs | 8 +++-- crates/xline/src/utils/args.rs | 33 +++++++++++-------- crates/xlinectl/src/command/watch.rs | 18 +++++----- crates/xlinectl/src/utils/parser.rs | 5 ++- 20 files changed, 93 insertions(+), 62 deletions(-) diff --git a/crates/curp-test-utils/src/test_cmd.rs b/crates/curp-test-utils/src/test_cmd.rs index 79bfdd502..ee56b26e5 100644 --- a/crates/curp-test-utils/src/test_cmd.rs +++ b/crates/curp-test-utils/src/test_cmd.rs @@ -361,7 +361,8 @@ impl CommandExecutor for TestCE { let Some(index) = self .store .get(META_TABLE, APPLIED_INDEX_KEY) - .map_err(|e| ExecuteError(e.to_string()))? else { + .map_err(|e| ExecuteError(e.to_string()))? + else { return Ok(0); }; let index = LogIndex::from_le_bytes(index.as_slice().try_into().unwrap()); @@ -379,7 +380,10 @@ impl CommandExecutor for TestCE { snapshot: Option<(Snapshot, LogIndex)>, ) -> Result<(), ::Error> { let Some((mut snapshot, index)) = snapshot else { - let ops = vec![WriteOperation::new_delete_range(TEST_TABLE, &[], &[0xff]),WriteOperation::new_delete(META_TABLE, APPLIED_INDEX_KEY.as_ref())]; + let ops = vec![ + WriteOperation::new_delete_range(TEST_TABLE, &[], &[0xff]), + WriteOperation::new_delete(META_TABLE, APPLIED_INDEX_KEY.as_ref()), + ]; self.store .write_batch(ops, true) .map_err(|e| ExecuteError(e.to_string()))?; diff --git a/crates/curp/src/server/cmd_worker/mod.rs b/crates/curp/src/server/cmd_worker/mod.rs index 8ede8acd7..47a79ba1a 100644 --- a/crates/curp/src/server/cmd_worker/mod.rs +++ b/crates/curp/src/server/cmd_worker/mod.rs @@ -151,8 +151,8 @@ async fn worker_as, RC: RoleChange>( let success = match entry.entry_data { EntryData::Command(ref cmd) => { let Some(prepare) = prepare else { - unreachable!("prepare should always be Some(_) when entry is a command"); - }; + unreachable!("prepare should always be Some(_) when entry is a command"); + }; let asr = ce.after_sync(cmd.as_ref(), entry.index, prepare).await; let asr_ok = asr.is_ok(); cb.write().insert_asr(entry.propose_id, asr); diff --git a/crates/curp/src/server/conflict/spec_pool_new.rs b/crates/curp/src/server/conflict/spec_pool_new.rs index 41b186e4b..88fde0c96 100644 --- a/crates/curp/src/server/conflict/spec_pool_new.rs +++ b/crates/curp/src/server/conflict/spec_pool_new.rs @@ -1,8 +1,7 @@ use curp_external_api::conflict::{ConflictPoolOp, SpeculativePoolOp}; -use crate::rpc::PoolEntry; - use super::{CommandEntry, ConfChangeEntry, ConflictPoolEntry}; +use crate::rpc::PoolEntry; /// A speculative pool object pub type SpObject = Box> + Send + 'static>; diff --git a/crates/curp/src/server/conflict/tests.rs b/crates/curp/src/server/conflict/tests.rs index 2fb7203f1..6dff83c80 100644 --- a/crates/curp/src/server/conflict/tests.rs +++ b/crates/curp/src/server/conflict/tests.rs @@ -2,13 +2,12 @@ use std::{cmp::Ordering, sync::Arc}; use curp_external_api::conflict::{ConflictPoolOp, SpeculativePoolOp, UncommittedPoolOp}; +use super::{spec_pool_new::SpeculativePool, CommandEntry}; use crate::{ rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId}, server::conflict::uncommitted_pool::UncommittedPool, }; -use super::{spec_pool_new::SpeculativePool, CommandEntry}; - #[derive(Debug, Default)] struct TestSp { entries: Vec>, diff --git a/crates/curp/src/server/conflict/uncommitted_pool.rs b/crates/curp/src/server/conflict/uncommitted_pool.rs index ce2118fd3..6b38b34f5 100644 --- a/crates/curp/src/server/conflict/uncommitted_pool.rs +++ b/crates/curp/src/server/conflict/uncommitted_pool.rs @@ -1,8 +1,7 @@ use curp_external_api::conflict::{ConflictPoolOp, UncommittedPoolOp}; -use crate::rpc::PoolEntry; - use super::{CommandEntry, ConfChangeEntry, ConflictPoolEntry}; +use crate::rpc::PoolEntry; /// An uncommitted pool object pub type UcpObject = Box> + Send + 'static>; diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs index 61da4da19..00ee0a0b1 100644 --- a/crates/curp/src/server/curp_node.rs +++ b/crates/curp/src/server/curp_node.rs @@ -485,7 +485,11 @@ impl CurpNode { break; } let Some(event) = remove_events.remove(&change.node_id) else { - unreachable!("({:?}) shutdown_event of removed follower ({:x}) should exist", curp.id(), change.node_id); + unreachable!( + "({:?}) shutdown_event of removed follower ({:x}) should exist", + curp.id(), + change.node_id + ); }; event.notify(1); } diff --git a/crates/curp/src/server/raw_curp/mod.rs b/crates/curp/src/server/raw_curp/mod.rs index 12f872c56..11f2411bd 100644 --- a/crates/curp/src/server/raw_curp/mod.rs +++ b/crates/curp/src/server/raw_curp/mod.rs @@ -1165,7 +1165,10 @@ impl RawCurp { }; let Some(next_index) = self.lst.get_next_index(follower_id) else { - warn!("follower {} is not found, it maybe has been removed", follower_id); + warn!( + "follower {} is not found, it maybe has been removed", + follower_id + ); return None; }; let log_r = self.log.read(); diff --git a/crates/engine/src/memory_engine/mod.rs b/crates/engine/src/memory_engine/mod.rs index e3e4e56c8..d5d964ac6 100644 --- a/crates/engine/src/memory_engine/mod.rs +++ b/crates/engine/src/memory_engine/mod.rs @@ -15,14 +15,13 @@ use parking_lot::RwLock; use tokio::io::AsyncWriteExt; use tokio_util::io::read_buf; +pub(super) use self::transaction::MemoryTransaction; use crate::{ api::{engine_api::StorageEngine, snapshot_api::SnapshotApi}, error::EngineError, WriteOperation, }; -pub(super) use self::transaction::MemoryTransaction; - /// A helper type to store the key-value pairs for the `MemoryEngine` type MemoryTable = HashMap, Vec>; diff --git a/crates/engine/src/rocksdb_engine/mod.rs b/crates/engine/src/rocksdb_engine/mod.rs index 470d386be..a3435cbad 100644 --- a/crates/engine/src/rocksdb_engine/mod.rs +++ b/crates/engine/src/rocksdb_engine/mod.rs @@ -14,21 +14,21 @@ use std::{ use bytes::{Buf, Bytes, BytesMut}; use clippy_utilities::{NumericCast, OverflowArithmetic}; use rocksdb::{ - Direction, Error as RocksError, IteratorMode, OptimisticTransactionDB, Options, SstFileWriter, ErrorKind as RocksErrorKind, + Direction, Error as RocksError, ErrorKind as RocksErrorKind, IteratorMode, + OptimisticTransactionDB, Options, SstFileWriter, }; use serde::{Deserialize, Serialize}; use tokio::{fs::File, io::AsyncWriteExt}; use tokio_util::io::read_buf; use tracing::warn; +pub(super) use self::transaction::RocksTransaction; use crate::{ api::{engine_api::StorageEngine, snapshot_api::SnapshotApi}, error::EngineError, WriteOperation, }; -pub(super) use self::transaction::RocksTransaction; - /// Install snapshot chunk size: 64KB const SNAPSHOT_CHUNK_SIZE: usize = 64 * 1024; @@ -246,13 +246,15 @@ impl StorageEngine for RocksEngine { } } match transaction.commit() { - Ok(_) => { + Ok(()) => { _ = self - .size - .fetch_add(size.numeric_cast(), std::sync::atomic::Ordering::Relaxed); + .size + .fetch_add(size.numeric_cast(), std::sync::atomic::Ordering::Relaxed); return Ok(()); } - Err(err) if matches!(err.kind(), RocksErrorKind::Busy | RocksErrorKind::TryAgain) => { + Err(err) + if matches!(err.kind(), RocksErrorKind::Busy | RocksErrorKind::TryAgain) => + { if retry_count > max_retry_count { warn!("Oops, txn commit retry count reach the max_retry_count: {max_retry_count}"); return Err(EngineError::UnderlyingError(err.to_string())); @@ -501,7 +503,11 @@ impl RocksSnapshot { /// path of current file fn current_file_path(&self, tmp: bool) -> PathBuf { - let Some(current_filename) = self.snap_files.get(self.snap_file_idx).map(|sf| &sf.filename) else { + let Some(current_filename) = self + .snap_files + .get(self.snap_file_idx) + .map(|sf| &sf.filename) + else { unreachable!("this method must be called when self.file_index < self.snap_files.len()") }; let filename = if tmp { diff --git a/crates/utils/src/task_manager/mod.rs b/crates/utils/src/task_manager/mod.rs index 7b385ff56..894b70170 100644 --- a/crates/utils/src/task_manager/mod.rs +++ b/crates/utils/src/task_manager/mod.rs @@ -184,7 +184,7 @@ impl TaskManager { let mut queue = Self::root_tasks_queue(&tasks); state.store(1, Ordering::Release); while let Some(v) = queue.pop_front() { - let Some((_name,mut task)) = tasks.remove(&v) else { + let Some((_name, mut task)) = tasks.remove(&v) else { continue; }; task.notifier.notify_waiters(); diff --git a/crates/xline-client/src/clients/auth.rs b/crates/xline-client/src/clients/auth.rs index 4b2eea3da..130747a72 100644 --- a/crates/xline-client/src/clients/auth.rs +++ b/crates/xline-client/src/clients/auth.rs @@ -730,7 +730,11 @@ impl AuthClient { .await??; cmd_res.into_inner() } else { - let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd,self.token.as_ref(),false).await?? else { + let (cmd_res, Some(sync_res)) = self + .curp_client + .propose(&cmd, self.token.as_ref(), false) + .await?? + else { unreachable!("sync_res is always Some when use_fast_path is false"); }; let mut res_wrapper = cmd_res.into_inner(); diff --git a/crates/xline-client/src/clients/kv.rs b/crates/xline-client/src/clients/kv.rs index f11f173e4..3411c2164 100644 --- a/crates/xline-client/src/clients/kv.rs +++ b/crates/xline-client/src/clients/kv.rs @@ -214,7 +214,11 @@ impl KvClient { pub async fn txn(&self, request: TxnRequest) -> Result { let request = RequestWrapper::from(xlineapi::TxnRequest::from(request)); let cmd = Command::new(request.keys(), request); - let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd,self.token.as_ref(), false).await?? else { + let (cmd_res, Some(sync_res)) = self + .curp_client + .propose(&cmd, self.token.as_ref(), false) + .await?? + else { unreachable!("sync_res is always Some when use_fast_path is false"); }; let mut res_wrapper = cmd_res.into_inner(); diff --git a/crates/xline/src/server/command.rs b/crates/xline/src/server/command.rs index 792b0ad69..510614ee1 100644 --- a/crates/xline/src/server/command.rs +++ b/crates/xline/src/server/command.rs @@ -181,7 +181,7 @@ where let cmd_size = size_estimate::cmd_size(cmd.request()); if self.persistent.estimated_file_size().overflow_add(cmd_size) > self.quota { let Ok(file_size) = self.persistent.file_size() else { - return false + return false; }; if file_size.overflow_add(cmd_size) > self.quota { warn!( diff --git a/crates/xline/src/storage/auth_store/store.rs b/crates/xline/src/storage/auth_store/store.rs index 52ccd1e2c..700b1f093 100644 --- a/crates/xline/src/storage/auth_store/store.rs +++ b/crates/xline/src/storage/auth_store/store.rs @@ -702,8 +702,11 @@ where if (req.role != ROOT_ROLE) && role.is_err() { return Err(ExecuteError::RoleNotFound(req.role.clone())); } - let Err(idx) = user.roles.binary_search(&req.role) else { - return Err(ExecuteError::UserAlreadyHasRole(req.user.clone(), req.role.clone())); + let Err(idx) = user.roles.binary_search(&req.role) else { + return Err(ExecuteError::UserAlreadyHasRole( + req.user.clone(), + req.role.clone(), + )); }; user.roles.insert(idx, req.role.clone()); if let Ok(role) = role { diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index 61c91eb84..ebbedf68e 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -264,11 +264,8 @@ where /// Get compact revision from db fn get_compact_revision(&self, revision_key: &str) -> Result, ExecuteError> { - let Some(revision_bytes)= self.inner - .db - .get_value(META_TABLE, revision_key)? - else { - return Ok(None); + let Some(revision_bytes) = self.inner.db.get_value(META_TABLE, revision_key)? else { + return Ok(None); }; let bytes = revision_bytes.try_into().map_err(|e| { ExecuteError::DbError(format!( diff --git a/crates/xline/src/storage/lease_store/lease_collection.rs b/crates/xline/src/storage/lease_store/lease_collection.rs index 677bbb94f..7a64698af 100644 --- a/crates/xline/src/storage/lease_store/lease_collection.rs +++ b/crates/xline/src/storage/lease_store/lease_collection.rs @@ -85,7 +85,7 @@ impl LeaseCollection { pub(crate) fn attach(&self, lease_id: i64, key: Vec) -> Result<(), ExecuteError> { let mut inner = self.inner.write(); let Some(lease) = inner.lease_map.get_mut(&lease_id) else { - return Err(ExecuteError::LeaseNotFound(lease_id)); + return Err(ExecuteError::LeaseNotFound(lease_id)); }; lease.insert_key(key.clone()); let _ignore = inner.item_map.insert(key, lease_id); @@ -96,7 +96,7 @@ impl LeaseCollection { pub(crate) fn detach(&self, lease_id: i64, key: &[u8]) -> Result<(), ExecuteError> { let mut inner = self.inner.write(); let Some(lease) = inner.lease_map.get_mut(&lease_id) else { - return Err(ExecuteError::LeaseNotFound(lease_id)); + return Err(ExecuteError::LeaseNotFound(lease_id)); }; lease.remove_key(key); let _ignore = inner.item_map.remove(key); diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index efedf8e27..4d0175bac 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -412,13 +412,17 @@ mod test { let _ignore4 = exe_and_sync_req(&lease_store, &req4, -1).await?; let resp_1 = exe_and_sync_req(&lease_store, &req6, -1).await?; - let ResponseWrapper::LeaseLeasesResponse(leases_1) = resp_1 else { panic!("wrong response type: {resp_1:?}"); }; + let ResponseWrapper::LeaseLeasesResponse(leases_1) = resp_1 else { + panic!("wrong response type: {resp_1:?}"); + }; assert_eq!(leases_1.leases[0].id, 3); assert_eq!(leases_1.leases[1].id, 4); let _ignore5 = exe_and_sync_req(&lease_store, &req5, -1).await?; let resp_2 = exe_and_sync_req(&lease_store, &req6, -1).await?; - let ResponseWrapper::LeaseLeasesResponse(leases_2) = resp_2 else { panic!("wrong response type: {resp_2:?}"); }; + let ResponseWrapper::LeaseLeasesResponse(leases_2) = resp_2 else { + panic!("wrong response type: {resp_2:?}"); + }; assert_eq!(leases_2.leases[0].id, 4); Ok(()) diff --git a/crates/xline/src/utils/args.rs b/crates/xline/src/utils/args.rs index 3379d0439..199d26b7e 100644 --- a/crates/xline/src/utils/args.rs +++ b/crates/xline/src/utils/args.rs @@ -235,19 +235,26 @@ impl From for XlineServerConfig { let storage = StorageConfig::new(engine, args.quota.unwrap_or_else(default_quota)); let Ok(curp_config) = CurpConfigBuilder::default() - .heartbeat_interval(args.heartbeat_interval - .unwrap_or_else(default_heartbeat_interval)) - .wait_synced_timeout(args.server_wait_synced_timeout - .unwrap_or_else(default_server_wait_synced_timeout)) - .rpc_timeout(args.rpc_timeout.unwrap_or_else(default_rpc_timeout)) - .batch_timeout(args.batch_timeout.unwrap_or_else(default_batch_timeout)) - .batch_max_size(args.batch_max_size.unwrap_or_else(default_batch_max_size)) - .follower_timeout_ticks(args.follower_timeout_ticks) - .candidate_timeout_ticks(args.candidate_timeout_ticks) - .engine_cfg(curp_engine) - .gc_interval(args.gc_interval.unwrap_or_else(default_gc_interval)) - .cmd_workers(args.cmd_workers) - .build() else { panic!("failed to create curp config") }; + .heartbeat_interval( + args.heartbeat_interval + .unwrap_or_else(default_heartbeat_interval), + ) + .wait_synced_timeout( + args.server_wait_synced_timeout + .unwrap_or_else(default_server_wait_synced_timeout), + ) + .rpc_timeout(args.rpc_timeout.unwrap_or_else(default_rpc_timeout)) + .batch_timeout(args.batch_timeout.unwrap_or_else(default_batch_timeout)) + .batch_max_size(args.batch_max_size.unwrap_or_else(default_batch_max_size)) + .follower_timeout_ticks(args.follower_timeout_ticks) + .candidate_timeout_ticks(args.candidate_timeout_ticks) + .engine_cfg(curp_engine) + .gc_interval(args.gc_interval.unwrap_or_else(default_gc_interval)) + .cmd_workers(args.cmd_workers) + .build() + else { + panic!("failed to create curp config") + }; let client_config = ClientConfig::new( args.client_wait_synced_timeout .unwrap_or_else(default_client_wait_synced_timeout), diff --git a/crates/xlinectl/src/command/watch.rs b/crates/xlinectl/src/command/watch.rs index 15cf800df..af2c86f8f 100644 --- a/crates/xlinectl/src/command/watch.rs +++ b/crates/xlinectl/src/command/watch.rs @@ -102,8 +102,8 @@ async fn exec_interactive(client: &mut Client, matches: &ArgMatches) -> Result<( let mut line = String::new(); let _n = io::stdin().read_line(&mut line)?; let Some(args) = shlex::split(&line) else { - failed!(line); - }; + failed!(line); + }; if args.len() < 2 { failed!(line); @@ -115,8 +115,8 @@ async fn exec_interactive(client: &mut Client, matches: &ArgMatches) -> Result<( match args.next().unwrap() { "watch" => { let Some(key) = args.next() else { - failed!(line); - }; + failed!(line); + }; let request = req_builder(key, args.next()); let (new_watcher, mut stream) = client.watch_client().watch(request).await?; watcher = Some(new_watcher); @@ -129,13 +129,13 @@ async fn exec_interactive(client: &mut Client, matches: &ArgMatches) -> Result<( } "cancel" => { let Some(watcher) = watcher.as_mut() else { - eprintln!("No currently active watch"); - continue; - }; + eprintln!("No currently active watch"); + continue; + }; let cancel_result = if let Some(id_str) = args.next() { let Ok(id) = id_str.parse() else { - failed!(line); - }; + failed!(line); + }; watcher.cancel_by_id(id) } else { watcher.cancel() diff --git a/crates/xlinectl/src/utils/parser.rs b/crates/xlinectl/src/utils/parser.rs index 6dc3bd6cd..2a0e4e492 100644 --- a/crates/xlinectl/src/utils/parser.rs +++ b/crates/xlinectl/src/utils/parser.rs @@ -24,9 +24,8 @@ pub(crate) fn parse_user(matches: &ArgMatches) -> Result