diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index c7a102be1..c1d2a428b 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -88,10 +88,10 @@ use crate::storage::LogFlushed; use crate::storage::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; -use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::InstantOf; use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::ResponderOf; +use crate::type_config::TypeConfigExt; use crate::AsyncRuntime; use crate::ChangeMembers; use crate::Instant; @@ -146,7 +146,7 @@ pub(crate) struct LeaderData { impl LeaderData { pub(crate) fn new() -> Self { Self { - next_heartbeat: InstantOf::::now(), + next_heartbeat: C::now(), } } } @@ -501,16 +501,12 @@ where /// Currently heartbeat is a blank log #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool { - tracing::debug!(now = debug(InstantOf::::now()), "send_heartbeat"); + tracing::debug!(now = debug(C::now()), "send_heartbeat"); let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) { lh } else { - tracing::debug!( - now = debug(InstantOf::::now()), - "{} failed to send heartbeat", - emitter - ); + tracing::debug!(now = debug(C::now()), "{} failed to send heartbeat", emitter); return false; }; @@ -1136,7 +1132,7 @@ where self.handle_append_entries_request(rpc, tx); } RaftMsg::RequestVote { rpc, tx } => { - let now = InstantOf::::now(); + let now = C::now(); tracing::info!( now = display(now.display()), vote_request = display(&rpc), @@ -1223,7 +1219,7 @@ where resp, sender_vote, } => { - let now = InstantOf::::now(); + let now = C::now(); tracing::info!( now = display(now.display()), @@ -1259,7 +1255,7 @@ where Notify::Tick { i } => { // check every timer - let now = InstantOf::::now(); + let now = C::now(); tracing::debug!("received tick: {}, now: {:?}", i, now); self.handle_tick_election(); @@ -1277,8 +1273,7 @@ where // Install next heartbeat if let Some(l) = &mut self.leader_data { - l.next_heartbeat = - InstantOf::::now() + Duration::from_millis(self.config.heartbeat_interval); + l.next_heartbeat = C::now() + Duration::from_millis(self.config.heartbeat_interval); } } } @@ -1417,7 +1412,7 @@ where #[tracing::instrument(level = "debug", skip_all)] fn handle_tick_election(&mut self) { - let now = InstantOf::::now(); + let now = C::now(); tracing::debug!("try to trigger election by tick, now: {:?}", now); @@ -1646,7 +1641,7 @@ where // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932 #[allow(clippy::let_underscore_future)] - let _ = AsyncRuntimeOf::::spawn(async move { + let _ = C::spawn(async move { for (log_index, tx) in removed.into_iter() { tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader { leader_id, diff --git a/openraft/src/core/sm/handle.rs b/openraft/src/core/sm/handle.rs index 8a718663c..ff4346868 100644 --- a/openraft/src/core/sm/handle.rs +++ b/openraft/src/core/sm/handle.rs @@ -3,9 +3,8 @@ use tokio::sync::mpsc; use crate::core::sm; -use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::JoinHandleOf; -use crate::AsyncRuntime; +use crate::type_config::TypeConfigExt; use crate::RaftTypeConfig; use crate::Snapshot; @@ -55,7 +54,7 @@ where C: RaftTypeConfig /// If the state machine worker has shutdown, it will return an error. /// If there is not snapshot available, it will return `Ok(None)`. pub(crate) async fn get_snapshot(&self) -> Result>, &'static str> { - let (tx, rx) = AsyncRuntimeOf::::oneshot(); + let (tx, rx) = C::oneshot(); let cmd = sm::Command::get_snapshot(tx); tracing::debug!("SnapshotReader sending command to sm::Worker: {:?}", cmd); diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index b2091e217..3247a2058 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -14,11 +14,8 @@ use tracing::Level; use tracing::Span; use crate::core::notify::Notify; -use crate::type_config::alias::AsyncRuntimeOf; -use crate::type_config::alias::InstantOf; use crate::type_config::alias::JoinHandleOf; -use crate::AsyncRuntime; -use crate::Instant; +use crate::type_config::TypeConfigExt; use crate::RaftTypeConfig; /// Emit RaftMsg::Tick event at regular `interval`. @@ -68,7 +65,7 @@ where C: RaftTypeConfig let shutdown = Mutex::new(Some(shutdown)); - let join_handle = AsyncRuntimeOf::::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!( + let join_handle = C::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!( parent: &Span::current(), Level::DEBUG, "tick" @@ -87,8 +84,8 @@ where C: RaftTypeConfig let mut cancel = std::pin::pin!(cancel_rx); loop { - let at = InstantOf::::now() + self.interval; - let mut sleep_fut = AsyncRuntimeOf::::sleep_until(at); + let at = C::now() + self.interval; + let mut sleep_fut = C::sleep_until(at); let sleep_fut = std::pin::pin!(sleep_fut); let cancel_fut = cancel.as_mut(); @@ -159,8 +156,7 @@ mod tests { use tokio::time::Duration; use crate::core::Tick; - use crate::type_config::alias::AsyncRuntimeOf; - use crate::AsyncRuntime; + use crate::type_config::TypeConfigExt; use crate::RaftTypeConfig; use crate::TokioRuntime; @@ -187,9 +183,9 @@ mod tests { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let th = Tick::::spawn(Duration::from_millis(100), tx, true); - AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; + TickUTConfig::sleep(Duration::from_millis(500)).await; let _ = th.shutdown().unwrap().await; - AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; + TickUTConfig::sleep(Duration::from_millis(500)).await; let mut received = vec![]; while let Some(x) = rx.recv().await { diff --git a/openraft/src/display_ext.rs b/openraft/src/display_ext.rs index 0df1a7783..65afd0953 100644 --- a/openraft/src/display_ext.rs +++ b/openraft/src/display_ext.rs @@ -1,4 +1,4 @@ -//! Implement [`fmt::Display`] for types such as `Option` and slice `&[T]`. +//! Implement [`std::fmt::Display`] for types such as `Option` and slice `&[T]`. pub(crate) mod display_instant; pub(crate) mod display_option; diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 3e011de31..785340296 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -41,10 +41,9 @@ use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::raft_state::LogStateReader; use crate::raft_state::RaftState; -use crate::type_config::alias::InstantOf; use crate::type_config::alias::ResponderOf; use crate::type_config::alias::SnapshotDataOf; -use crate::Instant; +use crate::type_config::TypeConfigExt; use crate::LogId; use crate::LogIdOptionExt; use crate::Membership; @@ -123,7 +122,7 @@ where C: RaftTypeConfig /// The candidate `last_log_id` is initialized with the attributes of Acceptor part: /// [`RaftState`] pub(crate) fn new_candidate(&mut self, vote: Vote) -> &mut Candidate> { - let now = InstantOf::::now(); + let now = C::now(); let last_log_id = self.state.last_log_id().copied(); let membership = self.state.membership_state.effective().membership(); @@ -282,7 +281,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn handle_vote_req(&mut self, req: VoteRequest) -> VoteResponse { - let now = InstantOf::::now(); + let now = C::now(); let lease = self.config.timer_config.leader_lease; let vote = self.state.vote_ref(); diff --git a/openraft/src/engine/handler/vote_handler/accept_vote_test.rs b/openraft/src/engine/handler/vote_handler/accept_vote_test.rs index a64f0f46e..98ea7c14c 100644 --- a/openraft/src/engine/handler/vote_handler/accept_vote_test.rs +++ b/openraft/src/engine/handler/vote_handler/accept_vote_test.rs @@ -11,9 +11,8 @@ use crate::engine::Respond; use crate::error::Infallible; use crate::raft::VoteResponse; use crate::testing::log_id; -use crate::type_config::alias::AsyncRuntimeOf; +use crate::type_config::TypeConfigExt; use crate::utime::UTime; -use crate::AsyncRuntime; use crate::EffectiveMembership; use crate::Membership; use crate::TokioInstant; @@ -48,12 +47,12 @@ fn test_accept_vote_reject_smaller_vote() -> anyhow::Result<()> { let mut eng = eng(); eng.output.take_commands(); - let (tx, _rx) = AsyncRuntimeOf::::oneshot(); + let (tx, _rx) = UTConfig::<()>::oneshot(); let resp = eng.vote_handler().accept_vote(&Vote::new(1, 2), tx, |_state, _err| mk_res()); assert!(resp.is_none()); - let (tx, _rx) = AsyncRuntimeOf::::oneshot(); + let (tx, _rx) = UTConfig::<()>::oneshot(); assert_eq!( vec![ // @@ -74,7 +73,7 @@ fn test_accept_vote_granted_greater_vote() -> anyhow::Result<()> { let mut eng = eng(); eng.output.take_commands(); - let (tx, _rx) = AsyncRuntimeOf::::oneshot(); + let (tx, _rx) = UTConfig::<()>::oneshot(); let resp = eng.vote_handler().accept_vote(&Vote::new(3, 3), tx, |_state, _err| mk_res()); assert!(resp.is_some()); diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 21b970e6a..fed7aee27 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -14,8 +14,7 @@ use crate::error::RejectVoteRequest; use crate::proposer::CandidateState; use crate::proposer::LeaderState; use crate::raft_state::LogStateReader; -use crate::type_config::alias::InstantOf; -use crate::Instant; +use crate::type_config::TypeConfigExt; use crate::LogId; use crate::OptionalSend; use crate::RaftState; @@ -133,15 +132,15 @@ where C: RaftTypeConfig if vote > self.state.vote_ref() { tracing::info!("vote is changing from {} to {}", self.state.vote_ref(), vote); - self.state.vote.update(InstantOf::::now(), *vote); + self.state.vote.update(C::now(), *vote); self.output.push_command(Command::SaveVote { vote: *vote }); } else { - self.state.vote.touch(InstantOf::::now()); + self.state.vote.touch(C::now()); } // Update vote related timer and lease. - tracing::debug!(now = debug(InstantOf::::now()), "{}", func_name!()); + tracing::debug!(now = debug(C::now()), "{}", func_name!()); self.update_internal_server_state(); diff --git a/openraft/src/engine/tests/install_full_snapshot_test.rs b/openraft/src/engine/tests/install_full_snapshot_test.rs index bfe5899cd..81b238625 100644 --- a/openraft/src/engine/tests/install_full_snapshot_test.rs +++ b/openraft/src/engine/tests/install_full_snapshot_test.rs @@ -12,8 +12,7 @@ use crate::engine::LogIdList; use crate::engine::Respond; use crate::raft::SnapshotResponse; use crate::testing::log_id; -use crate::type_config::alias::AsyncRuntimeOf; -use crate::AsyncRuntime; +use crate::type_config::TypeConfigExt; use crate::Membership; use crate::Snapshot; use crate::SnapshotMeta; @@ -63,7 +62,7 @@ fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> { let curr_vote = *eng.state.vote_ref(); - let (tx, _rx) = AsyncRuntimeOf::::oneshot(); + let (tx, _rx) = UTConfig::<()>::oneshot(); eng.handle_install_full_snapshot( curr_vote, @@ -87,7 +86,7 @@ fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> { eng.state.snapshot_meta ); - let (dummy_tx, _rx) = AsyncRuntimeOf::::oneshot(); + let (dummy_tx, _rx) = UTConfig::<()>::oneshot(); assert_eq!( vec![ // @@ -111,7 +110,7 @@ fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> { let curr_vote = *eng.state.vote_ref(); - let (tx, _rx) = AsyncRuntimeOf::::oneshot(); + let (tx, _rx) = UTConfig::<()>::oneshot(); eng.handle_install_full_snapshot( curr_vote, @@ -135,7 +134,7 @@ fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> { eng.state.snapshot_meta ); - let (dummy_tx, _rx) = AsyncRuntimeOf::::oneshot(); + let (dummy_tx, _rx) = UTConfig::<()>::oneshot(); assert_eq!( vec![ // diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 50a34e5c6..8916c0963 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -46,7 +46,6 @@ pub(crate) mod log_id_range; pub(crate) mod proposer; pub(crate) mod raft_state; pub(crate) mod timer; -pub(crate) mod type_config; pub(crate) mod utime; pub mod async_runtime; @@ -63,6 +62,7 @@ pub mod network; pub mod raft; pub mod storage; pub mod testing; +pub mod type_config; #[cfg(test)] mod feature_serde_test; diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index 50f3458e0..343d93f74 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -7,10 +7,7 @@ use crate::core::ServerState; use crate::metrics::Condition; use crate::metrics::Metric; use crate::metrics::RaftMetrics; -use crate::type_config::alias::AsyncRuntimeOf; -use crate::type_config::alias::InstantOf; -use crate::AsyncRuntime; -use crate::Instant; +use crate::type_config::TypeConfigExt; use crate::LogId; use crate::OptionalSend; use crate::RaftTypeConfig; @@ -40,7 +37,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "trace", skip(self, func), fields(msg=%msg.to_string()))] pub async fn metrics(&self, func: T, msg: impl ToString) -> Result, WaitError> where T: Fn(&RaftMetrics) -> bool + OptionalSend { - let timeout_at = InstantOf::::now() + self.timeout; + let timeout_at = C::now() + self.timeout; let mut rx = self.rx.clone(); loop { @@ -53,7 +50,7 @@ where C: RaftTypeConfig return Ok(latest); } - let now = InstantOf::::now(); + let now = C::now(); if now >= timeout_at { return Err(WaitError::Timeout( self.timeout, @@ -63,7 +60,7 @@ where C: RaftTypeConfig let sleep_time = timeout_at - now; tracing::debug!(?sleep_time, "wait timeout"); - let delay = AsyncRuntimeOf::::sleep(sleep_time); + let delay = C::sleep(sleep_time); tokio::select! { _ = delay => { diff --git a/openraft/src/network/snapshot_transport.rs b/openraft/src/network/snapshot_transport.rs index 29b52ce1b..8ef911446 100644 --- a/openraft/src/network/snapshot_transport.rs +++ b/openraft/src/network/snapshot_transport.rs @@ -20,8 +20,7 @@ use crate::error::StreamingError; use crate::network::RPCOption; use crate::raft::InstallSnapshotRequest; use crate::raft::SnapshotResponse; -use crate::type_config::alias::AsyncRuntimeOf; -use crate::AsyncRuntime; +use crate::type_config::TypeConfigExt; use crate::ErrorSubject; use crate::ErrorVerb; use crate::OptionalSend; @@ -124,7 +123,7 @@ where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io: // Sleep a short time otherwise in test environment it is a dead-loop that never // yields. // Because network implementation does not yield. - AsyncRuntimeOf::::sleep(Duration::from_millis(1)).await; + C::sleep(Duration::from_millis(1)).await; snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(subject_verb)?; @@ -160,7 +159,7 @@ where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io: ); #[allow(deprecated)] - let res = AsyncRuntimeOf::::timeout(option.hard_ttl(), net.install_snapshot(req, option.clone())).await; + let res = C::timeout(option.hard_ttl(), net.install_snapshot(req, option.clone())).await; let resp = match res { Ok(outer_res) => match outer_res { diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 0d77372c0..36526088a 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -209,7 +209,7 @@ mod tests { use crate::proposer::Leader; use crate::testing::blank_ent; use crate::testing::log_id; - use crate::type_config::alias::InstantOf; + use crate::type_config::TypeConfigExt; use crate::Entry; use crate::RaftLogId; use crate::Vote; @@ -309,7 +309,7 @@ mod tests { fn test_leading_last_quorum_acked_time_leader_is_voter() { let mut leading = Leader::>::new(Vote::new_committed(2, 1), vec![1, 2, 3], [4], &[]); - let now1 = InstantOf::::now(); + let now1 = UTConfig::<()>::now(); let _t2 = leading.clock_progress.increase_to(&2, Some(now1)); let t1 = leading.last_quorum_acked_time(); @@ -320,12 +320,12 @@ mod tests { fn test_leading_last_quorum_acked_time_leader_is_learner() { let mut leading = Leader::>::new(Vote::new_committed(2, 4), vec![1, 2, 3], [4], &[]); - let t2 = InstantOf::::now(); + let t2 = UTConfig::<()>::now(); let _ = leading.clock_progress.increase_to(&2, Some(t2)); let t = leading.last_quorum_acked_time(); assert!(t.is_none(), "n1(leader+learner) does not count in quorum"); - let t3 = InstantOf::::now(); + let t3 = UTConfig::<()>::now(); let _ = leading.clock_progress.increase_to(&3, Some(t3)); let t = leading.last_quorum_acked_time(); assert_eq!(Some(t2), t, "n2 and n3 acked"); @@ -335,12 +335,12 @@ mod tests { fn test_leading_last_quorum_acked_time_leader_is_not_member() { let mut leading = Leader::>::new(Vote::new_committed(2, 5), vec![1, 2, 3], [4], &[]); - let t2 = InstantOf::::now(); + let t2 = UTConfig::<()>::now(); let _ = leading.clock_progress.increase_to(&2, Some(t2)); let t = leading.last_quorum_acked_time(); assert!(t.is_none(), "n1(leader+learner) does not count in quorum"); - let t3 = InstantOf::::now(); + let t3 = UTConfig::<()>::now(); let _ = leading.clock_progress.increase_to(&3, Some(t3)); let t = leading.last_quorum_acked_time(); assert_eq!(Some(t2), t, "n2 and n3 acked"); diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 1ae155260..abf70cf24 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -77,11 +77,11 @@ pub use crate::raft::runtime_config_handle::RuntimeConfigHandle; use crate::raft::trigger::Trigger; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; -use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::JoinErrorOf; use crate::type_config::alias::ResponderOf; use crate::type_config::alias::ResponderReceiverOf; use crate::type_config::alias::SnapshotDataOf; +use crate::type_config::TypeConfigExt; use crate::AsyncRuntime; use crate::LogId; use crate::LogIdOptionExt; @@ -398,7 +398,7 @@ where C: RaftTypeConfig pub async fn begin_receiving_snapshot(&self) -> Result>, RaftError> { tracing::info!("Raft::begin_receiving_snapshot()"); - let (tx, rx) = AsyncRuntimeOf::::oneshot(); + let (tx, rx) = C::oneshot(); let resp = self.inner.call_core(RaftMsg::BeginReceivingSnapshot { tx }, rx).await?; Ok(resp) } diff --git a/openraft/src/raft/responder/impls.rs b/openraft/src/raft/responder/impls.rs index 29a364941..9569cfb15 100644 --- a/openraft/src/raft/responder/impls.rs +++ b/openraft/src/raft/responder/impls.rs @@ -1,10 +1,9 @@ use crate::async_runtime::AsyncOneshotSendExt; use crate::raft::message::ClientWriteResult; use crate::raft::responder::Responder; -use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; -use crate::AsyncRuntime; +use crate::type_config::TypeConfigExt; use crate::RaftTypeConfig; /// A [`Responder`] implementation that sends the response via a oneshot channel. @@ -22,6 +21,8 @@ impl OneshotResponder where C: RaftTypeConfig { /// Create a new instance from a [`AsyncRuntime::OneshotSender`]. + /// + /// [`AsyncRuntime::OneshotSender`]: `crate::async_runtime::AsyncRuntime::OneshotSender` pub fn new(tx: OneshotSenderOf>) -> Self { Self { tx } } @@ -34,7 +35,7 @@ where C: RaftTypeConfig fn from_app_data(app_data: C::D) -> (C::D, Self, Self::Receiver) where Self: Sized { - let (tx, rx) = AsyncRuntimeOf::::oneshot(); + let (tx, rx) = C::oneshot(); (app_data, Self { tx }, rx) } diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index e2da2d807..05ca2cb6e 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -48,12 +48,11 @@ use crate::replication::request_id::RequestId; use crate::storage::RaftLogReader; use crate::storage::RaftLogStorage; use crate::storage::Snapshot; -use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::InstantOf; use crate::type_config::alias::JoinHandleOf; use crate::type_config::alias::LogIdOf; +use crate::type_config::TypeConfigExt; use crate::AsyncRuntime; -use crate::Instant; use crate::LogId; use crate::RaftLogId; use crate::RaftNetworkFactory; @@ -325,7 +324,7 @@ where Duration::from_millis(500) }); - self.backoff_drain_events(InstantOf::::now() + duration).await?; + self.backoff_drain_events(C::now() + duration).await?; } self.drain_events().await?; @@ -413,7 +412,7 @@ where } }; - let leader_time = InstantOf::::now(); + let leader_time = C::now(); // Build the heartbeat frame to be sent to the follower. let payload = AppendEntriesRequest { @@ -433,7 +432,7 @@ where let the_timeout = Duration::from_millis(self.config.heartbeat_interval); let option = RPCOption::new(the_timeout); - let res = AsyncRuntimeOf::::timeout(the_timeout, self.network.append_entries(payload, option)).await; + let res = C::timeout(the_timeout, self.network.append_entries(payload, option)).await; tracing::debug!("append_entries res: {:?}", res); @@ -571,7 +570,7 @@ where /// in case the channel is closed, it should quit at once. #[tracing::instrument(level = "debug", skip(self))] pub async fn backoff_drain_events(&mut self, until: InstantOf) -> Result<(), ReplicationClosed> { - let d = until - InstantOf::::now(); + let d = until - C::now(); tracing::warn!( interval = debug(d), "{} backoff mode: drain events without processing them", @@ -579,8 +578,8 @@ where ); loop { - let sleep_duration = until - InstantOf::::now(); - let sleep = C::AsyncRuntime::sleep(sleep_duration); + let sleep_duration = until - C::now(); + let sleep = C::sleep(sleep_duration); let recv = self.rx_event.recv(); @@ -734,7 +733,7 @@ where let (tx_cancel, rx_cancel) = oneshot::channel(); - let jh = AsyncRuntimeOf::::spawn(Self::send_snapshot( + let jh = C::spawn(Self::send_snapshot( request_id, self.snapshot_network.clone(), *self.session_id.vote_ref(), @@ -765,7 +764,7 @@ where let mut net = network.lock().await; - let start_time = InstantOf::::now(); + let start_time = C::now(); let cancel = async move { let _ = cancel.await; diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 1ba7061df..3bddf3d68 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -142,17 +142,17 @@ mod tests { use crate::engine::testing::UTConfig; use crate::replication::response::ReplicationResult; use crate::testing::log_id; - use crate::type_config::alias::InstantOf; + use crate::type_config::TypeConfigExt; #[test] fn test_replication_result_display() { // NOTE that with single-term-leader, log id is `1-3` - let result = ReplicationResult::::new(InstantOf::::now(), Ok(Some(log_id(1, 2, 3)))); + let result = ReplicationResult::::new(UTConfig::<()>::now(), Ok(Some(log_id(1, 2, 3)))); let want = format!(", result:Match:{}}}", log_id(1, 2, 3)); assert!(result.to_string().ends_with(&want), "{}", result.to_string()); - let result = ReplicationResult::::new(InstantOf::::now(), Err(log_id(1, 2, 3))); + let result = ReplicationResult::::new(UTConfig::<()>::now(), Err(log_id(1, 2, 3))); let want = format!(", result:Conflict:{}}}", log_id(1, 2, 3)); assert!(result.to_string().ends_with(&want), "{}", result.to_string()); } diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 90c528558..2f619032d 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -10,10 +10,9 @@ use crate::raft_state::IOState; use crate::storage::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; -use crate::type_config::alias::InstantOf; +use crate::type_config::TypeConfigExt; use crate::utime::UTime; use crate::EffectiveMembership; -use crate::Instant; use crate::LogIdOptionExt; use crate::MembershipState; use crate::RaftSnapshotBuilder; @@ -149,7 +148,7 @@ where last_purged_log_id, ); - let now = InstantOf::::now(); + let now = C::now(); Ok(RaftState { committed: last_applied, diff --git a/openraft/src/storage/v2/raft_log_storage_ext.rs b/openraft/src/storage/v2/raft_log_storage_ext.rs index c5a80e9fc..733cfd903 100644 --- a/openraft/src/storage/v2/raft_log_storage_ext.rs +++ b/openraft/src/storage/v2/raft_log_storage_ext.rs @@ -4,8 +4,7 @@ use openraft_macros::add_async_trait; use crate::raft_state::LogIOId; use crate::storage::LogFlushed; use crate::storage::RaftLogStorage; -use crate::type_config::alias::AsyncRuntimeOf; -use crate::AsyncRuntime; +use crate::type_config::TypeConfigExt; use crate::OptionalSend; use crate::RaftTypeConfig; use crate::StorageError; @@ -27,7 +26,7 @@ where C: RaftTypeConfig I: IntoIterator + OptionalSend, I::IntoIter: OptionalSend, { - let (tx, rx) = AsyncRuntimeOf::::oneshot(); + let (tx, rx) = C::oneshot(); // dummy log_io_id let log_io_id = LogIOId::::new(Vote::::default(), None); diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 708a4a624..427f88b03 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -20,9 +20,8 @@ use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::storage::StorageHelper; use crate::testing::StoreBuilder; -use crate::type_config::alias::AsyncRuntimeOf; +use crate::type_config::TypeConfigExt; use crate::vote::CommittedLeaderId; -use crate::AsyncRuntime; use crate::LogId; use crate::Membership; use crate::NodeId; @@ -1304,7 +1303,7 @@ where I: IntoIterator + OptionalSend, I::IntoIter: OptionalSend, { - let (tx, rx) = AsyncRuntimeOf::::oneshot(); + let (tx, rx) = C::oneshot(); // Dummy log io id for blocking append let log_io_id = LogIOId::::new(Vote::::default(), None); diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 9fdba10ad..0c689e67d 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -1,5 +1,14 @@ +//! Define the configuration of types used by the Raft, such as [`NodeId`], log [`Entry`], etc. +//! +//! [`NodeId`]: `RaftTypeConfig::NodeId` +//! [`Entry`]: `RaftTypeConfig::Entry` + +pub(crate) mod util; + use std::fmt::Debug; +pub use util::TypeConfigExt; + use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::raft::responder::Responder; diff --git a/openraft/src/type_config/util.rs b/openraft/src/type_config/util.rs new file mode 100644 index 000000000..a22dfa289 --- /dev/null +++ b/openraft/src/type_config/util.rs @@ -0,0 +1,71 @@ +use std::future::Future; +use std::time::Duration; + +use crate::type_config::alias::AsyncRuntimeOf; +use crate::type_config::alias::InstantOf; +use crate::type_config::alias::JoinHandleOf; +use crate::type_config::alias::OneshotReceiverOf; +use crate::type_config::alias::OneshotSenderOf; +use crate::type_config::alias::SleepOf; +use crate::type_config::alias::TimeoutOf; +use crate::AsyncRuntime; +use crate::Instant; +use crate::OptionalSend; +use crate::RaftTypeConfig; + +/// Collection of utility methods to `RaftTypeConfig` function. +pub trait TypeConfigExt: RaftTypeConfig { + // Time related methods + + /// Returns the current time. + fn now() -> InstantOf { + InstantOf::::now() + } + + /// Wait until `duration` has elapsed. + fn sleep(duration: Duration) -> SleepOf { + AsyncRuntimeOf::::sleep(duration) + } + + /// Wait until `deadline` is reached. + fn sleep_until(deadline: InstantOf) -> SleepOf { + AsyncRuntimeOf::::sleep_until(deadline) + } + + /// Require a [`Future`] to complete before the specified duration has elapsed. + fn timeout + OptionalSend>(duration: Duration, future: F) -> TimeoutOf { + AsyncRuntimeOf::::timeout(duration, future) + } + + /// Require a [`Future`] to complete before the specified instant in time. + fn timeout_at + OptionalSend>( + deadline: InstantOf, + future: F, + ) -> TimeoutOf { + AsyncRuntimeOf::::timeout_at(deadline, future) + } + + // Synchronization methods + + /// Creates a new one-shot channel for sending single values. + /// + /// This is just a wrapper of + /// [`AsyncRuntime::oneshot`](`crate::async_runtime::AsyncRuntime::oneshot`). + fn oneshot() -> (OneshotSenderOf, OneshotReceiverOf) + where T: OptionalSend { + AsyncRuntimeOf::::oneshot() + } + + // Task methods + + /// Spawn a new task. + fn spawn(future: T) -> JoinHandleOf + where + T: Future + OptionalSend + 'static, + T::Output: OptionalSend + 'static, + { + AsyncRuntimeOf::::spawn(future) + } +} + +impl TypeConfigExt for T where T: RaftTypeConfig {} diff --git a/tests/tests/append_entries/t60_enable_heartbeat.rs b/tests/tests/append_entries/t60_enable_heartbeat.rs index f3c9a75eb..3eba34e9d 100644 --- a/tests/tests/append_entries/t60_enable_heartbeat.rs +++ b/tests/tests/append_entries/t60_enable_heartbeat.rs @@ -3,7 +3,7 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::alias::InstantOf; +use openraft::type_config::TypeConfigExt; use openraft::AsyncRuntime; use openraft::Config; use openraft::TokioRuntime; @@ -32,7 +32,7 @@ async fn enable_heartbeat() -> Result<()> { node0.runtime_config().heartbeat(true); for _i in 0..3 { - let now = InstantOf::::now(); + let now = TypeConfig::now(); TokioRuntime::sleep(Duration::from_millis(500)).await; for node_id in [1, 2, 3] { diff --git a/tests/tests/metrics/t10_leader_last_ack.rs b/tests/tests/metrics/t10_leader_last_ack.rs index 1abe2b907..e2c55bc09 100644 --- a/tests/tests/metrics/t10_leader_last_ack.rs +++ b/tests/tests/metrics/t10_leader_last_ack.rs @@ -3,8 +3,7 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::alias::AsyncRuntimeOf; -use openraft::AsyncRuntime; +use openraft::type_config::TypeConfigExt; use openraft::Config; use openraft_memstore::TypeConfig; @@ -41,7 +40,7 @@ async fn leader_last_ack_3_nodes() -> Result<()> { tracing::info!(log_index, "--- sleep 500 ms, the `millis` should extend"); { - AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; + TypeConfig::sleep(Duration::from_millis(500)).await; let greater = n0.metrics().borrow().millis_since_quorum_ack; println!("greater: {:?}", greater); @@ -70,7 +69,7 @@ async fn leader_last_ack_3_nodes() -> Result<()> { "--- sleep and heartbeat again; millis_since_quorum_ack refreshes" ); { - AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; + TypeConfig::sleep(Duration::from_millis(500)).await; n0.trigger().heartbeat().await?; @@ -93,7 +92,7 @@ async fn leader_last_ack_3_nodes() -> Result<()> { "--- sleep and heartbeat again; millis_since_quorum_ack does not refresh" ); { - AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; + TypeConfig::sleep(Duration::from_millis(500)).await; n0.trigger().heartbeat().await?;