Skip to content

Commit

Permalink
Change: remove N, LS from Raft<C, N, LS, _>
Browse files Browse the repository at this point in the history
- `Raft<C, ..>`: is a control handle of `RaftCore` and it does not directly
  rely on `N: RaftNetworkFactory` and `LS: RaftLogStorage`.
  Thus these two type should not be part of `Raft`.

  In this commit, we remove `N, LS` from `Raft<C, N, LS, SM>`,
  `RaftInner<C, N, LS>` and `RaftMsg<C, N, LS>`.
  Type `N, LS` now is only used by `Raft::new()` which needs these two
  types to create `RaftCore`.

- `Raft::external_request()`: Another change is the signature of the
  `Fn` passed to `Raft::external_request()` changes from
  `FnOnce(&RaftState, &mut LS, &mut N)` to `FnOnce(&RaftState)`.

- Fix: the `FnOnce` passed to `Raft::external_request()` should always
  be `Send`, unoptionally. Because it is used to send from `Raft` to
  `RaftCore`.

- Fix: #939
  • Loading branch information
drmingdrmer committed Nov 20, 2023
1 parent 4ad89fe commit a034382
Show file tree
Hide file tree
Showing 18 changed files with 95 additions and 121 deletions.
2 changes: 1 addition & 1 deletion cluster_benchmark/tests/benchmark/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::store::NodeId;
use crate::store::StateMachineStore;
use crate::store::TypeConfig as MemConfig;

pub type BenchRaft = Raft<MemConfig, Router, Arc<LogStore>, Arc<StateMachineStore>>;
pub type BenchRaft = Raft<MemConfig, Arc<StateMachineStore>>;

#[derive(Clone)]
pub struct Router {
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ openraft::declare_raft_types!(

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
pub type StateMachineStore = Adaptor<TypeConfig, Arc<Store>>;
pub type Raft = openraft::Raft<TypeConfig, Network, LogStore, StateMachineStore>;
pub type Raft = openraft::Raft<TypeConfig, StateMachineStore>;

pub mod typ {
use openraft::BasicNode;
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ openraft::declare_raft_types!(

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
pub type StateMachineStore = Adaptor<TypeConfig, Arc<Store>>;
pub type ExampleRaft = openraft::Raft<TypeConfig, Network, LogStore, StateMachineStore>;
pub type ExampleRaft = openraft::Raft<TypeConfig, StateMachineStore>;

type Server = tide::Server<Arc<App>>;

Expand Down
8 changes: 4 additions & 4 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ where
pub(crate) leader_data: Option<LeaderData<C>>,

#[allow(dead_code)]
pub(crate) tx_api: mpsc::UnboundedSender<RaftMsg<C, N, LS>>,
pub(crate) rx_api: mpsc::UnboundedReceiver<RaftMsg<C, N, LS>>,
pub(crate) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
pub(crate) rx_api: mpsc::UnboundedReceiver<RaftMsg<C>>,

/// A Sender to send callback by other components to [`RaftCore`], when an action is finished,
/// such as flushing log to disk, or applying log entries to state machine.
Expand Down Expand Up @@ -1063,7 +1063,7 @@ where

// TODO: Make this method non-async. It does not need to run any async command in it.
#[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(self.id)))]
pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg<C, N, LS>) {
pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg<C>) {
tracing::debug!("recv from rx_api: {}", msg.summary());

match msg {
Expand Down Expand Up @@ -1120,7 +1120,7 @@ where
self.change_membership(changes, retain, tx);
}
RaftMsg::ExternalRequest { req } => {
req(&self.engine.state, &mut self.log_store, &mut self.network);
req(&self.engine.state);
}
RaftMsg::ExternalCommand { cmd } => {
tracing::info!(cmd = debug(&cmd), "received RaftMsg::ExternalCommand: {}", func_name!());
Expand Down
32 changes: 6 additions & 26 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@ use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::BoxCoreFn;
use crate::raft::ClientWriteResponse;
use crate::raft::InstallSnapshotRequest;
use crate::raft::InstallSnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::storage::RaftLogStorage;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::MessageSummary;
use crate::RaftNetworkFactory;
use crate::RaftState;
use crate::RaftTypeConfig;

pub(crate) mod external_command;
Expand All @@ -44,11 +41,8 @@ pub(crate) type ClientWriteTx<C> =
/// A message sent by application to the [`RaftCore`].
///
/// [`RaftCore`]: crate::core::RaftCore
pub(crate) enum RaftMsg<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
pub(crate) enum RaftMsg<C>
where C: RaftTypeConfig
{
AppendEntries {
rpc: AppendEntriesRequest<C>,
Expand Down Expand Up @@ -89,31 +83,17 @@ where
tx: ResultSender<ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
},

#[allow(clippy::type_complexity)]
ExternalRequest {
#[cfg(not(feature = "singlethreaded"))]
req: Box<
dyn FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ Send
+ 'static,
>,
#[cfg(feature = "singlethreaded")]
req: Box<
dyn FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ 'static,
>,
req: BoxCoreFn<C>,
},

ExternalCommand {
cmd: ExternalCommand,
},
}

impl<C, N, LS> MessageSummary<RaftMsg<C, N, LS>> for RaftMsg<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
impl<C> MessageSummary<RaftMsg<C>> for RaftMsg<C>
where C: RaftTypeConfig
{
fn summary(&self) -> String {
match self {
Expand Down
9 changes: 9 additions & 0 deletions openraft/src/raft/external_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! Defines API for application to send request to access Raft core.

use crate::type_config::alias::InstantOf;
use crate::type_config::alias::NodeIdOf;
use crate::type_config::alias::NodeOf;
use crate::RaftState;

/// Boxed trait object for external request function run in `RaftCore` task.
pub(crate) type BoxCoreFn<C> = Box<dyn FnOnce(&RaftState<NodeIdOf<C>, NodeOf<C>, InstantOf<C>>) + Send + 'static>;
53 changes: 22 additions & 31 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
//! Public Raft interface and data types.

mod external_request;
mod message;
mod raft_inner;
mod runtime_config_handle;
mod trigger;

pub(crate) use self::external_request::BoxCoreFn;

pub(in crate::raft) mod core_state;

use std::fmt::Debug;
Expand Down Expand Up @@ -59,7 +62,6 @@ use crate::ChangeMembers;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::OptionalSend;
use crate::RaftState;
pub use crate::RaftTypeConfig;
use crate::StorageHelper;
Expand Down Expand Up @@ -124,22 +126,18 @@ macro_rules! declare_raft_types {
/// `shutdown` method should be called on this type to await the shutdown of the node. If the parent
/// application needs to shutdown the Raft node for any reason, calling `shutdown` will do the
/// trick.
pub struct Raft<C, N, LS, SM>
pub struct Raft<C, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
inner: Arc<RaftInner<C, N, LS>>,
inner: Arc<RaftInner<C>>,
_phantom: PhantomData<SM>,
}

impl<C, N, LS, SM> Clone for Raft<C, N, LS, SM>
impl<C, SM> Clone for Raft<C, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
fn clone(&self) -> Self {
Expand All @@ -159,11 +157,9 @@ where
//
// Notably, the state machine, log storage and network factory DO NOT have to be `Send`, those
// are only used within Raft task(s) on a single thread.
unsafe impl<C, N, LS, SM> Send for Raft<C, N, LS, SM>
unsafe impl<C, SM> Send for Raft<C, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
C::D: Send,
C::Entry: Send,
Expand All @@ -177,11 +173,9 @@ where
// SAFETY: Even for a single-threaded Raft, the API object is MT-capable.
//
// See above for details.
unsafe impl<C, N, LS, SM> Sync for Raft<C, N, LS, SM>
unsafe impl<C, SM> Sync for Raft<C, SM>
where
C: RaftTypeConfig + Send,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
C::D: Send,
C::Entry: Send,
Expand All @@ -191,11 +185,9 @@ where
{
}

impl<C, N, LS, SM> Raft<C, N, LS, SM>
impl<C, SM> Raft<C, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
/// Create and spawn a new Raft task.
Expand All @@ -218,13 +210,17 @@ where
/// An implementation of the `RaftStorage` trait which will be used by Raft for data storage.
/// See the docs on the `RaftStorage` trait for more details.
#[tracing::instrument(level="debug", skip_all, fields(cluster=%config.cluster_name))]
pub async fn new(
pub async fn new<LS, N>(
id: C::NodeId,
config: Arc<Config>,
network: N,
mut log_store: LS,
mut state_machine: SM,
) -> Result<Self, Fatal<C::NodeId>> {
) -> Result<Self, Fatal<C::NodeId>>
where
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
{
let (tx_api, rx_api) = mpsc::unbounded_channel();
let (tx_notify, rx_notify) = mpsc::unbounded_channel();
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
Expand Down Expand Up @@ -310,7 +306,7 @@ where
/// let raft = Raft::new(...).await?;
/// raft.runtime_config().heartbeat(true);
/// ```
pub fn runtime_config(&self) -> RuntimeConfigHandle<C, N, LS> {
pub fn runtime_config(&self) -> RuntimeConfigHandle<C> {
RuntimeConfigHandle::new(self.inner.as_ref())
}

Expand All @@ -337,7 +333,7 @@ where
/// let raft = Raft::new(...).await?;
/// raft.trigger().elect().await?;
/// ```
pub fn trigger(&self) -> Trigger<C, N, LS> {
pub fn trigger(&self) -> Trigger<C> {
Trigger::new(self.inner.as_ref())
}

Expand Down Expand Up @@ -694,7 +690,7 @@ where
#[tracing::instrument(level = "debug", skip(self, mes, rx))]
pub(crate) async fn call_core<T, E>(
&self,
mes: RaftMsg<C, N, LS>,
mes: RaftMsg<C>,
rx: oneshot::Receiver<Result<T, E>>,
) -> Result<T, RaftError<C::NodeId, E>>
where
Expand Down Expand Up @@ -738,15 +734,10 @@ where
///
/// If the API channel is already closed (Raft is in shutdown), then the request functor is
/// destroyed right away and not called at all.
pub fn external_request<
F: FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ OptionalSend
+ 'static,
>(
&self,
req: F,
) {
let _ignore_error = self.inner.tx_api.send(RaftMsg::ExternalRequest { req: Box::new(req) });
pub fn external_request<F>(&self, req: F)
where F: FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>) + Send + 'static {
let req: BoxCoreFn<C> = Box::new(req);
let _ignore_error = self.inner.tx_api.send(RaftMsg::ExternalRequest { req });
}

/// Get a handle to the metrics channel.
Expand Down
18 changes: 5 additions & 13 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,21 @@ use crate::core::raft_msg::RaftMsg;
use crate::core::TickHandle;
use crate::error::Fatal;
use crate::raft::core_state::CoreState;
use crate::storage::RaftLogStorage;
use crate::AsyncRuntime;
use crate::Config;
use crate::RaftMetrics;
use crate::RaftNetworkFactory;
use crate::RaftTypeConfig;

/// RaftInner is the internal handle and provides internally used APIs to communicate with
/// `RaftCore`.
pub(in crate::raft) struct RaftInner<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
pub(in crate::raft) struct RaftInner<C>
where C: RaftTypeConfig
{
pub(in crate::raft) id: C::NodeId,
pub(in crate::raft) config: Arc<Config>,
pub(in crate::raft) runtime_config: Arc<RuntimeConfig>,
pub(in crate::raft) tick_handle: TickHandle<C>,
pub(in crate::raft) tx_api: mpsc::UnboundedSender<RaftMsg<C, N, LS>>,
pub(in crate::raft) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
pub(in crate::raft) rx_metrics: watch::Receiver<RaftMetrics<C::NodeId, C::Node>>,

// TODO(xp): it does not need to be a async mutex.
Expand All @@ -40,11 +35,8 @@ where
pub(in crate::raft) core_state: Mutex<CoreState<C::NodeId, C::AsyncRuntime>>,
}

impl<C, N, LS> RaftInner<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
impl<C> RaftInner<C>
where C: RaftTypeConfig
{
/// Send an [`ExternalCommand`] to RaftCore to execute in the `RaftCore` thread.
///
Expand Down
20 changes: 6 additions & 14 deletions openraft/src/raft/runtime_config_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,22 @@
use std::sync::atomic::Ordering;

use crate::raft::RaftInner;
use crate::storage::RaftLogStorage;
use crate::RaftNetworkFactory;
use crate::RaftTypeConfig;

/// RuntimeConfigHandle is an interface to update runtime config.
///
/// These config are mainly designed for testing purpose and special use cases.
/// Usually you don't need to change runtime config.
pub struct RuntimeConfigHandle<'r, C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
pub struct RuntimeConfigHandle<'r, C>
where C: RaftTypeConfig
{
raft_inner: &'r RaftInner<C, N, LS>,
raft_inner: &'r RaftInner<C>,
}

impl<'r, C, N, LS> RuntimeConfigHandle<'r, C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
impl<'r, C> RuntimeConfigHandle<'r, C>
where C: RaftTypeConfig
{
pub(in crate::raft) fn new(raft_inner: &'r RaftInner<C, N, LS>) -> Self {
pub(in crate::raft) fn new(raft_inner: &'r RaftInner<C>) -> Self {
Self { raft_inner }
}

Expand Down
Loading

0 comments on commit a034382

Please sign in to comment.