Skip to content

Commit

Permalink
Refactor: replace tokio oneshot with the one configured
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveLauC authored and drmingdrmer committed Jul 26, 2024
1 parent cd19048 commit 590d943
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
10 changes: 6 additions & 4 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::sync::Mutex;
use std::time::Duration;

use futures::future::Either;
use tokio::sync::oneshot;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;
Expand All @@ -16,6 +15,9 @@ use crate::async_runtime::MpscUnboundedSender;
use crate::core::notification::Notification;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::async_runtime::oneshot::OneshotSender;
use crate::type_config::TypeConfigExt;
use crate::RaftTypeConfig;

Expand All @@ -35,7 +37,7 @@ pub(crate) struct TickHandle<C>
where C: RaftTypeConfig
{
enabled: Arc<AtomicBool>,
shutdown: Mutex<Option<oneshot::Sender<()>>>,
shutdown: Mutex<Option<OneshotSenderOf<C, ()>>>,
join_handle: Mutex<Option<JoinHandleOf<C, ()>>>,
}

Expand Down Expand Up @@ -66,7 +68,7 @@ where C: RaftTypeConfig
tx,
};

let (shutdown, shutdown_rx) = oneshot::channel();
let (shutdown, shutdown_rx) = C::oneshot();

let shutdown = Mutex::new(Some(shutdown));

Expand All @@ -83,7 +85,7 @@ where C: RaftTypeConfig
}
}

pub(crate) async fn tick_loop(self, mut cancel_rx: oneshot::Receiver<()>) {
pub(crate) async fn tick_loop(self, mut cancel_rx: OneshotReceiverOf<C, ()>) {
let mut i = 0;

let mut cancel = std::pin::pin!(cancel_rx);
Expand Down
9 changes: 5 additions & 4 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use request::Replicate;
use response::ReplicationResult;
pub(crate) use response::Response;
use tokio::select;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tracing_futures::Instrument;

Expand Down Expand Up @@ -57,6 +56,8 @@ use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::MpscUnboundedWeakSenderOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::RaftLogId;
Expand Down Expand Up @@ -120,7 +121,7 @@ where
/// It includes a cancel signaler and the join handle of the snapshot replication task.
/// When ReplicationCore is dropped, this Sender is dropped, the snapshot task will be notified
/// to quit.
snapshot_state: Option<(oneshot::Sender<()>, JoinHandleOf<C, ()>)>,
snapshot_state: Option<(OneshotSenderOf<C, ()>, JoinHandleOf<C, ()>)>,

/// The backoff policy if an [`Unreachable`](`crate::error::Unreachable`) error is returned.
/// It will be reset to `None` when an successful response is received.
Expand Down Expand Up @@ -731,7 +732,7 @@ where
let mut option = RPCOption::new(self.config.install_snapshot_timeout());
option.snapshot_chunk_size = Some(self.config.snapshot_max_chunk_size as usize);

let (tx_cancel, rx_cancel) = oneshot::channel();
let (tx_cancel, rx_cancel) = C::oneshot();

let jh = C::spawn(Self::send_snapshot(
request_id,
Expand All @@ -757,7 +758,7 @@ where
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
option: RPCOption,
cancel: oneshot::Receiver<()>,
cancel: OneshotReceiverOf<C, ()>,
weak_tx: MpscUnboundedWeakSenderOf<C, Replicate<C>>,
) {
let meta = snapshot.meta.clone();
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/storage/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

use std::io;

use tokio::sync::oneshot;

use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::MpscUnboundedWeakSender;
use crate::core::notification::Notification;
use crate::type_config::alias::MpscUnboundedWeakSenderOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::async_runtime::oneshot::OneshotSender;
use crate::ErrorSubject;
use crate::ErrorVerb;
use crate::LogId;
Expand Down Expand Up @@ -101,7 +101,7 @@ pub struct LogApplied<C>
where C: RaftTypeConfig
{
last_log_id: LogId<C::NodeId>,
tx: oneshot::Sender<Result<(LogId<C::NodeId>, Vec<C::R>), StorageError<C>>>,
tx: OneshotSenderOf<C, Result<(LogId<C::NodeId>, Vec<C::R>), StorageError<C>>>,
}

impl<C> LogApplied<C>
Expand All @@ -110,7 +110,7 @@ where C: RaftTypeConfig
#[allow(dead_code)]
pub(crate) fn new(
last_log_id: LogId<C::NodeId>,
tx: oneshot::Sender<Result<(LogId<C::NodeId>, Vec<C::R>), StorageError<C>>>,
tx: OneshotSenderOf<C, Result<(LogId<C::NodeId>, Vec<C::R>), StorageError<C>>>,
) -> Self {
Self { last_log_id, tx }
}
Expand Down

0 comments on commit 590d943

Please sign in to comment.