Skip to content

Commit

Permalink
docs: add empty lines
Browse files Browse the repository at this point in the history
Signed-off-by: lxl66566 <[email protected]>
  • Loading branch information
lxl66566 committed Jul 15, 2024
1 parent 245560c commit 336f454
Show file tree
Hide file tree
Showing 31 changed files with 122 additions and 10 deletions.
12 changes: 12 additions & 0 deletions crates/curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
type Error: pri::Serializable + PbCodec + std::error::Error;

/// K (key) is used to tell confliction
///
/// The key can be a single key or a key range
type K: pri::Serializable + Eq + Hash + ConflictCheck;

Expand All @@ -52,6 +53,7 @@ pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
/// Prepare the command
///
/// # Errors
///
/// Return `Self::Error` when `CommandExecutor::prepare` goes wrong
#[inline]
fn prepare<E>(&self, e: &E) -> Result<Self::PR, Self::Error>
Expand All @@ -64,6 +66,7 @@ pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
/// Execute the command according to the executor
///
/// # Errors
///
/// Return `Self::Error` when `CommandExecutor::execute` goes wrong
#[inline]
async fn execute<E>(&self, e: &E) -> Result<Self::ER, Self::Error>
Expand All @@ -76,6 +79,7 @@ pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
/// Execute the command after_sync callback
///
/// # Errors
///
/// Return `Self::Error` when `CommandExecutor::after_sync` goes wrong
#[inline]
async fn after_sync<E>(
Expand Down Expand Up @@ -116,6 +120,7 @@ impl ConflictCheck for u32 {
}

/// Command executor which actually executes the command.
///
/// It is usually defined by the protocol user.
#[async_trait]
pub trait CommandExecutor<C>: pri::ThreadSafe
Expand All @@ -125,18 +130,21 @@ where
/// Prepare the command
///
/// # Errors
///
/// This function may return an error if there is a problem preparing the command.
fn prepare(&self, cmd: &C) -> Result<C::PR, C::Error>;

/// Execute the command
///
/// # Errors
///
/// This function may return an error if there is a problem executing the command.
async fn execute(&self, cmd: &C) -> Result<C::ER, C::Error>;

/// Execute the after_sync callback
///
/// # Errors
///
/// This function may return an error if there is a problem executing the after_sync callback.
async fn after_sync(
&self,
Expand All @@ -148,24 +156,28 @@ where
/// Set the index of the last log entry that has been successfully applied to the command executor
///
/// # Errors
///
/// Returns an error if setting the last applied log entry fails.
fn set_last_applied(&self, index: LogIndex) -> Result<(), C::Error>;

/// Get the index of the last log entry that has been successfully applied to the command executor
///
/// # Errors
///
/// Returns an error if retrieval of the last applied log entry fails.
fn last_applied(&self) -> Result<LogIndex, C::Error>;

/// Take a snapshot
///
/// # Errors
///
/// This function may return an error if there is a problem taking a snapshot.
async fn snapshot(&self) -> Result<Snapshot, C::Error>;

/// Reset the command executor using the snapshot or to the initial state if None
///
/// # Errors
///
/// This function may return an error if there is a problem resetting the command executor.
async fn reset(&self, snapshot: Option<(Snapshot, LogIndex)>) -> Result<(), C::Error>;

Expand Down
1 change: 1 addition & 0 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub trait ClientApi {

/// Send fetch cluster requests to all servers (That's because initially, we didn't
/// know who the leader is.)
///
/// Note: The fetched cluster may still be outdated if `linearizable` is false
async fn fetch_cluster(&self, linearizable: bool) -> Result<FetchClusterResponse, Self::Error>;

Expand Down
1 change: 1 addition & 0 deletions crates/curp/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ where

/// Send fetch cluster requests to all servers (That's because initially, we didn't
/// know who the leader is.)
///
/// Note: The fetched cluster may still be outdated if `linearizable` is false
async fn fetch_cluster(
&self,
Expand Down
1 change: 1 addition & 0 deletions crates/curp/src/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ impl ConnectApi for MockedStreamConnectApi {
}

/// Create mocked stream connects
///
/// The leader is S0
#[allow(trivial_casts)] // cannot be inferred
fn init_mocked_stream_connects(
Expand Down
5 changes: 5 additions & 0 deletions crates/curp/src/client/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub(super) struct UnaryConfig {
/// The rpc timeout of a propose request
propose_timeout: Duration,
/// The rpc timeout of a 2-RTT request, usually takes longer than propose timeout
///
/// The recommended the values is within (propose_timeout, 2 * propose_timeout].
wait_synced_timeout: Duration,
}
Expand Down Expand Up @@ -60,9 +61,13 @@ impl<C: Command> Unary<C> {
}

/// Get a handle `f` and apply to the leader
///
/// NOTICE:
///
/// The leader might be outdate if the local state is stale.
///
/// `map_leader` should never be invoked in [`ClientApi::fetch_cluster`]
///
/// `map_leader` might call `fetch_leader_id`, `fetch_cluster`, finally
/// result in stack overflow.
async fn map_leader<R, F: Future<Output = Result<R, CurpError>>>(
Expand Down
8 changes: 7 additions & 1 deletion crates/curp/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ impl ClusterInfo {
}

/// Construct a new `ClusterInfo` from members map
///
/// # Panics
///
/// panic if `all_members` is empty
#[inline]
#[must_use]
Expand Down Expand Up @@ -131,7 +133,9 @@ impl ClusterInfo {
}

/// Construct a new `ClusterInfo` from `FetchClusterResponse`
///
/// # Panics
///
/// panic if `cluster.members` doesn't contain `self_addr`
#[inline]
#[must_use]
Expand Down Expand Up @@ -234,7 +238,9 @@ impl ClusterInfo {
}

/// Get the current member
///
/// # Panics
///
/// panic if self member id is not in members
#[allow(clippy::unwrap_used)] // self member id must be in members
#[must_use]
Expand All @@ -257,7 +263,7 @@ impl ClusterInfo {
self.self_member().client_urls.clone()
}

/// Get the current server id
/// Get the current server name
#[must_use]
#[inline]
pub fn self_name(&self) -> String {
Expand Down
1 change: 1 addition & 0 deletions crates/curp/src/rpc/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ impl<T: Protocol> BypassedConnect<T> {
const BYPASS_KEY: &str = "bypass";

/// Inject bypassed message into a request's metadata and check if it is a bypassed request.
///
/// A bypass request can skip the check for lease expiration (there will never be a disconnection from oneself).
pub(crate) trait Bypass {
/// Inject into metadata
Expand Down
4 changes: 4 additions & 0 deletions crates/curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ impl ProposeRequest {
}

/// Get command
///
/// # Errors
///
/// Return error if the command can't be decoded
#[inline]
pub fn cmd<C: Command>(&self) -> Result<C, PbSerializeError> {
Expand Down Expand Up @@ -619,6 +621,7 @@ impl PublishRequest {
}

/// NOTICE:
///
/// Please check test case `test_unary_fast_round_return_early_err` `test_unary_propose_return_early_err`
/// `test_retry_propose_return_no_retry_error` `test_retry_propose_return_retry_error` if you added some
/// new [`CurpError`]
Expand Down Expand Up @@ -891,6 +894,7 @@ impl<C> From<Vec<ConfChange>> for PoolEntryInner<C> {
}

/// Command Id wrapper, which is used to identify a command
///
/// The underlying data is a tuple of (`client_id`, `seq_num`)
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Ord, PartialOrd, Default,
Expand Down
4 changes: 4 additions & 0 deletions crates/curp/src/server/cmd_worker/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use crate::{
/// Cart
mod cart {
/// Cart is a utility that acts as a temporary container.
///
/// It is usually filled by the provider and consumed by the customer.
///
/// This is useful when we are sure that the provider will fill the cart and the cart will be consumed by the customer
/// so that we don't need to check whether there is something in the `Option`.
#[derive(Debug)]
Expand Down Expand Up @@ -188,6 +190,7 @@ enum OnceState {
}

/// The filter will block any msg if its predecessors(msgs that arrive earlier and conflict with it) haven't finished process
///
/// Internally it maintains a dependency graph of conflicting cmds
struct Filter<C: Command, CE> {
Expand Down Expand Up @@ -318,6 +321,7 @@ impl<C: Command, CE: CommandExecutor<C>> Filter<C, CE> {
}

/// Update the vertex, see if it can progress
///
/// Return true if it can be removed
#[allow(clippy::expect_used, clippy::too_many_lines)] // TODO: split this function
fn update_vertex(&mut self, vid: u64) -> bool {
Expand Down
7 changes: 5 additions & 2 deletions crates/curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,8 +722,11 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
}

/// Candidate or pre candidate broadcasts votes
/// Return `Some(vote)` if bcast pre vote and success
/// Return `None` if bcast pre vote and fail or bcast vote
///
/// # Returns
///
/// - `Some(vote)` if bcast pre vote and success
/// - `None` if bcast pre vote and fail or bcast vote
async fn bcast_vote(curp: &RawCurp<C, RC>, vote: Vote) -> Option<Vote> {
if vote.is_pre_vote {
debug!("{} broadcasts pre votes to all servers", curp.id());
Expand Down
1 change: 1 addition & 0 deletions crates/curp/src/server/lease_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const DEFAULT_LEASE_TTL: Duration = Duration::from_secs(8);
/// Lease manager
pub(crate) struct LeaseManager {
/// client_id => expired_at
///
/// expiry queue to check the smallest expired_at
pub(super) expiry_queue: PriorityQueue<u64, Reverse<Instant>>,
}
Expand Down
8 changes: 6 additions & 2 deletions crates/curp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ mod metrics;
pub use storage::{db::DB, StorageApi, StorageError};

/// The Rpc Server to handle rpc requests
///
/// This Wrapper is introduced due to the `MadSim` rpc lib
#[derive(Debug)]
pub struct Rpc<C: Command, RC: RoleChange> {
Expand Down Expand Up @@ -231,7 +232,9 @@ impl<C: Command, RC: RoleChange> crate::rpc::InnerProtocol for Rpc<C, RC> {

impl<C: Command, RC: RoleChange> Rpc<C, RC> {
/// New `Rpc`
///
/// # Panics
///
/// Panic if storage creation failed
#[inline]
#[allow(clippy::too_many_arguments)] // TODO: refactor this use builder pattern
Expand Down Expand Up @@ -278,8 +281,9 @@ impl<C: Command, RC: RoleChange> Rpc<C, RC> {
/// Run a new rpc server on a specific addr, designed to be used in the tests
///
/// # Errors
/// `ServerError::ParsingError` if parsing failed for the local server address
/// `ServerError::RpcError` if any rpc related error met
///
/// - `ServerError::ParsingError` if parsing failed for the local server address
/// - `ServerError::RpcError` if any rpc related error met
#[cfg(madsim)]
#[allow(clippy::too_many_arguments)]
#[inline]
Expand Down
2 changes: 2 additions & 0 deletions crates/curp/src/server/raw_curp/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ impl<T> From<RangeInclusive<T>> for LogRange<T> {
}

/// Curp logs
///
/// There exists a fake log entry 0 whose term equals 0
///
/// For the leader, there should never be a gap between snapshot and entries
///
/// Examples:
Expand Down
3 changes: 3 additions & 0 deletions crates/curp/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const USIZE_BITS: usize = std::mem::size_of::<usize>() * 8;
const DEFAULT_BIT_VEC_QUEUE_CAP: usize = 1024;

/// A one-direction bit vector queue
///
/// It use a ring buffer `VecDeque<usize>` to store bits
///
/// Memory Layout:
Expand Down Expand Up @@ -161,7 +162,9 @@ impl BitVecQueue {
}

/// Split this bit vec queue to `at`
///
/// e.g.
///
/// 001100 -> `split_at(2)` -> 1100
fn split_at(&mut self, at: usize) {
if self.store.is_empty() {
Expand Down
5 changes: 5 additions & 0 deletions crates/engine/src/api/engine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ pub trait StorageEngine: Send + Sync + 'static + std::fmt::Debug {
fn transaction(&self) -> Self::Transaction<'_>;

/// Get all the values of the given table
///
/// # Errors
///
/// Return `EngineError::TableNotFound` if the given table does not exist
/// Return `EngineError` if met some errors
#[allow(clippy::type_complexity)] // it's clear that (Vec<u8>, Vec<u8>) is a key-value pair
Expand All @@ -23,6 +25,7 @@ pub trait StorageEngine: Send + Sync + 'static + std::fmt::Debug {
/// Get a snapshot of the current state of the database
///
/// # Errors
///
/// Return `EngineError` if met some errors when creating the snapshot
fn get_snapshot(
&self,
Expand All @@ -33,6 +36,7 @@ pub trait StorageEngine: Send + Sync + 'static + std::fmt::Debug {
/// Apply a snapshot to the database
///
/// # Errors
///
/// Return `EngineError` if met some errors when applying the snapshot
async fn apply_snapshot(
&self,
Expand All @@ -46,6 +50,7 @@ pub trait StorageEngine: Send + Sync + 'static + std::fmt::Debug {
/// Get the file size of the engine (Measured in bytes)
///
/// # Errors
///
/// Return `EngineError` if met some errors when get file size
fn file_size(&self) -> Result<u64, EngineError>;
}
10 changes: 8 additions & 2 deletions crates/engine/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ impl<E> Layer<E> {

impl Layer<RocksEngine> {
/// Apply snapshot from file, only works for `RocksEngine`
///
/// # Errors
///
/// Return `EngineError` when `RocksDB` returns an error.
#[inline]
pub async fn apply_snapshot_from_file(
Expand Down Expand Up @@ -68,9 +70,11 @@ where
}

/// Get all the values of the given table
///
/// # Errors
/// Return `EngineError::TableNotFound` if the given table does not exist
/// Return `EngineError` if met some errors
///
/// - Return `EngineError::TableNotFound` if the given table does not exist
/// - Return `EngineError` if met some errors
#[allow(clippy::type_complexity)] // it's clear that (Vec<u8>, Vec<u8>) is a key-value pair
fn get_all(&self, table: &str) -> Result<Vec<(Vec<u8>, Vec<u8>)>, EngineError> {
self.engine.get_all(table)
Expand All @@ -79,6 +83,7 @@ where
/// Get a snapshot of the current state of the database
///
/// # Errors
///
/// Return `EngineError` if met some errors when creating the snapshot
fn get_snapshot(
&self,
Expand All @@ -91,6 +96,7 @@ where
/// Apply a snapshot to the database
///
/// # Errors
///
/// Return `EngineError` if met some errors when applying the snapshot
async fn apply_snapshot(
&self,
Expand Down
2 changes: 2 additions & 0 deletions crates/engine/src/mock_rocksdb_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ impl RocksSnapshot {
}

/// Create a new mock snapshot for receiving
///
/// # Errors
///
/// Return `EngineError` when create directory failed.
#[inline]
#[allow(clippy::unnecessary_wraps)] // the real rocksdb engine need the Result wrap
Expand Down
Loading

0 comments on commit 336f454

Please sign in to comment.