Skip to content

Commit

Permalink
Refactor: call_core() should be a method of RaftInner instead of a me…
Browse files Browse the repository at this point in the history
…thod of Raft
  • Loading branch information
drmingdrmer committed Feb 14, 2024
1 parent c958647 commit c805b29
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 53 deletions.
73 changes: 20 additions & 53 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ where C: RaftTypeConfig
tracing::debug!(rpc = display(rpc.summary()), "Raft::append_entries");

let (tx, rx) = oneshot::channel();
self.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await
self.inner.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await
}

/// Submit a VoteRequest (RequestVote in the spec) RPC to this Raft node.
Expand All @@ -346,7 +346,7 @@ where C: RaftTypeConfig
tracing::info!(rpc = display(rpc.summary()), "Raft::vote()");

let (tx, rx) = oneshot::channel();
self.call_core(RaftMsg::RequestVote { rpc, tx }, rx).await
self.inner.call_core(RaftMsg::RequestVote { rpc, tx }, rx).await
}

/// Get the latest snapshot from the state machine.
Expand All @@ -359,7 +359,7 @@ where C: RaftTypeConfig

let (tx, rx) = oneshot::channel();
let cmd = ExternalCommand::GetSnapshot { tx };
self.call_core(RaftMsg::ExternalCommand { cmd }, rx).await
self.inner.call_core(RaftMsg::ExternalCommand { cmd }, rx).await
}

/// Install a completely received snapshot to the state machine.
Expand All @@ -376,7 +376,7 @@ where C: RaftTypeConfig
tracing::debug!("Raft::install_complete_snapshot()");

let (tx, rx) = oneshot::channel();
self.call_core(RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx }, rx).await
self.inner.call_core(RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx }, rx).await
}

/// Submit an InstallSnapshot RPC to this Raft node.
Expand All @@ -391,7 +391,7 @@ where C: RaftTypeConfig
tracing::debug!(rpc = display(rpc.summary()), "Raft::install_snapshot()");

let (tx, rx) = oneshot::channel();
self.call_core(RaftMsg::InstallSnapshot { rpc, tx }, rx).await
self.inner.call_core(RaftMsg::InstallSnapshot { rpc, tx }, rx).await
}

/// Get the ID of the current leader from this Raft node.
Expand All @@ -413,7 +413,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip(self))]
pub async fn is_leader(&self) -> Result<(), RaftError<C::NodeId, CheckIsLeaderError<C::NodeId, C::Node>>> {
let (tx, rx) = oneshot::channel();
let _ = self.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?;
let _ = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?;
Ok(())
}

Expand Down Expand Up @@ -497,7 +497,7 @@ where C: RaftTypeConfig
RaftError<C::NodeId, CheckIsLeaderError<C::NodeId, C::Node>>,
> {
let (tx, rx) = oneshot::channel();
let (read_log_id, applied) = self.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?;
let (read_log_id, applied) = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?;
Ok((read_log_id, applied))
}

Expand Down Expand Up @@ -525,7 +525,7 @@ where C: RaftTypeConfig
app_data: C::D,
) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>> {
let (tx, rx) = oneshot::channel();
self.call_core(RaftMsg::ClientWriteRequest { app_data, tx }, rx).await
self.inner.call_core(RaftMsg::ClientWriteRequest { app_data, tx }, rx).await
}

/// Initialize a pristine Raft node with the given config.
Expand Down Expand Up @@ -558,14 +558,15 @@ where C: RaftTypeConfig
T: IntoNodes<C::NodeId, C::Node> + Debug,
{
let (tx, rx) = oneshot::channel();
self.call_core(
RaftMsg::Initialize {
members: members.into_nodes(),
tx,
},
rx,
)
.await
self.inner
.call_core(
RaftMsg::Initialize {
members: members.into_nodes(),
tx,
},
rx,
)
.await
}

/// Add a new learner raft node, optionally, blocking until up-to-speed.
Expand Down Expand Up @@ -593,6 +594,7 @@ where C: RaftTypeConfig
) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>> {
let (tx, rx) = oneshot::channel();
let resp = self
.inner
.call_core(
RaftMsg::ChangeMembership {
changes: ChangeMembers::AddNodes(btreemap! {id=>node}),
Expand Down Expand Up @@ -724,6 +726,7 @@ where C: RaftTypeConfig
// res is error if membership can not be changed.
// If no error, it will enter a joint state
let res = self
.inner
.call_core(
RaftMsg::ChangeMembership {
changes: changes.clone(),
Expand Down Expand Up @@ -751,7 +754,7 @@ where C: RaftTypeConfig
tracing::debug!("the second step is to change to uniform config: {:?}", changes);

let (tx, rx) = oneshot::channel();
let res = self.call_core(RaftMsg::ChangeMembership { changes, retain, tx }, rx).await;
let res = self.inner.call_core(RaftMsg::ChangeMembership { changes, retain, tx }, rx).await;

if let Err(e) = &res {
tracing::error!("the second step error: {}", e);
Expand All @@ -763,42 +766,6 @@ where C: RaftTypeConfig
Ok(res)
}

/// Invoke RaftCore by sending a RaftMsg and blocks waiting for response.
#[tracing::instrument(level = "debug", skip(self, mes, rx))]
pub(crate) async fn call_core<T, E>(
&self,
mes: RaftMsg<C>,
rx: oneshot::Receiver<Result<T, E>>,
) -> Result<T, RaftError<C::NodeId, E>>
where
E: Debug,
{
let sum = if tracing::enabled!(Level::DEBUG) {
Some(mes.summary())
} else {
None
};

let send_res = self.inner.tx_api.send(mes);

if send_res.is_err() {
let fatal = self.inner.get_core_stopped_error("sending tx to RaftCore", sum).await;
return Err(RaftError::Fatal(fatal));
}

let recv_res = rx.await;
tracing::debug!("call_core receives result is error: {:?}", recv_res.is_err());

match recv_res {
Ok(x) => x.map_err(|e| RaftError::APIError(e)),
Err(_) => {
let fatal = self.inner.get_core_stopped_error("receiving rx from RaftCore", sum).await;
tracing::error!(error = debug(&fatal), "core_call fatal error");
Err(RaftError::Fatal(fatal))
}
}
}

/// Provides read-only access to [`RaftState`] through a user-provided function.
///
/// The function `func` is applied to the current [`RaftState`]. The result of this function,
Expand Down
40 changes: 40 additions & 0 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::sync::Mutex;
use tracing::Level;

use crate::config::RuntimeConfig;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::RaftMsg;
use crate::core::TickHandle;
use crate::error::Fatal;
use crate::error::RaftError;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftServerMetrics;
use crate::raft::core_state::CoreState;
use crate::AsyncRuntime;
use crate::Config;
use crate::MessageSummary;
use crate::RaftMetrics;
use crate::RaftTypeConfig;

Expand All @@ -42,6 +46,42 @@ where C: RaftTypeConfig
impl<C> RaftInner<C>
where C: RaftTypeConfig
{
/// Invoke RaftCore by sending a RaftMsg and blocks waiting for response.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn call_core<T, E>(
&self,
mes: RaftMsg<C>,
rx: oneshot::Receiver<Result<T, E>>,
) -> Result<T, RaftError<C::NodeId, E>>
where
E: Debug,
{
let sum = if tracing::enabled!(Level::DEBUG) {
Some(mes.summary())
} else {
None
};

let send_res = self.tx_api.send(mes);

if send_res.is_err() {
let fatal = self.get_core_stopped_error("sending tx to RaftCore", sum).await;
return Err(RaftError::Fatal(fatal));
}

let recv_res = rx.await;
tracing::debug!("call_core receives result is error: {:?}", recv_res.is_err());

match recv_res {
Ok(x) => x.map_err(|e| RaftError::APIError(e)),
Err(_) => {
let fatal = self.get_core_stopped_error("receiving rx from RaftCore", sum).await;
tracing::error!(error = debug(&fatal), "core_call fatal error");
Err(RaftError::Fatal(fatal))
}
}
}

/// Send an [`ExternalCommand`] to RaftCore to execute in the `RaftCore` thread.
///
/// It returns at once.
Expand Down

0 comments on commit c805b29

Please sign in to comment.