Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add TypeConfigExt to simplify RaftTypeConfig Access #1145

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 10 additions & 15 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ use crate::storage::LogFlushed;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::TypeConfigExt;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::Instant;
Expand Down Expand Up @@ -146,7 +146,7 @@ pub(crate) struct LeaderData<C: RaftTypeConfig> {
impl<C: RaftTypeConfig> LeaderData<C> {
pub(crate) fn new() -> Self {
Self {
next_heartbeat: InstantOf::<C>::now(),
next_heartbeat: C::now(),
}
}
}
Expand Down Expand Up @@ -501,16 +501,12 @@ where
/// Currently heartbeat is a blank log
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool {
tracing::debug!(now = debug(InstantOf::<C>::now()), "send_heartbeat");
tracing::debug!(now = debug(C::now()), "send_heartbeat");

let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) {
lh
} else {
tracing::debug!(
now = debug(InstantOf::<C>::now()),
"{} failed to send heartbeat",
emitter
);
tracing::debug!(now = debug(C::now()), "{} failed to send heartbeat", emitter);
return false;
};

Expand Down Expand Up @@ -1136,7 +1132,7 @@ where
self.handle_append_entries_request(rpc, tx);
}
RaftMsg::RequestVote { rpc, tx } => {
let now = InstantOf::<C>::now();
let now = C::now();
tracing::info!(
now = display(now.display()),
vote_request = display(&rpc),
Expand Down Expand Up @@ -1223,7 +1219,7 @@ where
resp,
sender_vote,
} => {
let now = InstantOf::<C>::now();
let now = C::now();

tracing::info!(
now = display(now.display()),
Expand Down Expand Up @@ -1259,7 +1255,7 @@ where
Notify::Tick { i } => {
// check every timer

let now = InstantOf::<C>::now();
let now = C::now();
tracing::debug!("received tick: {}, now: {:?}", i, now);

self.handle_tick_election();
Expand All @@ -1277,8 +1273,7 @@ where

// Install next heartbeat
if let Some(l) = &mut self.leader_data {
l.next_heartbeat =
InstantOf::<C>::now() + Duration::from_millis(self.config.heartbeat_interval);
l.next_heartbeat = C::now() + Duration::from_millis(self.config.heartbeat_interval);
}
}
}
Expand Down Expand Up @@ -1417,7 +1412,7 @@ where

#[tracing::instrument(level = "debug", skip_all)]
fn handle_tick_election(&mut self) {
let now = InstantOf::<C>::now();
let now = C::now();

tracing::debug!("try to trigger election by tick, now: {:?}", now);

Expand Down Expand Up @@ -1646,7 +1641,7 @@ where

// False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
#[allow(clippy::let_underscore_future)]
let _ = AsyncRuntimeOf::<C>::spawn(async move {
let _ = C::spawn(async move {
for (log_index, tx) in removed.into_iter() {
tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id,
Expand Down
5 changes: 2 additions & 3 deletions openraft/src/core/sm/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
use tokio::sync::mpsc;

use crate::core::sm;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::JoinHandleOf;
use crate::AsyncRuntime;
use crate::type_config::TypeConfigExt;
use crate::RaftTypeConfig;
use crate::Snapshot;

Expand Down Expand Up @@ -55,7 +54,7 @@ where C: RaftTypeConfig
/// If the state machine worker has shutdown, it will return an error.
/// If there is not snapshot available, it will return `Ok(None)`.
pub(crate) async fn get_snapshot(&self) -> Result<Option<Snapshot<C>>, &'static str> {
let (tx, rx) = AsyncRuntimeOf::<C>::oneshot();
let (tx, rx) = C::oneshot();

let cmd = sm::Command::get_snapshot(tx);
tracing::debug!("SnapshotReader sending command to sm::Worker: {:?}", cmd);
Expand Down
18 changes: 7 additions & 11 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
use tracing::Span;

use crate::core::notify::Notify;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::AsyncRuntime;
use crate::Instant;
use crate::type_config::TypeConfigExt;
use crate::RaftTypeConfig;

/// Emit RaftMsg::Tick event at regular `interval`.
Expand Down Expand Up @@ -68,7 +65,7 @@

let shutdown = Mutex::new(Some(shutdown));

let join_handle = AsyncRuntimeOf::<C>::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!(
let join_handle = C::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!(
parent: &Span::current(),
Level::DEBUG,
"tick"
Expand All @@ -87,8 +84,8 @@
let mut cancel = std::pin::pin!(cancel_rx);

loop {
let at = InstantOf::<C>::now() + self.interval;
let mut sleep_fut = AsyncRuntimeOf::<C>::sleep_until(at);
let at = C::now() + self.interval;
let mut sleep_fut = C::sleep_until(at);
let sleep_fut = std::pin::pin!(sleep_fut);
let cancel_fut = cancel.as_mut();

Expand Down Expand Up @@ -156,11 +153,10 @@
mod tests {
use std::io::Cursor;

use tokio::time::Duration;

Check warning on line 156 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `tokio::time::Duration`

use crate::core::Tick;

Check warning on line 158 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::core::Tick`
use crate::type_config::alias::AsyncRuntimeOf;
use crate::AsyncRuntime;
use crate::type_config::TypeConfigExt;

Check warning on line 159 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::type_config::TypeConfigExt`
use crate::RaftTypeConfig;
use crate::TokioRuntime;

Expand All @@ -187,9 +183,9 @@
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let th = Tick::<TickUTConfig>::spawn(Duration::from_millis(100), tx, true);

AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;
TickUTConfig::sleep(Duration::from_millis(500)).await;
let _ = th.shutdown().unwrap().await;
AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;
TickUTConfig::sleep(Duration::from_millis(500)).await;

let mut received = vec![];
while let Some(x) = rx.recv().await {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/display_ext.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Implement [`fmt::Display`] for types such as `Option<T>` and slice `&[T]`.
//! Implement [`std::fmt::Display`] for types such as `Option<T>` and slice `&[T]`.

pub(crate) mod display_instant;
pub(crate) mod display_option;
Expand Down
7 changes: 3 additions & 4 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_state::LogStateReader;
use crate::raft_state::RaftState;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::Instant;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
Expand Down Expand Up @@ -123,7 +122,7 @@ where C: RaftTypeConfig
/// The candidate `last_log_id` is initialized with the attributes of Acceptor part:
/// [`RaftState`]
pub(crate) fn new_candidate(&mut self, vote: Vote<C::NodeId>) -> &mut Candidate<C, LeaderQuorumSet<C::NodeId>> {
let now = InstantOf::<C>::now();
let now = C::now();
let last_log_id = self.state.last_log_id().copied();

let membership = self.state.membership_state.effective().membership();
Expand Down Expand Up @@ -282,7 +281,7 @@ where C: RaftTypeConfig

#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_vote_req(&mut self, req: VoteRequest<C>) -> VoteResponse<C> {
let now = InstantOf::<C>::now();
let now = C::now();
let lease = self.config.timer_config.leader_lease;
let vote = self.state.vote_ref();

Expand Down
9 changes: 4 additions & 5 deletions openraft/src/engine/handler/vote_handler/accept_vote_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use crate::engine::Respond;
use crate::error::Infallible;
use crate::raft::VoteResponse;
use crate::testing::log_id;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::TypeConfigExt;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::EffectiveMembership;
use crate::Membership;
use crate::TokioInstant;
Expand Down Expand Up @@ -48,12 +47,12 @@ fn test_accept_vote_reject_smaller_vote() -> anyhow::Result<()> {
let mut eng = eng();
eng.output.take_commands();

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (tx, _rx) = UTConfig::<()>::oneshot();
let resp = eng.vote_handler().accept_vote(&Vote::new(1, 2), tx, |_state, _err| mk_res());

assert!(resp.is_none());

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (tx, _rx) = UTConfig::<()>::oneshot();
assert_eq!(
vec![
//
Expand All @@ -74,7 +73,7 @@ fn test_accept_vote_granted_greater_vote() -> anyhow::Result<()> {
let mut eng = eng();
eng.output.take_commands();

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (tx, _rx) = UTConfig::<()>::oneshot();
let resp = eng.vote_handler().accept_vote(&Vote::new(3, 3), tx, |_state, _err| mk_res());

assert!(resp.is_some());
Expand Down
9 changes: 4 additions & 5 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use crate::error::RejectVoteRequest;
use crate::proposer::CandidateState;
use crate::proposer::LeaderState;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::InstantOf;
use crate::Instant;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::OptionalSend;
use crate::RaftState;
Expand Down Expand Up @@ -133,15 +132,15 @@ where C: RaftTypeConfig
if vote > self.state.vote_ref() {
tracing::info!("vote is changing from {} to {}", self.state.vote_ref(), vote);

self.state.vote.update(InstantOf::<C>::now(), *vote);
self.state.vote.update(C::now(), *vote);
self.output.push_command(Command::SaveVote { vote: *vote });
} else {
self.state.vote.touch(InstantOf::<C>::now());
self.state.vote.touch(C::now());
}

// Update vote related timer and lease.

tracing::debug!(now = debug(InstantOf::<C>::now()), "{}", func_name!());
tracing::debug!(now = debug(C::now()), "{}", func_name!());

self.update_internal_server_state();

Expand Down
11 changes: 5 additions & 6 deletions openraft/src/engine/tests/install_full_snapshot_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use crate::engine::LogIdList;
use crate::engine::Respond;
use crate::raft::SnapshotResponse;
use crate::testing::log_id;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::AsyncRuntime;
use crate::type_config::TypeConfigExt;
use crate::Membership;
use crate::Snapshot;
use crate::SnapshotMeta;
Expand Down Expand Up @@ -63,7 +62,7 @@ fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> {

let curr_vote = *eng.state.vote_ref();

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (tx, _rx) = UTConfig::<()>::oneshot();

eng.handle_install_full_snapshot(
curr_vote,
Expand All @@ -87,7 +86,7 @@ fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
eng.state.snapshot_meta
);

let (dummy_tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (dummy_tx, _rx) = UTConfig::<()>::oneshot();
assert_eq!(
vec![
//
Expand All @@ -111,7 +110,7 @@ fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> {

let curr_vote = *eng.state.vote_ref();

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (tx, _rx) = UTConfig::<()>::oneshot();

eng.handle_install_full_snapshot(
curr_vote,
Expand All @@ -135,7 +134,7 @@ fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> {
eng.state.snapshot_meta
);

let (dummy_tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
let (dummy_tx, _rx) = UTConfig::<()>::oneshot();
assert_eq!(
vec![
//
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub(crate) mod log_id_range;
pub(crate) mod proposer;
pub(crate) mod raft_state;
pub(crate) mod timer;
pub(crate) mod type_config;
pub(crate) mod utime;

pub mod async_runtime;
Expand All @@ -63,6 +62,7 @@ pub mod network;
pub mod raft;
pub mod storage;
pub mod testing;
pub mod type_config;

#[cfg(test)]
mod feature_serde_test;
Expand Down
11 changes: 4 additions & 7 deletions openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use crate::core::ServerState;
use crate::metrics::Condition;
use crate::metrics::Metric;
use crate::metrics::RaftMetrics;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::AsyncRuntime;
use crate::Instant;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::OptionalSend;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -40,7 +37,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "trace", skip(self, func), fields(msg=%msg.to_string()))]
pub async fn metrics<T>(&self, func: T, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError>
where T: Fn(&RaftMetrics<C>) -> bool + OptionalSend {
let timeout_at = InstantOf::<C>::now() + self.timeout;
let timeout_at = C::now() + self.timeout;

let mut rx = self.rx.clone();
loop {
Expand All @@ -53,7 +50,7 @@ where C: RaftTypeConfig
return Ok(latest);
}

let now = InstantOf::<C>::now();
let now = C::now();
if now >= timeout_at {
return Err(WaitError::Timeout(
self.timeout,
Expand All @@ -63,7 +60,7 @@ where C: RaftTypeConfig

let sleep_time = timeout_at - now;
tracing::debug!(?sleep_time, "wait timeout");
let delay = AsyncRuntimeOf::<C>::sleep(sleep_time);
let delay = C::sleep(sleep_time);

tokio::select! {
_ = delay => {
Expand Down
7 changes: 3 additions & 4 deletions openraft/src/network/snapshot_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use crate::error::StreamingError;
use crate::network::RPCOption;
use crate::raft::InstallSnapshotRequest;
use crate::raft::SnapshotResponse;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::AsyncRuntime;
use crate::type_config::TypeConfigExt;
use crate::ErrorSubject;
use crate::ErrorVerb;
use crate::OptionalSend;
Expand Down Expand Up @@ -124,7 +123,7 @@ where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io:
// Sleep a short time otherwise in test environment it is a dead-loop that never
// yields.
// Because network implementation does not yield.
AsyncRuntimeOf::<C>::sleep(Duration::from_millis(1)).await;
C::sleep(Duration::from_millis(1)).await;

snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(subject_verb)?;

Expand Down Expand Up @@ -160,7 +159,7 @@ where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io:
);

#[allow(deprecated)]
let res = AsyncRuntimeOf::<C>::timeout(option.hard_ttl(), net.install_snapshot(req, option.clone())).await;
let res = C::timeout(option.hard_ttl(), net.install_snapshot(req, option.clone())).await;

let resp = match res {
Ok(outer_res) => match outer_res {
Expand Down
Loading
Loading