From b72ea560c11c125a842d1acd3bf2614ee947b57d Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Thu, 1 Aug 2024 16:14:10 +0800 Subject: [PATCH] Feature: monoio runtime support - Add monoio runtime in a separate crate. - Test: test MonoioRuntime with `Suite::test_all()` - Add CI: run monoio unit tests with a separate job, because it does belong to the workspace --- .github/workflows/ci.yaml | 27 +++ Cargo.toml | 1 + rt-monoio/Cargo.toml | 23 +++ rt-monoio/README.md | 5 + rt-monoio/src/lib.rs | 416 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 472 insertions(+) create mode 100644 rt-monoio/Cargo.toml create mode 100644 rt-monoio/README.md create mode 100644 rt-monoio/src/lib.rs diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f5b13d884..f438d7ab1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -206,6 +206,33 @@ jobs: RUST_LOG: debug RUST_BACKTRACE: full + + # Test external crate. + rt-monoio: + runs-on: ubuntu-latest + + steps: + - name: Setup | Checkout + uses: actions/checkout@v2 + + + - name: Setup | Toolchain + uses: actions-rs/toolchain@v1.0.6 + with: + toolchain: "nightly" + override: true + + + - name: Unit Tests + uses: actions-rs/cargo@v1 + with: + command: test + args: --tests --manifest-path "rt-monoio/Cargo.toml" + env: + RUST_LOG: debug + RUST_BACKTRACE: full + + # Feature "serde" will be enabled if one of the member crates enables # "serde", such as `memstore`, when building a cargo workspace. # diff --git a/Cargo.toml b/Cargo.toml index 3406d4f61..8b216789a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,4 +64,5 @@ exclude = [ "examples/raft-kv-memstore-network-v2", "examples/raft-kv-memstore-opendal-snapshot-data", "examples/raft-kv-rocksdb", + "rt-monoio" ] diff --git a/rt-monoio/Cargo.toml b/rt-monoio/Cargo.toml new file mode 100644 index 000000000..c30f8f104 --- /dev/null +++ b/rt-monoio/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "openraft-rt-monoio" +description = "monoio AsyncRuntime support for Openraft" +documentation = "https://docs.rs/openraft-rt-monoio" +readme = "README.md" +version = "0.10.0" +edition = "2021" +authors = [ + "Databend Authors ", +] +categories = ["algorithms", "asynchronous", "data-structures"] +homepage = "https://github.com/datafuselabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/datafuselabs/openraft" + +[dependencies] +openraft = { path = "../openraft", version = "0.10.0", default-features = false, features = ["singlethreaded"] } + +rand = "0.8" +tokio = { version = "1.22", features = ["sync"] } +monoio = "0.2.3" +local-sync = "0.1.1" diff --git a/rt-monoio/README.md b/rt-monoio/README.md new file mode 100644 index 000000000..729ced282 --- /dev/null +++ b/rt-monoio/README.md @@ -0,0 +1,5 @@ +# openraft-rt-monoio + +monoio [`AsyncRuntime`][rt_link] support for Openraft. + +[rt_link]: https://docs.rs/openraft/latest/openraft/async_runtime/trait.AsyncRuntime.html \ No newline at end of file diff --git a/rt-monoio/src/lib.rs b/rt-monoio/src/lib.rs new file mode 100644 index 000000000..62441f0cd --- /dev/null +++ b/rt-monoio/src/lib.rs @@ -0,0 +1,416 @@ +//! This crate provides a [`MonoioRuntime`] type, which has [`AsyncRuntime`] +//! implemented so that you can use Openraft with [Monoio](monoio). +//! +//! ```ignore +//! pub struct TypeConfig {} +//! +//! impl openraft::RaftTypeConfig for TypeConfig { +//! // Other type are omitted +//! +//! type AsyncRuntime = openraft_rt_monoio::MonoioRuntime; +//! } +//! ``` +//! +//! # NOTE +//! +//! 1. For the Openraft dependency used with this crate +//! 1. You can disable the `default` feature as you don't need the built-in Tokio runtime. +//! 2. The `singlethreaded` feature needs to be enabled or this crate won't work. +//! 2. With the `singlethreaded` feature enabled, the handle type [`Raft`](openraft::Raft) will be +//! no longer [`Send`] and [`Sync`]. +//! 3. Even though this crate allows you to use Monoio, it still uses some primitives from Tokio +//! 1. `MpscUnbounded`: Monoio (or `local_sync`)'s unbounded MPSC implementation does not have a +//! weak sender. +//! 2. `Watch`: Monoio (or `local_sync`) does not have a watch channel. +//! 3. `Mutex`: Monoio does not provide a Mutex implementation. + +use std::future::Future; +use std::time::Duration; + +use openraft::AsyncRuntime; +use openraft::OptionalSend; + +/// [`AsyncRuntime`] implementation for Monoio. +#[derive(Debug, Default, PartialEq, Eq)] +pub struct MonoioRuntime; + +impl AsyncRuntime for MonoioRuntime { + // Joining an async task on Monoio always succeeds + type JoinError = openraft::error::Infallible; + type JoinHandle = monoio::task::JoinHandle>; + type Sleep = monoio::time::Sleep; + type Instant = instant_mod::MonoioInstant; + type TimeoutError = monoio::time::error::Elapsed; + type Timeout + OptionalSend> = monoio::time::Timeout; + type ThreadLocalRng = rand::rngs::ThreadRng; + + #[inline] + fn spawn(future: T) -> Self::JoinHandle + where + T: Future + OptionalSend + 'static, + T::Output: OptionalSend + 'static, + { + monoio::spawn(async move { Ok(future.await) }) + } + + #[inline] + fn sleep(duration: Duration) -> Self::Sleep { + monoio::time::sleep(duration) + } + + #[inline] + fn sleep_until(deadline: Self::Instant) -> Self::Sleep { + monoio::time::sleep_until(deadline.0) + } + + #[inline] + fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout { + monoio::time::timeout(duration, future) + } + + #[inline] + fn timeout_at + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout { + monoio::time::timeout_at(deadline.0, future) + } + + #[inline] + fn is_panic(_join_error: &Self::JoinError) -> bool { + // Given that joining a task will never fail, i.e., `Self::JoinError` + // will never be constructed, and it is impossible to construct an + // enum like `Infallible`, this function could never be invoked. + unreachable!("unreachable since argument `join_error` could never be constructed") + } + + #[inline] + fn thread_rng() -> Self::ThreadLocalRng { + rand::thread_rng() + } + + type MpscUnbounded = mpsc_mod::TokioMpscUnbounded; + type Watch = watch_mod::TokioWatch; + type Oneshot = oneshot_mod::MonoioOneshot; + type Mutex = mutex_mod::TokioMutex; +} + +// Put the wrapper types in a private module to make them `pub` but not +// exposed to the user. +mod instant_mod { + //! Instant channel wrapper type and its trait impl. + + use std::ops::Add; + use std::ops::AddAssign; + use std::ops::Sub; + use std::ops::SubAssign; + use std::time::Duration; + + use openraft::instant; + + #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)] + pub struct MonoioInstant(pub(crate) monoio::time::Instant); + + impl Add for MonoioInstant { + type Output = Self; + + #[inline] + fn add(self, rhs: Duration) -> Self::Output { + Self(self.0.add(rhs)) + } + } + + impl AddAssign for MonoioInstant { + #[inline] + fn add_assign(&mut self, rhs: Duration) { + self.0.add_assign(rhs) + } + } + + impl Sub for MonoioInstant { + type Output = Self; + + #[inline] + fn sub(self, rhs: Duration) -> Self::Output { + Self(self.0.sub(rhs)) + } + } + + impl Sub for MonoioInstant { + type Output = Duration; + + #[inline] + fn sub(self, rhs: Self) -> Self::Output { + self.0.sub(rhs.0) + } + } + + impl SubAssign for MonoioInstant { + #[inline] + fn sub_assign(&mut self, rhs: Duration) { + self.0.sub_assign(rhs) + } + } + + impl instant::Instant for MonoioInstant { + #[inline] + fn now() -> Self { + let inner = monoio::time::Instant::now(); + Self(inner) + } + + #[inline] + fn elapsed(&self) -> Duration { + self.0.elapsed() + } + } +} + +// Put the wrapper types in a private module to make them `pub` but not +// exposed to the user. +mod oneshot_mod { + //! Oneshot channel wrapper types and their trait impl. + + use local_sync::oneshot as monoio_oneshot; + use openraft::type_config::async_runtime::oneshot; + use openraft::OptionalSend; + + pub struct MonoioOneshot; + + pub struct MonoioOneshotSender(monoio_oneshot::Sender); + + impl oneshot::Oneshot for MonoioOneshot { + type Sender = MonoioOneshotSender; + type Receiver = monoio_oneshot::Receiver; + type ReceiverError = monoio_oneshot::error::RecvError; + + #[inline] + fn channel() -> (Self::Sender, Self::Receiver) + where T: OptionalSend { + let (tx, rx) = monoio_oneshot::channel(); + let tx_wrapper = MonoioOneshotSender(tx); + + (tx_wrapper, rx) + } + } + + impl oneshot::OneshotSender for MonoioOneshotSender + where T: OptionalSend + { + #[inline] + fn send(self, t: T) -> Result<(), T> { + self.0.send(t) + } + } +} + +// Put the wrapper types in a private module to make them `pub` but not +// exposed to the user. +mod mpsc_mod { + //! Unbounded MPSC channel wrapper types and their trait impl. + + use openraft::type_config::async_runtime::mpsc_unbounded; + use openraft::OptionalSend; + use tokio::sync::mpsc as tokio_mpsc; + + pub struct TokioMpscUnbounded; + + pub struct TokioMpscUnboundedSender(tokio_mpsc::UnboundedSender); + + impl Clone for TokioMpscUnboundedSender { + #[inline] + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + + pub struct TokioMpscUnboundedReceiver(tokio_mpsc::UnboundedReceiver); + + pub struct TokioMpscUnboundedWeakSender(tokio_mpsc::WeakUnboundedSender); + + impl Clone for TokioMpscUnboundedWeakSender { + #[inline] + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + + impl mpsc_unbounded::MpscUnbounded for TokioMpscUnbounded { + type Sender = TokioMpscUnboundedSender; + type Receiver = TokioMpscUnboundedReceiver; + type WeakSender = TokioMpscUnboundedWeakSender; + + #[inline] + fn channel() -> (Self::Sender, Self::Receiver) { + let (tx, rx) = tokio_mpsc::unbounded_channel(); + let tx_wrapper = TokioMpscUnboundedSender(tx); + let rx_wrapper = TokioMpscUnboundedReceiver(rx); + + (tx_wrapper, rx_wrapper) + } + } + + impl mpsc_unbounded::MpscUnboundedSender for TokioMpscUnboundedSender + where T: OptionalSend + { + #[inline] + fn send(&self, msg: T) -> Result<(), mpsc_unbounded::SendError> { + self.0.send(msg).map_err(|e| mpsc_unbounded::SendError(e.0)) + } + + #[inline] + fn downgrade(&self) -> ::WeakSender { + let inner = self.0.downgrade(); + TokioMpscUnboundedWeakSender(inner) + } + } + + impl mpsc_unbounded::MpscUnboundedReceiver for TokioMpscUnboundedReceiver { + #[inline] + async fn recv(&mut self) -> Option { + self.0.recv().await + } + + #[inline] + fn try_recv(&mut self) -> Result { + self.0.try_recv().map_err(|e| match e { + tokio_mpsc::error::TryRecvError::Empty => mpsc_unbounded::TryRecvError::Empty, + tokio_mpsc::error::TryRecvError::Disconnected => mpsc_unbounded::TryRecvError::Disconnected, + }) + } + } + + impl mpsc_unbounded::MpscUnboundedWeakSender for TokioMpscUnboundedWeakSender + where T: OptionalSend + { + #[inline] + fn upgrade(&self) -> Option<::Sender> { + self.0.upgrade().map(TokioMpscUnboundedSender) + } + } +} + +// Put the wrapper types in a private module to make them `pub` but not +// exposed to the user. +mod watch_mod { + //! Watch channel wrapper types and their trait impl. + + use std::ops::Deref; + + use openraft::async_runtime::watch::RecvError; + use openraft::async_runtime::watch::SendError; + use openraft::type_config::async_runtime::watch; + use openraft::OptionalSend; + use openraft::OptionalSync; + use tokio::sync::watch as tokio_watch; + + pub struct TokioWatch; + pub struct TokioWatchSender(tokio_watch::Sender); + pub struct TokioWatchReceiver(tokio_watch::Receiver); + pub struct TokioWatchRef<'a, T>(tokio_watch::Ref<'a, T>); + + impl watch::Watch for TokioWatch { + type Sender = TokioWatchSender; + type Receiver = TokioWatchReceiver; + type Ref<'a, T: OptionalSend + 'a> = TokioWatchRef<'a, T>; + + #[inline] + fn channel(init: T) -> (Self::Sender, Self::Receiver) { + let (tx, rx) = tokio_watch::channel(init); + let tx_wrapper = TokioWatchSender(tx); + let rx_wrapper = TokioWatchReceiver(rx); + + (tx_wrapper, rx_wrapper) + } + } + + impl watch::WatchSender for TokioWatchSender + where T: OptionalSend + OptionalSync + { + #[inline] + fn send(&self, value: T) -> Result<(), SendError> { + self.0.send(value).map_err(|e| watch::SendError(e.0)) + } + + #[inline] + fn send_if_modified(&self, modify: F) -> bool + where F: FnOnce(&mut T) -> bool { + self.0.send_if_modified(modify) + } + + #[inline] + fn borrow_watched(&self) -> ::Ref<'_, T> { + let inner = self.0.borrow(); + TokioWatchRef(inner) + } + } + + impl Clone for TokioWatchReceiver { + #[inline] + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + + impl watch::WatchReceiver for TokioWatchReceiver + where T: OptionalSend + OptionalSync + { + #[inline] + async fn changed(&mut self) -> Result<(), RecvError> { + self.0.changed().await.map_err(|_| watch::RecvError(())) + } + + #[inline] + fn borrow_watched(&self) -> ::Ref<'_, T> { + TokioWatchRef(self.0.borrow()) + } + } + + impl<'a, T> Deref for TokioWatchRef<'a, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &Self::Target { + self.0.deref() + } + } +} + +// Put the wrapper types in a private module to make them `pub` but not +// exposed to the user. +mod mutex_mod { + //! Mutex wrapper type and its trait impl. + + use std::future::Future; + + use openraft::type_config::async_runtime::mutex; + use openraft::OptionalSend; + + pub struct TokioMutex(tokio::sync::Mutex); + + impl mutex::Mutex for TokioMutex + where T: OptionalSend + 'static + { + type Guard<'a> = tokio::sync::MutexGuard<'a, T>; + + #[inline] + fn new(value: T) -> Self { + TokioMutex(tokio::sync::Mutex::new(value)) + } + + #[inline] + fn lock(&self) -> impl Future> + OptionalSend { + self.0.lock() + } + } +} + +#[cfg(test)] +mod tests { + use openraft::testing::runtime::Suite; + + use super::*; + + #[test] + fn test_monoio_rt() { + let mut rt = monoio::RuntimeBuilder::::new().enable_all().build().unwrap(); + rt.block_on(Suite::::test_all()); + } +}