Skip to content

Commit

Permalink
Feature: Add TypeConfigExt to simplify RaftTypeConfig Access
Browse files Browse the repository at this point in the history
This commit introduces a new trait, `TypeConfigExt`, which extends
`RaftTypeConfig`. The purpose of this trait is to simplify the access to
various functionalities provided by the `RaftTypeConfig` trait,
enhancing code readability and reducing complexity.

**Methods Added to `TypeConfigExt`:**
- `now()`
- `sleep()`
- `sleep_until()`
- `timeout()`
- `timeout_at()`
- `oneshot()`
- `spawn()`

**Usage Improvement:**
- Instead of using the
  `<<C as RaftTypeConfig>::AsyncRuntime as AsyncRuntime>::Instant::now()`,
  you can now simply call `C::now()`.
  • Loading branch information
drmingdrmer committed Jul 4, 2024
1 parent b06cbb3 commit c71a95d
Show file tree
Hide file tree
Showing 23 changed files with 160 additions and 102 deletions.
25 changes: 10 additions & 15 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ use crate::storage::LogFlushed;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::TypeConfigExt;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::Instant;
Expand Down Expand Up @@ -146,7 +146,7 @@ pub(crate) struct LeaderData<C: RaftTypeConfig> {
impl<C: RaftTypeConfig> LeaderData<C> {
pub(crate) fn new() -> Self {
Self {
next_heartbeat: InstantOf::<C>::now(),
next_heartbeat: C::now(),
}
}
}
Expand Down Expand Up @@ -501,16 +501,12 @@ where
/// Currently heartbeat is a blank log
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool {
tracing::debug!(now = debug(InstantOf::<C>::now()), "send_heartbeat");
tracing::debug!(now = debug(C::now()), "send_heartbeat");

let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) {
lh
} else {
tracing::debug!(
now = debug(InstantOf::<C>::now()),
"{} failed to send heartbeat",
emitter
);
tracing::debug!(now = debug(C::now()), "{} failed to send heartbeat", emitter);
return false;
};

Expand Down Expand Up @@ -1136,7 +1132,7 @@ where
self.handle_append_entries_request(rpc, tx);
}
RaftMsg::RequestVote { rpc, tx } => {
let now = InstantOf::<C>::now();
let now = C::now();
tracing::info!(
now = display(now.display()),
vote_request = display(&rpc),
Expand Down Expand Up @@ -1223,7 +1219,7 @@ where
resp,
sender_vote,
} => {
let now = InstantOf::<C>::now();
let now = C::now();

tracing::info!(
now = display(now.display()),
Expand Down Expand Up @@ -1259,7 +1255,7 @@ where
Notify::Tick { i } => {
// check every timer

let now = InstantOf::<C>::now();
let now = C::now();
tracing::debug!("received tick: {}, now: {:?}", i, now);

self.handle_tick_election();
Expand All @@ -1277,8 +1273,7 @@ where

// Install next heartbeat
if let Some(l) = &mut self.leader_data {
l.next_heartbeat =
InstantOf::<C>::now() + Duration::from_millis(self.config.heartbeat_interval);
l.next_heartbeat = C::now() + Duration::from_millis(self.config.heartbeat_interval);
}
}
}
Expand Down Expand Up @@ -1417,7 +1412,7 @@ where

#[tracing::instrument(level = "debug", skip_all)]
fn handle_tick_election(&mut self) {
let now = InstantOf::<C>::now();
let now = C::now();

tracing::debug!("try to trigger election by tick, now: {:?}", now);

Expand Down Expand Up @@ -1646,7 +1641,7 @@ where

// False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
#[allow(clippy::let_underscore_future)]
let _ = AsyncRuntimeOf::<C>::spawn(async move {
let _ = C::spawn(async move {
for (log_index, tx) in removed.into_iter() {
tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id,
Expand Down
5 changes: 2 additions & 3 deletions openraft/src/core/sm/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
use tokio::sync::mpsc;

use crate::core::sm;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::JoinHandleOf;
use crate::AsyncRuntime;
use crate::type_config::TypeConfigExt;
use crate::RaftTypeConfig;
use crate::Snapshot;

Expand Down Expand Up @@ -55,7 +54,7 @@ where C: RaftTypeConfig
/// If the state machine worker has shutdown, it will return an error.
/// If there is not snapshot available, it will return `Ok(None)`.
pub(crate) async fn get_snapshot(&self) -> Result<Option<Snapshot<C>>, &'static str> {
let (tx, rx) = AsyncRuntimeOf::<C>::oneshot();
let (tx, rx) = C::oneshot();

let cmd = sm::Command::get_snapshot(tx);
tracing::debug!("SnapshotReader sending command to sm::Worker: {:?}", cmd);
Expand Down
18 changes: 7 additions & 11 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@ use tracing::Level;
use tracing::Span;

use crate::core::notify::Notify;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::AsyncRuntime;
use crate::Instant;
use crate::type_config::TypeConfigExt;
use crate::RaftTypeConfig;

/// Emit RaftMsg::Tick event at regular `interval`.
Expand Down Expand Up @@ -68,7 +65,7 @@ where C: RaftTypeConfig

let shutdown = Mutex::new(Some(shutdown));

let join_handle = AsyncRuntimeOf::<C>::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!(
let join_handle = C::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!(
parent: &Span::current(),
Level::DEBUG,
"tick"
Expand All @@ -87,8 +84,8 @@ where C: RaftTypeConfig
let mut cancel = std::pin::pin!(cancel_rx);

loop {
let at = InstantOf::<C>::now() + self.interval;
let mut sleep_fut = AsyncRuntimeOf::<C>::sleep_until(at);
let at = C::now() + self.interval;
let mut sleep_fut = C::sleep_until(at);
let sleep_fut = std::pin::pin!(sleep_fut);
let cancel_fut = cancel.as_mut();

Expand Down Expand Up @@ -159,8 +156,7 @@ mod tests {
use tokio::time::Duration;

Check warning on line 156 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `tokio::time::Duration`

Check warning on line 156 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `tokio::time::Duration`

use crate::core::Tick;

Check warning on line 158 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::core::Tick`

Check warning on line 158 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::core::Tick`
use crate::type_config::alias::AsyncRuntimeOf;
use crate::AsyncRuntime;
use crate::type_config::TypeConfigExt;

Check warning on line 159 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::type_config::TypeConfigExt`

Check warning on line 159 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::type_config::TypeConfigExt`
use crate::RaftTypeConfig;
use crate::TokioRuntime;

Expand All @@ -187,9 +183,9 @@ mod tests {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let th = Tick::<TickUTConfig>::spawn(Duration::from_millis(100), tx, true);

AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;
TickUTConfig::sleep(Duration::from_millis(500)).await;
let _ = th.shutdown().unwrap().await;
AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;
TickUTConfig::sleep(Duration::from_millis(500)).await;

let mut received = vec![];
while let Some(x) = rx.recv().await {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/display_ext.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Implement [`fmt::Display`] for types such as `Option<T>` and slice `&[T]`.
//! Implement [`std::fmt::Display`] for types such as `Option<T>` and slice `&[T]`.

pub(crate) mod display_instant;
pub(crate) mod display_option;
Expand Down
7 changes: 3 additions & 4 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_state::LogStateReader;
use crate::raft_state::RaftState;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::Instant;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
Expand Down Expand Up @@ -123,7 +122,7 @@ where C: RaftTypeConfig
/// The candidate `last_log_id` is initialized with the attributes of Acceptor part:
/// [`RaftState`]
pub(crate) fn new_candidate(&mut self, vote: Vote<C::NodeId>) -> &mut Candidate<C, LeaderQuorumSet<C::NodeId>> {
let now = InstantOf::<C>::now();
let now = C::now();
let last_log_id = self.state.last_log_id().copied();

let membership = self.state.membership_state.effective().membership();
Expand Down Expand Up @@ -282,7 +281,7 @@ where C: RaftTypeConfig

#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_vote_req(&mut self, req: VoteRequest<C>) -> VoteResponse<C> {
let now = InstantOf::<C>::now();
let now = C::now();
let lease = self.config.timer_config.leader_lease;
let vote = self.state.vote_ref();

Expand Down
9 changes: 4 additions & 5 deletions openraft/src/engine/handler/vote_handler/accept_vote_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use crate::engine::Respond;
use crate::error::Infallible;
use crate::raft::VoteResponse;
use crate::testing::log_id;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::TypeConfigExt;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::EffectiveMembership;
use crate::Membership;
use crate::TokioInstant;
Expand Down Expand Up @@ -48,12 +47,12 @@ fn test_accept_vote_reject_smaller_vote() -> anyhow::Result<()> {
let mut eng = eng();
eng.output.take_commands();

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (tx, _rx) = UTConfig::<()>::oneshot();
let resp = eng.vote_handler().accept_vote(&Vote::new(1, 2), tx, |_state, _err| mk_res());

assert!(resp.is_none());

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (tx, _rx) = UTConfig::<()>::oneshot();
assert_eq!(
vec![
//
Expand All @@ -74,7 +73,7 @@ fn test_accept_vote_granted_greater_vote() -> anyhow::Result<()> {
let mut eng = eng();
eng.output.take_commands();

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (tx, _rx) = UTConfig::<()>::oneshot();
let resp = eng.vote_handler().accept_vote(&Vote::new(3, 3), tx, |_state, _err| mk_res());

assert!(resp.is_some());
Expand Down
9 changes: 4 additions & 5 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use crate::error::RejectVoteRequest;
use crate::proposer::CandidateState;
use crate::proposer::LeaderState;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::InstantOf;
use crate::Instant;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::OptionalSend;
use crate::RaftState;
Expand Down Expand Up @@ -133,15 +132,15 @@ where C: RaftTypeConfig
if vote > self.state.vote_ref() {
tracing::info!("vote is changing from {} to {}", self.state.vote_ref(), vote);

self.state.vote.update(InstantOf::<C>::now(), *vote);
self.state.vote.update(C::now(), *vote);
self.output.push_command(Command::SaveVote { vote: *vote });
} else {
self.state.vote.touch(InstantOf::<C>::now());
self.state.vote.touch(C::now());
}

// Update vote related timer and lease.

tracing::debug!(now = debug(InstantOf::<C>::now()), "{}", func_name!());
tracing::debug!(now = debug(C::now()), "{}", func_name!());

self.update_internal_server_state();

Expand Down
11 changes: 5 additions & 6 deletions openraft/src/engine/tests/install_full_snapshot_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use crate::engine::LogIdList;
use crate::engine::Respond;
use crate::raft::SnapshotResponse;
use crate::testing::log_id;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::AsyncRuntime;
use crate::type_config::TypeConfigExt;
use crate::Membership;
use crate::Snapshot;
use crate::SnapshotMeta;
Expand Down Expand Up @@ -63,7 +62,7 @@ fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> {

let curr_vote = *eng.state.vote_ref();

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (tx, _rx) = UTConfig::<()>::oneshot();

eng.handle_install_full_snapshot(
curr_vote,
Expand All @@ -87,7 +86,7 @@ fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
eng.state.snapshot_meta
);

let (dummy_tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (dummy_tx, _rx) = UTConfig::<()>::oneshot();
assert_eq!(
vec![
//
Expand All @@ -111,7 +110,7 @@ fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> {

let curr_vote = *eng.state.vote_ref();

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (tx, _rx) = UTConfig::<()>::oneshot();

eng.handle_install_full_snapshot(
curr_vote,
Expand All @@ -135,7 +134,7 @@ fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> {
eng.state.snapshot_meta
);

let (dummy_tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (dummy_tx, _rx) = UTConfig::<()>::oneshot();
assert_eq!(
vec![
//
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub(crate) mod log_id_range;
pub(crate) mod proposer;
pub(crate) mod raft_state;
pub(crate) mod timer;
pub(crate) mod type_config;
pub(crate) mod utime;

pub mod async_runtime;
Expand All @@ -63,6 +62,7 @@ pub mod network;
pub mod raft;
pub mod storage;
pub mod testing;
pub mod type_config;

#[cfg(test)]
mod feature_serde_test;
Expand Down
11 changes: 4 additions & 7 deletions openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use crate::core::ServerState;
use crate::metrics::Condition;
use crate::metrics::Metric;
use crate::metrics::RaftMetrics;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::AsyncRuntime;
use crate::Instant;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::OptionalSend;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -40,7 +37,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "trace", skip(self, func), fields(msg=%msg.to_string()))]
pub async fn metrics<T>(&self, func: T, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError>
where T: Fn(&RaftMetrics<C>) -> bool + OptionalSend {
let timeout_at = InstantOf::<C>::now() + self.timeout;
let timeout_at = C::now() + self.timeout;

let mut rx = self.rx.clone();
loop {
Expand All @@ -53,7 +50,7 @@ where C: RaftTypeConfig
return Ok(latest);
}

let now = InstantOf::<C>::now();
let now = C::now();
if now >= timeout_at {
return Err(WaitError::Timeout(
self.timeout,
Expand All @@ -63,7 +60,7 @@ where C: RaftTypeConfig

let sleep_time = timeout_at - now;
tracing::debug!(?sleep_time, "wait timeout");
let delay = AsyncRuntimeOf::<C>::sleep(sleep_time);
let delay = C::sleep(sleep_time);

tokio::select! {
_ = delay => {
Expand Down
7 changes: 3 additions & 4 deletions openraft/src/network/snapshot_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use crate::error::StreamingError;
use crate::network::RPCOption;
use crate::raft::InstallSnapshotRequest;
use crate::raft::SnapshotResponse;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::AsyncRuntime;
use crate::type_config::TypeConfigExt;
use crate::ErrorSubject;
use crate::ErrorVerb;
use crate::OptionalSend;
Expand Down Expand Up @@ -124,7 +123,7 @@ where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io:
// Sleep a short time otherwise in test environment it is a dead-loop that never
// yields.
// Because network implementation does not yield.
AsyncRuntimeOf::<C>::sleep(Duration::from_millis(1)).await;
C::sleep(Duration::from_millis(1)).await;

snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(subject_verb)?;

Expand Down Expand Up @@ -160,7 +159,7 @@ where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io:
);

#[allow(deprecated)]
let res = AsyncRuntimeOf::<C>::timeout(option.hard_ttl(), net.install_snapshot(req, option.clone())).await;
let res = C::timeout(option.hard_ttl(), net.install_snapshot(req, option.clone())).await;

let resp = match res {
Ok(outer_res) => match outer_res {
Expand Down
Loading

0 comments on commit c71a95d

Please sign in to comment.