From f1792dce153fd592634076a6340bf07696a0c43f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sun, 7 Jul 2024 13:25:42 +0800 Subject: [PATCH] Feature: `AsyncRuntime::MpscUnbounded` to abstract mpsc channels - Part of: #1013 --- openraft/src/core/raft_core.rs | 22 +++--- openraft/src/core/sm/handle.rs | 13 ++-- openraft/src/core/sm/worker.rs | 16 +++-- openraft/src/core/tick.rs | 9 +-- openraft/src/raft/mod.rs | 6 +- openraft/src/raft/raft_inner.rs | 5 +- openraft/src/replication/mod.rs | 21 +++--- openraft/src/type_config.rs | 9 +++ .../async_runtime/impls/tokio_runtime.rs | 60 ++++++++++++++++ openraft/src/type_config/async_runtime/mod.rs | 9 +++ .../async_runtime/mpsc_unbounded/mod.rs | 69 +++++++++++++++++++ .../mpsc_unbounded/send_error.rs | 19 +++++ .../mpsc_unbounded/try_recv_error.rs | 23 +++++++ openraft/src/type_config/util.rs | 14 ++++ 14 files changed, 257 insertions(+), 38 deletions(-) create mode 100644 openraft/src/type_config/async_runtime/mpsc_unbounded/mod.rs create mode 100644 openraft/src/type_config/async_runtime/mpsc_unbounded/send_error.rs create mode 100644 openraft/src/type_config/async_runtime/mpsc_unbounded/try_recv_error.rs diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index d7f814528..dfbb36470 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -14,13 +14,14 @@ use futures::StreamExt; use futures::TryFutureExt; use maplit::btreeset; use tokio::select; -use tokio::sync::mpsc; use tokio::sync::watch; use tracing::Instrument; use tracing::Level; use tracing::Span; +use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::OneshotSender; +use crate::async_runtime::TryRecvError; use crate::config::Config; use crate::config::RuntimeConfig; use crate::core::balancer::Balancer; @@ -86,8 +87,11 @@ use crate::runtime::RaftRuntime; use crate::storage::LogFlushed; use crate::storage::RaftLogStorage; use crate::type_config::alias::InstantOf; +use crate::type_config::alias::MpscUnboundedReceiverOf; +use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::ResponderOf; +use crate::type_config::async_runtime::MpscUnboundedReceiver; use crate::type_config::TypeConfigExt; use crate::ChangeMembers; use crate::Instant; @@ -164,15 +168,15 @@ where pub(crate) replications: BTreeMap>, #[allow(dead_code)] - pub(crate) tx_api: mpsc::UnboundedSender>, - pub(crate) rx_api: mpsc::UnboundedReceiver>, + pub(crate) tx_api: MpscUnboundedSenderOf>, + pub(crate) rx_api: MpscUnboundedReceiverOf>, /// A Sender to send callback by other components to [`RaftCore`], when an action is finished, /// such as flushing log to disk, or applying log entries to state machine. - pub(crate) tx_notify: mpsc::UnboundedSender>, + pub(crate) tx_notify: MpscUnboundedSenderOf>, /// A Receiver to receive callback from other components. - pub(crate) rx_notify: mpsc::UnboundedReceiver>, + pub(crate) rx_notify: MpscUnboundedReceiverOf>, pub(crate) tx_metrics: watch::Sender>, pub(crate) tx_data_metrics: watch::Sender>, @@ -931,11 +935,11 @@ where let msg = match res { Ok(msg) => msg, Err(e) => match e { - mpsc::error::TryRecvError::Empty => { + TryRecvError::Empty => { tracing::debug!("all RaftMsg are processed, wait for more"); return Ok(i + 1); } - mpsc::error::TryRecvError::Disconnected => { + TryRecvError::Disconnected => { tracing::debug!("rx_api is disconnected, quit"); return Err(Fatal::Stopped); } @@ -966,11 +970,11 @@ where let notify = match res { Ok(msg) => msg, Err(e) => match e { - mpsc::error::TryRecvError::Empty => { + TryRecvError::Empty => { tracing::debug!("all Notify are processed, wait for more"); return Ok(i + 1); } - mpsc::error::TryRecvError::Disconnected => { + TryRecvError::Disconnected => { tracing::error!("rx_notify is disconnected, quit"); return Err(Fatal::Stopped); } diff --git a/openraft/src/core/sm/handle.rs b/openraft/src/core/sm/handle.rs index ff4346868..a1eb9571c 100644 --- a/openraft/src/core/sm/handle.rs +++ b/openraft/src/core/sm/handle.rs @@ -1,9 +1,12 @@ //! State machine control handle -use tokio::sync::mpsc; - +use crate::async_runtime::MpscUnboundedSender; +use crate::async_runtime::MpscUnboundedWeakSender; +use crate::async_runtime::SendError; use crate::core::sm; use crate::type_config::alias::JoinHandleOf; +use crate::type_config::alias::MpscUnboundedSenderOf; +use crate::type_config::alias::MpscUnboundedWeakSenderOf; use crate::type_config::TypeConfigExt; use crate::RaftTypeConfig; use crate::Snapshot; @@ -12,7 +15,7 @@ use crate::Snapshot; pub(crate) struct Handle where C: RaftTypeConfig { - pub(in crate::core::sm) cmd_tx: mpsc::UnboundedSender>, + pub(in crate::core::sm) cmd_tx: MpscUnboundedSenderOf>, #[allow(dead_code)] pub(in crate::core::sm) join_handle: JoinHandleOf, @@ -21,7 +24,7 @@ where C: RaftTypeConfig impl Handle where C: RaftTypeConfig { - pub(crate) fn send(&mut self, cmd: sm::Command) -> Result<(), mpsc::error::SendError>> { + pub(crate) fn send(&mut self, cmd: sm::Command) -> Result<(), SendError>> { tracing::debug!("sending command to state machine worker: {:?}", cmd); self.cmd_tx.send(cmd) } @@ -43,7 +46,7 @@ where C: RaftTypeConfig /// It is weak because the [`Worker`] watches the close event of this channel for shutdown. /// /// [`Worker`]: sm::worker::Worker - cmd_tx: mpsc::WeakUnboundedSender>, + cmd_tx: MpscUnboundedWeakSenderOf>, } impl SnapshotReader diff --git a/openraft/src/core/sm/worker.rs b/openraft/src/core/sm/worker.rs index b259a5f2a..d45a04544 100644 --- a/openraft/src/core/sm/worker.rs +++ b/openraft/src/core/sm/worker.rs @@ -1,5 +1,5 @@ -use tokio::sync::mpsc; - +use crate::async_runtime::MpscUnboundedReceiver; +use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::OneshotSender; use crate::core::notify::Notify; use crate::core::raft_msg::ResultSender; @@ -18,6 +18,8 @@ use crate::storage::RaftLogReaderExt; use crate::storage::RaftStateMachine; use crate::type_config::alias::JoinHandleOf; use crate::type_config::alias::LogIdOf; +use crate::type_config::alias::MpscUnboundedReceiverOf; +use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::TypeConfigExt; use crate::RaftLogId; use crate::RaftLogReader; @@ -41,10 +43,10 @@ where log_reader: LR, /// Raed command from RaftCore to execute. - cmd_rx: mpsc::UnboundedReceiver>, + cmd_rx: MpscUnboundedReceiverOf>, /// Send back the result of the command to RaftCore. - resp_tx: mpsc::UnboundedSender>, + resp_tx: MpscUnboundedSenderOf>, } impl Worker @@ -54,8 +56,8 @@ where LR: RaftLogReader, { /// Spawn a new state machine worker, return a controlling handle. - pub(crate) fn spawn(state_machine: SM, log_reader: LR, resp_tx: mpsc::UnboundedSender>) -> Handle { - let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); + pub(crate) fn spawn(state_machine: SM, log_reader: LR, resp_tx: MpscUnboundedSenderOf>) -> Handle { + let (cmd_tx, cmd_rx) = C::mpsc_unbounded(); let worker = Worker { state_machine, @@ -193,7 +195,7 @@ where /// as applying a log entry, /// - or it must be able to acquire a lock that prevents any write operations. #[tracing::instrument(level = "info", skip_all)] - async fn build_snapshot(&mut self, seq: CommandSeq, resp_tx: mpsc::UnboundedSender>) { + async fn build_snapshot(&mut self, seq: CommandSeq, resp_tx: MpscUnboundedSenderOf>) { // TODO: need to be abortable? // use futures::future::abortable; // let (fu, abort_handle) = abortable(async move { builder.build_snapshot().await }); diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index 28ee69cb4..7ec997231 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -7,14 +7,15 @@ use std::sync::Mutex; use std::time::Duration; use futures::future::Either; -use tokio::sync::mpsc; use tokio::sync::oneshot; use tracing::Instrument; use tracing::Level; use tracing::Span; +use crate::async_runtime::MpscUnboundedSender; use crate::core::notify::Notify; use crate::type_config::alias::JoinHandleOf; +use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::TypeConfigExt; use crate::RaftTypeConfig; @@ -24,7 +25,7 @@ where C: RaftTypeConfig { interval: Duration, - tx: mpsc::UnboundedSender>, + tx: MpscUnboundedSenderOf>, /// Emit event or not enabled: Arc, @@ -53,7 +54,7 @@ where C: RaftTypeConfig impl Tick where C: RaftTypeConfig { - pub(crate) fn spawn(interval: Duration, tx: mpsc::UnboundedSender>, enabled: bool) -> TickHandle { + pub(crate) fn spawn(interval: Duration, tx: MpscUnboundedSenderOf>, enabled: bool) -> TickHandle { let enabled = Arc::new(AtomicBool::from(enabled)); let this = Self { interval, @@ -180,7 +181,7 @@ mod tests { #[cfg(not(feature = "singlethreaded"))] #[tokio::test] async fn test_shutdown() -> anyhow::Result<()> { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, mut rx) = TickUTConfig::mpsc_unbounded(); let th = Tick::::spawn(Duration::from_millis(100), tx, true); TickUTConfig::sleep(Duration::from_millis(500)).await; diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index db3305e02..9a7f7392a 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -40,13 +40,13 @@ pub use message::InstallSnapshotResponse; pub use message::SnapshotResponse; pub use message::VoteRequest; pub use message::VoteResponse; -use tokio::sync::mpsc; use tokio::sync::watch; use tokio::sync::Mutex; use tracing::trace_span; use tracing::Instrument; use tracing::Level; +use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::OneshotSender; use crate::config::Config; use crate::config::RuntimeConfig; @@ -239,8 +239,8 @@ where C: RaftTypeConfig LS: RaftLogStorage, SM: RaftStateMachine, { - let (tx_api, rx_api) = mpsc::unbounded_channel(); - let (tx_notify, rx_notify) = mpsc::unbounded_channel(); + let (tx_api, rx_api) = C::mpsc_unbounded(); + let (tx_notify, rx_notify) = C::mpsc_unbounded(); let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id)); let (tx_data_metrics, rx_data_metrics) = watch::channel(RaftDataMetrics::default()); let (tx_server_metrics, rx_server_metrics) = watch::channel(RaftServerMetrics::default()); diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index ad51e76d8..2fcec8173 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -3,11 +3,11 @@ use std::fmt::Debug; use std::future::Future; use std::sync::Arc; -use tokio::sync::mpsc; use tokio::sync::watch; use tokio::sync::Mutex; use tracing::Level; +use crate::async_runtime::MpscUnboundedSender; use crate::config::RuntimeConfig; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; @@ -17,6 +17,7 @@ use crate::error::RaftError; use crate::metrics::RaftDataMetrics; use crate::metrics::RaftServerMetrics; use crate::raft::core_state::CoreState; +use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; use crate::type_config::AsyncRuntime; @@ -34,7 +35,7 @@ where C: RaftTypeConfig pub(in crate::raft) config: Arc, pub(in crate::raft) runtime_config: Arc, pub(in crate::raft) tick_handle: TickHandle, - pub(in crate::raft) tx_api: mpsc::UnboundedSender>, + pub(in crate::raft) tx_api: MpscUnboundedSenderOf>, pub(in crate::raft) rx_metrics: watch::Receiver>, pub(in crate::raft) rx_data_metrics: watch::Receiver>, pub(in crate::raft) rx_server_metrics: watch::Receiver>, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index d0a460b52..4bb74f6c0 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -19,11 +19,13 @@ use request::Replicate; use response::ReplicationResult; pub(crate) use response::Response; use tokio::select; -use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::Mutex; use tracing_futures::Instrument; +use crate::async_runtime::MpscUnboundedReceiver; +use crate::async_runtime::MpscUnboundedSender; +use crate::async_runtime::MpscUnboundedWeakSender; use crate::config::Config; use crate::core::notify::Notify; use crate::core::sm::handle::SnapshotReader; @@ -51,6 +53,9 @@ use crate::storage::Snapshot; use crate::type_config::alias::InstantOf; use crate::type_config::alias::JoinHandleOf; use crate::type_config::alias::LogIdOf; +use crate::type_config::alias::MpscUnboundedReceiverOf; +use crate::type_config::alias::MpscUnboundedSenderOf; +use crate::type_config::alias::MpscUnboundedWeakSenderOf; use crate::type_config::TypeConfigExt; use crate::LogId; use crate::RaftLogId; @@ -68,7 +73,7 @@ where C: RaftTypeConfig pub(crate) join_handle: JoinHandleOf>, /// The channel used for communicating with the replication task. - pub(crate) tx_repl: mpsc::UnboundedSender>, + pub(crate) tx_repl: MpscUnboundedSenderOf>, } /// A task responsible for sending replication events to a target follower in the Raft cluster. @@ -90,17 +95,17 @@ where /// A channel for sending events to the RaftCore. #[allow(clippy::type_complexity)] - tx_raft_core: mpsc::UnboundedSender>, + tx_raft_core: MpscUnboundedSenderOf>, /// A channel for receiving events from the RaftCore and snapshot transmitting task. - rx_event: mpsc::UnboundedReceiver>, + rx_event: MpscUnboundedReceiverOf>, /// A weak reference to the Sender for the separate sending-snapshot task to send callback. /// /// Because 1) ReplicationCore replies on the `close` event to shutdown. /// 2) ReplicationCore holds this tx; It is made a weak so that when /// RaftCore drops the only non-weak tx, the Receiver `rx_repl` will be closed. - weak_tx_event: mpsc::WeakUnboundedSender>, + weak_tx_event: MpscUnboundedWeakSenderOf>, /// The `RaftNetwork` interface for replicating logs and heartbeat. network: N::Network, @@ -164,7 +169,7 @@ where snapshot_network: N::Network, log_reader: LS::LogReader, snapshot_reader: SnapshotReader, - tx_raft_core: mpsc::UnboundedSender>, + tx_raft_core: MpscUnboundedSenderOf>, span: tracing::Span, ) -> ReplicationHandle { tracing::debug!( @@ -176,7 +181,7 @@ where ); // other component to ReplicationStream - let (tx_event, rx_event) = mpsc::unbounded_channel(); + let (tx_event, rx_event) = C::mpsc_unbounded(); let this = Self { target, @@ -757,7 +762,7 @@ where snapshot: Snapshot, option: RPCOption, cancel: oneshot::Receiver<()>, - weak_tx: mpsc::WeakUnboundedSender>, + weak_tx: MpscUnboundedWeakSenderOf>, ) { let meta = snapshot.meta.clone(); diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index db7c0ae67..a2a34ebea 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -9,6 +9,7 @@ pub(crate) mod util; use std::fmt::Debug; pub use async_runtime::AsyncRuntime; +pub use async_runtime::MpscUnbounded; pub use async_runtime::OneshotSender; pub use util::TypeConfigExt; @@ -93,6 +94,7 @@ pub trait RaftTypeConfig: /// /// [`type-alias`]: crate::docs::feature_flags#feature-flag-type-alias pub mod alias { + use crate::async_runtime::MpscUnbounded; use crate::raft::responder::Responder; use crate::type_config::AsyncRuntime; use crate::RaftTypeConfig; @@ -118,6 +120,13 @@ pub mod alias { pub type OneshotSenderOf = as AsyncRuntime>::OneshotSender; pub type OneshotReceiverErrorOf = as AsyncRuntime>::OneshotReceiverError; pub type OneshotReceiverOf = as AsyncRuntime>::OneshotReceiver; + pub type MpscUnboundedOf = as AsyncRuntime>::MpscUnbounded; + + type Mpsc = MpscUnboundedOf; + + pub type MpscUnboundedSenderOf = as MpscUnbounded>::Sender; + pub type MpscUnboundedReceiverOf = as MpscUnbounded>::Receiver; + pub type MpscUnboundedWeakSenderOf = as MpscUnbounded>::WeakSender; // Usually used types pub type LogIdOf = crate::LogId>; diff --git a/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs b/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs index 7008fdc2e..58b10551f 100644 --- a/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs +++ b/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs @@ -1,6 +1,10 @@ use std::future::Future; use std::time::Duration; +use tokio::sync::mpsc; + +use crate::async_runtime::mpsc_unbounded; +use crate::async_runtime::mpsc_unbounded::MpscUnbounded; use crate::type_config::OneshotSender; use crate::AsyncRuntime; use crate::OptionalSend; @@ -74,6 +78,8 @@ impl AsyncRuntime for TokioRuntime { let (tx, rx) = tokio::sync::oneshot::channel(); (tx, rx) } + + type MpscUnbounded = TokioMpscUnbounded; } impl OneshotSender for tokio::sync::oneshot::Sender { @@ -82,3 +88,57 @@ impl OneshotSender for tokio::sync::oneshot::Sender { self.send(t) } } + +pub struct TokioMpscUnbounded; + +impl MpscUnbounded for TokioMpscUnbounded { + type Sender = mpsc::UnboundedSender; + type Receiver = mpsc::UnboundedReceiver; + type WeakSender = mpsc::WeakUnboundedSender; + + /// Creates an unbounded mpsc channel for communicating between asynchronous + /// tasks without backpressure. + fn channel() -> (Self::Sender, Self::Receiver) { + mpsc::unbounded_channel() + } +} + +impl mpsc_unbounded::MpscUnboundedSender for mpsc::UnboundedSender +where T: OptionalSend +{ + #[inline] + fn send(&self, msg: T) -> Result<(), mpsc_unbounded::SendError> { + self.send(msg).map_err(|e| mpsc_unbounded::SendError(e.0)) + } + + #[inline] + fn downgrade(&self) -> ::WeakSender { + self.downgrade() + } +} + +impl mpsc_unbounded::MpscUnboundedReceiver for mpsc::UnboundedReceiver +where T: OptionalSend +{ + #[inline] + async fn recv(&mut self) -> Option { + self.recv().await + } + + #[inline] + fn try_recv(&mut self) -> Result { + self.try_recv().map_err(|e| match e { + mpsc::error::TryRecvError::Empty => mpsc_unbounded::TryRecvError::Empty, + mpsc::error::TryRecvError::Disconnected => mpsc_unbounded::TryRecvError::Disconnected, + }) + } +} + +impl mpsc_unbounded::MpscUnboundedWeakSender for mpsc::WeakUnboundedSender +where T: OptionalSend +{ + #[inline] + fn upgrade(&self) -> Option<::Sender> { + self.upgrade() + } +} diff --git a/openraft/src/type_config/async_runtime/mod.rs b/openraft/src/type_config/async_runtime/mod.rs index 47a789c94..db943e574 100644 --- a/openraft/src/type_config/async_runtime/mod.rs +++ b/openraft/src/type_config/async_runtime/mod.rs @@ -8,6 +8,7 @@ pub(crate) mod impls { pub use tokio_runtime::TokioRuntime; } +pub mod mpsc_unbounded; mod oneshot; use std::fmt::Debug; @@ -15,6 +16,12 @@ use std::fmt::Display; use std::future::Future; use std::time::Duration; +pub use mpsc_unbounded::MpscUnbounded; +pub use mpsc_unbounded::MpscUnboundedReceiver; +pub use mpsc_unbounded::MpscUnboundedSender; +pub use mpsc_unbounded::MpscUnboundedWeakSender; +pub use mpsc_unbounded::SendError; +pub use mpsc_unbounded::TryRecvError; pub use oneshot::OneshotSender; use crate::Instant; @@ -107,4 +114,6 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option /// Each handle can be used on separate tasks. fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) where T: OptionalSend; + + type MpscUnbounded: MpscUnbounded; } diff --git a/openraft/src/type_config/async_runtime/mpsc_unbounded/mod.rs b/openraft/src/type_config/async_runtime/mpsc_unbounded/mod.rs new file mode 100644 index 000000000..c5790c6c2 --- /dev/null +++ b/openraft/src/type_config/async_runtime/mpsc_unbounded/mod.rs @@ -0,0 +1,69 @@ +mod send_error; +mod try_recv_error; + +pub use send_error::SendError; +pub use try_recv_error::TryRecvError; + +use crate::OptionalSend; +use crate::OptionalSync; + +pub trait MpscUnbounded: Sized + OptionalSend { + type Sender: MpscUnboundedSender; + type Receiver: MpscUnboundedReceiver; + type WeakSender: MpscUnboundedWeakSender; + + fn channel() -> (Self::Sender, Self::Receiver); +} + +/// Send values to the associated [`MpscUnboundedReceiver`]. +pub trait MpscUnboundedSender: OptionalSend + OptionalSync + Clone +where + MU: MpscUnbounded, + T: OptionalSend, +{ + /// Attempts to send a message without blocking. + /// + /// If the receive half of the channel is closed, this + /// function returns an error. The error includes the value passed to `send`. + fn send(&self, msg: T) -> Result<(), SendError>; + + /// Converts the [`MpscUnboundedSender`] to a [`MpscUnboundedWeakSender`] that does not count + /// towards RAII semantics, i.e. if all `Sender` instances of the + /// channel were dropped and only `WeakSender` instances remain, + /// the channel is closed. + fn downgrade(&self) -> MU::WeakSender; +} + +/// Receive values from the associated [`MpscUnboundedSender`]. +pub trait MpscUnboundedReceiver: OptionalSend + OptionalSync { + /// Receives the next value for this receiver. + /// + /// This method returns `None` if the channel has been closed and there are + /// no remaining messages in the channel's buffer. + fn recv(&mut self) -> impl std::future::Future> + OptionalSend; + + /// Tries to receive the next value for this receiver. + /// + /// This method returns the [`TryRecvError::Empty`] error if the channel is currently + /// empty, but there are still outstanding senders. + /// + /// This method returns the [`TryRecvError::Disconnected`] error if the channel is + /// currently empty, and there are no outstanding senders. + fn try_recv(&mut self) -> Result; +} + +/// A sender that does not prevent the channel from being closed. +/// +/// If all [`MpscUnboundedSender`] instances of a channel were dropped and only +/// `WeakSender` instances remain, the channel is closed. +pub trait MpscUnboundedWeakSender: OptionalSend + OptionalSync + Clone +where + MU: MpscUnbounded, + T: OptionalSend, +{ + /// Tries to convert a [`MpscUnboundedWeakSender`] into an [`MpscUnboundedSender`]. + /// + /// This will return `Some` if there are other `Sender` instances alive and + /// the channel wasn't previously dropped, otherwise `None` is returned. + fn upgrade(&self) -> Option>; +} diff --git a/openraft/src/type_config/async_runtime/mpsc_unbounded/send_error.rs b/openraft/src/type_config/async_runtime/mpsc_unbounded/send_error.rs new file mode 100644 index 000000000..ef448ee33 --- /dev/null +++ b/openraft/src/type_config/async_runtime/mpsc_unbounded/send_error.rs @@ -0,0 +1,19 @@ +use std::fmt; + +/// Error returned by the `Sender`. +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct SendError(pub T); + +impl fmt::Debug for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SendError").finish_non_exhaustive() + } +} + +impl fmt::Display for SendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl std::error::Error for SendError {} diff --git a/openraft/src/type_config/async_runtime/mpsc_unbounded/try_recv_error.rs b/openraft/src/type_config/async_runtime/mpsc_unbounded/try_recv_error.rs new file mode 100644 index 000000000..94d5a0473 --- /dev/null +++ b/openraft/src/type_config/async_runtime/mpsc_unbounded/try_recv_error.rs @@ -0,0 +1,23 @@ +use std::fmt; + +/// Error returned by `try_recv`. +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum TryRecvError { + /// This **channel** is currently empty, but the **Sender**(s) have not yet + /// disconnected, so data may yet become available. + Empty, + /// The **channel**'s sending half has become disconnected, and there will + /// never be any more data received on it. + Disconnected, +} + +impl fmt::Display for TryRecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TryRecvError::Empty => "receiving on an empty channel".fmt(fmt), + TryRecvError::Disconnected => "receiving on a closed channel".fmt(fmt), + } + } +} + +impl std::error::Error for TryRecvError {} diff --git a/openraft/src/type_config/util.rs b/openraft/src/type_config/util.rs index c26e14038..eca62d7f6 100644 --- a/openraft/src/type_config/util.rs +++ b/openraft/src/type_config/util.rs @@ -3,9 +3,13 @@ use std::time::Duration; use openraft_macros::since; +use crate::async_runtime::MpscUnbounded; use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::InstantOf; use crate::type_config::alias::JoinHandleOf; +use crate::type_config::alias::MpscUnboundedOf; +use crate::type_config::alias::MpscUnboundedReceiverOf; +use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; use crate::type_config::alias::SleepOf; @@ -59,6 +63,16 @@ pub trait TypeConfigExt: RaftTypeConfig { AsyncRuntimeOf::::oneshot() } + /// Creates an unbounded mpsc channel for communicating between asynchronous + /// tasks without backpressure. + /// + /// This is just a wrapper of + /// [`AsyncRuntime::MpscUnbounded::channel()`](`crate::async_runtime::MpscUnbounded::channel`). + fn mpsc_unbounded() -> (MpscUnboundedSenderOf, MpscUnboundedReceiverOf) + where T: OptionalSend { + MpscUnboundedOf::::channel() + } + // Task methods /// Spawn a new task.