Skip to content

Commit

Permalink
Refactor: gate tokio rt with feature tokio-rt
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveLauC committed Aug 2, 2024
1 parent ada27b4 commit c895163
Show file tree
Hide file tree
Showing 16 changed files with 309 additions and 262 deletions.
1 change: 0 additions & 1 deletion examples/raft-kv-memstore-singlethreaded/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_qualifications)]

use std::io::Cursor;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
Expand Down
1 change: 0 additions & 1 deletion examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_qualifications)]

use std::io::Cursor;
use std::sync::Arc;

use actix_web::middleware;
Expand Down
9 changes: 8 additions & 1 deletion openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true, optional = true }
tracing = { workspace = true }
tracing-futures = { workspace = true }
validit = { workspace = true }

or07 = { package = "openraft", version = "0.7.4", optional = true }


[dev-dependencies]
anyhow = { workspace = true }
async-entry = { workspace = true }
Expand All @@ -44,6 +45,10 @@ serde_json = { workspace = true }


[features]
default = ["tokio-rt"]

# Enable the default Tokio runtime
tokio-rt = ["dep:tokio"]

# Enables benchmarks in unittest.
#
Expand Down Expand Up @@ -113,6 +118,8 @@ features = [
"tracing-log",
]

no-default-features = false

# Do not use this to enable all features:
# "singlethreaded" makes `Raft<C>` a `!Send`, which confuses users.
# all-features = true
Expand Down
20 changes: 9 additions & 11 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use std::time::Duration;

use anyerror::AnyError;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use futures::TryFutureExt;
use maplit::btreeset;
use tokio::select;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;
Expand Down Expand Up @@ -910,19 +910,17 @@ where
// In each loop, the first step is blocking waiting for any message from any channel.
// Then if there is any message, process as many as possible to maximize throughput.

select! {
// Check shutdown in each loop first so that a message flood in `tx_api` won't block shutting down.
// `select!` without `biased` provides a random fairness.
// We want to check shutdown prior to other channels.
// See: https://docs.rs/tokio/latest/tokio/macro.select.html#fairness
biased;

_ = &mut rx_shutdown => {
// Check shutdown in each loop first so that a message flood in `tx_api` won't block shutting down.
// `select!` without `biased` provides a random fairness.
// We want to check shutdown prior to other channels.
// See: https://docs.rs/tokio/latest/tokio/macro.select.html#fairness
futures::select_biased! {
_ = (&mut rx_shutdown).fuse() => {
tracing::info!("recv from rx_shutdown");
return Err(Fatal::Stopped);
}

notify_res = self.rx_notification.recv() => {
notify_res = self.rx_notification.recv().fuse() => {
match notify_res {
Some(notify) => self.handle_notification(notify)?,
None => {
Expand All @@ -932,7 +930,7 @@ where
};
}

msg_res = self.rx_api.recv() => {
msg_res = self.rx_api.recv().fuse() => {
match msg_res {
Some(msg) => self.handle_api_msg(msg).await,
None => {
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/log_id_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ where C: RaftTypeConfig
self.key_log_ids.last()
}

// This method will only be used under feature tokio-rt
#[cfg_attr(not(feature = "tokio-rt"), allow(dead_code))]
pub(crate) fn key_log_ids(&self) -> &[LogId<C::NodeId>] {
&self.key_log_ids
}
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub use crate::entry::Entry;
pub use crate::node::BasicNode;
pub use crate::node::EmptyNode;
pub use crate::raft::responder::impls::OneshotResponder;
pub use crate::type_config::async_runtime::impls::TokioRuntime;
#[cfg(feature = "tokio-rt")]
pub use crate::type_config::async_runtime::tokio_impls::TokioRuntime;
2 changes: 2 additions & 0 deletions openraft/src/instant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ pub trait Instant:
}
}

#[cfg(feature = "tokio-rt")]
pub type TokioInstant = tokio::time::Instant;

#[cfg(feature = "tokio-rt")]
impl Instant for tokio::time::Instant {
#[inline]
fn now() -> Self {
Expand Down
5 changes: 4 additions & 1 deletion openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub mod metrics;
pub mod network;
pub mod raft;
pub mod storage;
#[cfg(feature = "tokio-rt")]
pub mod testing;
pub mod type_config;

Expand All @@ -72,7 +73,8 @@ pub use anyerror;
pub use anyerror::AnyError;
pub use openraft_macros::add_async_trait;
pub use type_config::async_runtime;
pub use type_config::async_runtime::impls::TokioRuntime;
#[cfg(feature = "tokio-rt")]
pub use type_config::async_runtime::tokio_impls::TokioRuntime;
pub use type_config::AsyncRuntime;

pub use crate::base::OptionalSend;
Expand All @@ -86,6 +88,7 @@ pub use crate::core::ServerState;
pub use crate::entry::Entry;
pub use crate::entry::EntryPayload;
pub use crate::instant::Instant;
#[cfg(feature = "tokio-rt")]
pub use crate::instant::TokioInstant;
pub use crate::log_id::LogId;
pub use crate::log_id::LogIdOptionExt;
Expand Down
10 changes: 6 additions & 4 deletions openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use core::time::Duration;
use std::collections::BTreeSet;

use futures::FutureExt;

use crate::async_runtime::watch::WatchReceiver;
use crate::core::ServerState;
use crate::metrics::Condition;
Expand Down Expand Up @@ -62,12 +64,12 @@ where C: RaftTypeConfig
tracing::debug!(?sleep_time, "wait timeout");
let delay = C::sleep(sleep_time);

tokio::select! {
_ = delay => {
tracing::debug!( "id={} timeout wait {:} latest: {}", latest.id, msg.to_string(), latest );
futures::select_biased! {
_ = delay.fuse() => {
tracing::debug!( "id={} timeout wait {:} latest: {}", latest.id, msg.to_string(), latest );
return Err(WaitError::Timeout(self.timeout, format!("{} latest: {}", msg.to_string(), latest)));
}
changed = rx.changed() => {
changed = rx.changed().fuse() => {
match changed {
Ok(_) => {
// metrics changed, continue the waiting loop
Expand Down
Loading

0 comments on commit c895163

Please sign in to comment.