Skip to content

Commit

Permalink
Refactor: move snapshot receiving out of state machine worker
Browse files Browse the repository at this point in the history
Handling snapshot receiving is moved out of state-machine worker task.
Now it is in implemented outside the `RaftCore`.
Receiving snapshot could be totally application specific and should not
be part of Openraft.

The in sm-worker snapshot receiving is removed.

- Part of databendlabs#606
  • Loading branch information
drmingdrmer committed Feb 15, 2024
1 parent c805b29 commit 4170ad6
Show file tree
Hide file tree
Showing 18 changed files with 364 additions and 611 deletions.
30 changes: 2 additions & 28 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::raft_msg::ClientReadTx;
use crate::core::raft_msg::ClientWriteTx;
use crate::core::raft_msg::InstallSnapshotTx;
use crate::core::raft_msg::RaftMsg;
use crate::core::raft_msg::ResultSender;
use crate::core::raft_msg::VoteTx;
Expand Down Expand Up @@ -69,7 +68,6 @@ use crate::quorum::QuorumSet;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::InstallSnapshotRequest;
use crate::raft::VoteRequest;
use crate::raft_state::LogStateReader;
use crate::replication;
Expand Down Expand Up @@ -607,21 +605,6 @@ where
});
}

/// Invoked by leader to send chunks of a snapshot to a follower.
///
/// Leaders always send chunks in order. It is important to note that, according to the Raft
/// spec, a node may only have one snapshot at any time. As snapshot contents are application
/// specific.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_install_snapshot_request(
&mut self,
req: InstallSnapshotRequest<C>,
tx: InstallSnapshotTx<C::NodeId>,
) {
tracing::info!(req = display(req.summary()), "{}", func_name!());
self.engine.handle_install_snapshot(req, tx);
}

/// Trigger a snapshot building(log compaction) job if there is no pending building job.
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) fn trigger_snapshot(&mut self) {
Expand Down Expand Up @@ -1125,14 +1108,8 @@ where

self.handle_vote_request(rpc, tx);
}
RaftMsg::InstallSnapshot { rpc, tx } => {
tracing::info!(
req = display(rpc.summary()),
"received RaftMst::InstallSnapshot: {}",
func_name!()
);

self.handle_install_snapshot_request(rpc, tx);
RaftMsg::BeginReceiveSnapshot { vote, tx } => {
self.engine.handle_begin_receiving_snapshot(vote, tx);
}
RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx } => {
self.engine.handle_install_complete_snapshot(vote, snapshot, tx);
Expand Down Expand Up @@ -1375,9 +1352,6 @@ where
let st = self.engine.state.io_state_mut();
st.update_snapshot(last_log_id);
}
sm::Response::ReceiveSnapshotChunk(_) => {
tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!());
}
sm::Response::InstallSnapshot(meta) => {
tracing::info!(
"sm::StateMachine command done: InstallSnapshot: {}: {}",
Expand Down
26 changes: 14 additions & 12 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ use tokio::sync::oneshot;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
use crate::error::HigherVote;
use crate::error::Infallible;
use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::BoxCoreFn;
use crate::raft::ClientWriteResponse;
use crate::raft::InstallSnapshotRequest;
use crate::raft::InstallSnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::NodeIdOf;
use crate::type_config::alias::NodeOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::ChangeMembers;
use crate::MessageSummary;
use crate::RaftTypeConfig;
Expand All @@ -32,9 +32,6 @@ pub(crate) type ResultSender<T, E = Infallible> = oneshot::Sender<Result<T, E>>;

pub(crate) type ResultReceiver<T, E = Infallible> = oneshot::Receiver<Result<T, E>>;

/// TX for Install Snapshot Response
pub(crate) type InstallSnapshotTx<NID> = ResultSender<InstallSnapshotResponse<NID>, InstallSnapshotError>;

/// TX for Vote Response
pub(crate) type VoteTx<NID> = ResultSender<VoteResponse<NID>>;

Expand Down Expand Up @@ -64,17 +61,22 @@ where C: RaftTypeConfig
tx: VoteTx<C::NodeId>,
},

InstallSnapshot {
rpc: InstallSnapshotRequest<C>,
tx: InstallSnapshotTx<C::NodeId>,
},

InstallCompleteSnapshot {
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
tx: ResultSender<InstallSnapshotResponse<C::NodeId>>,
},

/// Begin receiving a snapshot from the leader.
///
/// Returns a handle to a snapshot data ready for receiving if successful.
/// Otherwise, it is an error because of the `vote` is not GE the local `vote`, the local `vote`
/// will be returned in a Err
BeginReceiveSnapshot {
vote: Vote<C::NodeId>,
tx: ResultSender<Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
},

ClientWriteRequest {
app_data: C::D,
tx: ClientWriteTx<C>,
Expand Down Expand Up @@ -119,8 +121,8 @@ where C: RaftTypeConfig
RaftMsg::RequestVote { rpc, .. } => {
format!("RequestVote: {}", rpc.summary())
}
RaftMsg::InstallSnapshot { rpc, .. } => {
format!("InstallSnapshot: {}", rpc.summary())
RaftMsg::BeginReceiveSnapshot { vote, .. } => {
format!("BeginReceiveSnapshot: vote: {}", vote)
}
RaftMsg::InstallCompleteSnapshot { vote, snapshot, .. } => {
format!("InstallCompleteSnapshot: vote: {}, snapshot: {}", vote, snapshot)
Expand Down
69 changes: 13 additions & 56 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ use std::fmt::Formatter;

use crate::core::raft_msg::ResultSender;
use crate::display_ext::DisplaySlice;
use crate::error::HigherVote;
use crate::log_id::RaftLogId;
use crate::raft::InstallSnapshotRequest;
use crate::MessageSummary;
use crate::type_config::alias::SnapshotDataOf;
use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::SnapshotMeta;

#[derive(PartialEq)]
pub(crate) struct Command<C>
Expand Down Expand Up @@ -60,17 +59,8 @@ where C: RaftTypeConfig
Command::new(payload)
}

pub(crate) fn receive(req: InstallSnapshotRequest<C>) -> Self {
let payload = CommandPayload::ReceiveSnapshotChunk { req };
Command::new(payload)
}

// TODO: all sm command should have a command seq.
pub(crate) fn install_snapshot(snapshot_meta: SnapshotMeta<C::NodeId, C::Node>) -> Self {
let payload = CommandPayload::FinalizeSnapshot {
install: true,
snapshot_meta,
};
pub(crate) fn begin_receiving_snapshot(tx: ResultSender<Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>) -> Self {
let payload = CommandPayload::BeginReceivingSnapshot { tx };
Command::new(payload)
}

Expand All @@ -79,14 +69,6 @@ where C: RaftTypeConfig
Command::new(payload)
}

pub(crate) fn cancel_snapshot(snapshot_meta: SnapshotMeta<C::NodeId, C::Node>) -> Self {
let payload = CommandPayload::FinalizeSnapshot {
install: false,
snapshot_meta,
};
Command::new(payload)
}

pub(crate) fn apply(entries: Vec<C::Entry>) -> Self {
let payload = CommandPayload::Apply { entries };
Command::new(payload)
Expand All @@ -112,21 +94,8 @@ where C: RaftTypeConfig
tx: ResultSender<Option<Snapshot<C>>>,
},

/// Receive a chunk of snapshot.
///
/// If it is the final chunk, the snapshot stream will be closed and saved.
///
/// Installing a snapshot includes two steps: ReceiveSnapshotChunk and FinalizeSnapshot.
ReceiveSnapshotChunk {
req: InstallSnapshotRequest<C>,
},

/// After receiving all chunks, finalize the snapshot by installing it or discarding it,
/// if the snapshot is stale(the snapshot last log id is smaller than the local committed).
FinalizeSnapshot {
/// To install it, or just discard it.
install: bool,
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
BeginReceivingSnapshot {
tx: ResultSender<Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
},

InstallCompleteSnapshot {
Expand All @@ -146,15 +115,12 @@ where C: RaftTypeConfig
match self {
CommandPayload::BuildSnapshot => write!(f, "BuildSnapshot"),
CommandPayload::GetSnapshot { .. } => write!(f, "GetSnapshot"),
CommandPayload::ReceiveSnapshotChunk { req, .. } => {
write!(f, "ReceiveSnapshotChunk: {}", req.summary())
}
CommandPayload::FinalizeSnapshot { install, snapshot_meta } => {
write!(f, "FinalizeSnapshot: install:{} {:?}", install, snapshot_meta)
}
CommandPayload::InstallCompleteSnapshot { snapshot } => {
write!(f, "InstallCompleteSnapshot: meta: {:?}", snapshot.meta)
}
CommandPayload::BeginReceivingSnapshot { .. } => {
write!(f, "BeginReceivingSnapshot")
}
CommandPayload::Apply { entries } => write!(f, "Apply: {}", DisplaySlice::<_>(entries)),
}
}
Expand All @@ -168,20 +134,11 @@ where C: RaftTypeConfig
match (self, other) {
(CommandPayload::BuildSnapshot, CommandPayload::BuildSnapshot) => true,
(CommandPayload::GetSnapshot { .. }, CommandPayload::GetSnapshot { .. }) => true,
(CommandPayload::BeginReceivingSnapshot { .. }, CommandPayload::BeginReceivingSnapshot { .. }) => true,
(
CommandPayload::ReceiveSnapshotChunk { req: req1, .. },
CommandPayload::ReceiveSnapshotChunk { req: req2, .. },
) => req1 == req2,
(
CommandPayload::FinalizeSnapshot {
install: install1,
snapshot_meta: meta1,
},
CommandPayload::FinalizeSnapshot {
install: install2,
snapshot_meta: meta2,
},
) => install1 == install2 && meta1 == meta2,
CommandPayload::InstallCompleteSnapshot { snapshot: s1 },
CommandPayload::InstallCompleteSnapshot { snapshot: s2 },
) => s1.meta == s2.meta,
(CommandPayload::Apply { entries: entries1 }, CommandPayload::Apply { entries: entries2 }) => {
// Entry may not be `Eq`, we just compare log id.
// This would be enough for testing.
Expand Down
Loading

0 comments on commit 4170ad6

Please sign in to comment.