Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Remove request_id from replication #1231

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 9 additions & 50 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::core::sm;
use crate::core::ServerState;
use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplayResultExt;
use crate::display_ext::DisplaySlice;
use crate::display_ext::DisplaySliceExt;
use crate::engine::Command;
Expand Down Expand Up @@ -68,7 +67,6 @@ use crate::network::RPCOption;
use crate::network::RPCTypes;
use crate::network::RaftNetworkFactory;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::progress::Progress;
use crate::quorum::QuorumSet;
use crate::raft::message::TransferLeaderRequest;
Expand All @@ -81,8 +79,6 @@ use crate::raft::VoteResponse;
use crate::raft_state::io_state::io_id::IOId;
use crate::raft_state::LogStateReader;
use crate::replication::request::Replicate;
use crate::replication::request_id::RequestId;
use crate::replication::response::ReplicationResult;
use crate::replication::ReplicationCore;
use crate::replication::ReplicationHandle;
use crate::replication::ReplicationSessionId;
Expand Down Expand Up @@ -1426,7 +1422,11 @@ where
// If vote or membership changes, ignore the message.
// There is chance delayed message reports a wrong state.
if self.does_replication_session_match(&progress.session_id, "ReplicationProgress") {
self.handle_replication_progress(progress.target, progress.request_id, progress.result);
tracing::debug!(progress = display(&progress), "recv Notification::ReplicationProgress");

// replication_handler() won't panic because:
// The leader is still valid because progress.session_id.leader_vote does not change.
self.engine.replication_handler().update_progress(progress.target, progress.result);
}
}

Expand All @@ -1442,9 +1442,9 @@ where
sending_time = display(sending_time.display()),
"HeartbeatProgress"
);
if self.engine.leader.is_some() {
self.engine.replication_handler().update_leader_clock(target, sending_time);
}
// replication_handler() won't panic because:
// The leader is still valid because progress.session_id.leader_vote does not change.
self.engine.replication_handler().update_leader_clock(target, sending_time);
}
}

Expand Down Expand Up @@ -1551,33 +1551,6 @@ where
self.engine.elect();
}

#[tracing::instrument(level = "debug", skip_all)]
fn handle_replication_progress(
&mut self,
target: C::NodeId,
request_id: u64,
result: Result<ReplicationResult<C>, String>,
) {
tracing::debug!(
target = display(target),
request_id = display(request_id),
result = display(result.display()),
"handle_replication_progress"
);

#[allow(clippy::collapsible_if)]
if tracing::enabled!(Level::DEBUG) {
if !self.replications.contains_key(&target) {
tracing::warn!("leader has removed target: {}", target);
};
}

// A leader may have stepped down.
if self.engine.leader.is_some() {
self.engine.replication_handler().update_progress(target, request_id, result);
}
}

/// If a message is sent by a previous server state but is received by current server state,
/// it is a stale message and should be just ignored.
fn does_vote_match(&self, sender_vote: &Vote<C::NodeId>, msg: impl fmt::Display) -> bool {
Expand Down Expand Up @@ -1790,21 +1763,7 @@ where
}
Command::Replicate { req, target } => {
let node = self.replications.get(&target).expect("replication to target node exists");

match req {
Inflight::None => {
let _ = node.tx_repl.send(Replicate::Heartbeat);
}
Inflight::Logs { id, log_id_range } => {
let _ = node.tx_repl.send(Replicate::logs(RequestId::new_append_entries(id), log_id_range));
}
Inflight::Snapshot { id, last_log_id } => {
// unwrap: The replication channel must not be dropped or it is a bug.
node.tx_repl.send(Replicate::snapshot(RequestId::new_snapshot(id), last_log_id)).map_err(
|_e| StorageError::read_snapshot(None, AnyError::error("replication channel closed")),
)?;
}
}
let _ = node.tx_repl.send(req);
}
Command::BroadcastTransferLeader { req } => self.broadcast_transfer_leader(req).await,

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/docs/protocol/snapshot_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Installing snapshot includes two steps:
The final step is to purge logs up to [`snapshot_meta.last_log_id`].
This step is necessary because:

- 1) A local log that is <= [`snapshot_meta.last_log_id`] may conflict with the leader, and can not be used anymore.
- 1) A local log that is `<=` [`snapshot_meta.last_log_id`] may conflict with the leader, and can not be used anymore.

- 2) There may be a hole in the logs, if `snapshot_last_log_id > local_last_log_id`:

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use crate::engine::CommandKind;
use crate::error::Infallible;
use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::progress::Inflight;
use crate::raft::message::TransferLeaderRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::InstallSnapshotResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_state::IOId;
use crate::replication::request::Replicate;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::OneshotSenderOf;
use crate::vote::CommittedVote;
Expand Down Expand Up @@ -96,7 +96,7 @@ where C: RaftTypeConfig
},

/// Replicate log entries or snapshot to a target.
Replicate { target: C::NodeId, req: Inflight<C> },
Replicate { target: C::NodeId, req: Replicate<C> },

/// Broadcast transfer Leader message to all other nodes.
BroadcastTransferLeader { req: TransferLeaderRequest<C> },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ use crate::engine::Command;
use crate::engine::Engine;
use crate::engine::ReplicationProgress;
use crate::entry::RaftEntry;
use crate::log_id_range::LogIdRange;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::replication::request::Replicate;
use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
Expand Down Expand Up @@ -153,11 +154,11 @@ fn test_leader_append_entries_normal() -> anyhow::Result<()> {
},
Command::Replicate {
target: 2,
req: Inflight::logs(None, Some(log_id(3, 1, 6))).with_id(1),
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6)))),
},
Command::Replicate {
target: 3,
req: Inflight::logs(None, Some(log_id(3, 1, 6))).with_id(1),
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6)))),
},
],
eng.output.take_commands()
Expand Down Expand Up @@ -285,7 +286,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> {
},
Command::Replicate {
target: 2,
req: Inflight::logs(None, Some(log_id(3, 1, 6))).with_id(1),
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))))
},
],
eng.output.take_commands()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,32 +155,27 @@ fn test_leader_append_membership_update_learner_process() -> anyhow::Result<()>
if let Some(l) = &mut eng.leader.as_mut() {
assert_eq!(
&ProgressEntry::new(Some(log_id(1, 1, 4)))
.with_inflight(Inflight::logs(Some(log_id(1, 1, 4)), Some(log_id(5, 1, 10))).with_id(1))
.with_curr_inflight_id(1),
.with_inflight(Inflight::logs(Some(log_id(1, 1, 4)), Some(log_id(5, 1, 10)))),
l.progress.get(&4),
"learner-4 progress should be transferred to voter progress"
);

assert_eq!(
&ProgressEntry::new(Some(log_id(1, 1, 3)))
.with_inflight(Inflight::logs(Some(log_id(1, 1, 3)), Some(log_id(5, 1, 10))).with_id(1))
.with_curr_inflight_id(1),
.with_inflight(Inflight::logs(Some(log_id(1, 1, 3)), Some(log_id(5, 1, 10)))),
l.progress.get(&3),
"voter-3 progress should be transferred to learner progress"
);

assert_eq!(
&ProgressEntry::new(Some(log_id(1, 1, 5)))
.with_inflight(Inflight::logs(Some(log_id(1, 1, 5)), Some(log_id(5, 1, 10))).with_id(1))
.with_curr_inflight_id(1),
.with_inflight(Inflight::logs(Some(log_id(1, 1, 5)), Some(log_id(5, 1, 10)))),
l.progress.get(&5),
"learner-5 has previous value"
);

assert_eq!(
&ProgressEntry::empty(11)
.with_inflight(Inflight::logs(None, Some(log_id(5, 1, 10))).with_id(1))
.with_curr_inflight_id(1),
&ProgressEntry::empty(11).with_inflight(Inflight::logs(None, Some(log_id(5, 1, 10)))),
l.progress.get(&6)
);
} else {
Expand Down
63 changes: 16 additions & 47 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::progress::Progress;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::raft_state::LogStateReader;
use crate::replication::request::Replicate;
use crate::replication::response::ReplicationResult;
use crate::type_config::alias::InstantOf;
use crate::EffectiveMembership;
Expand Down Expand Up @@ -145,10 +146,9 @@ where C: RaftTypeConfig
/// Update progress when replicated data(logs or snapshot) matches on follower/learner and is
/// accepted.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_matching(&mut self, node_id: C::NodeId, inflight_id: u64, log_id: Option<LogId<C::NodeId>>) {
pub(crate) fn update_matching(&mut self, node_id: C::NodeId, log_id: Option<LogId<C::NodeId>>) {
tracing::debug!(
node_id = display(node_id),
inflight_id = display(inflight_id),
log_id = display(log_id.display()),
"{}",
func_name!()
Expand All @@ -161,13 +161,7 @@ where C: RaftTypeConfig
let quorum_accepted = *self
.leader
.progress
.update_with(&node_id, |prog_entry| {
let res = prog_entry.update_matching(inflight_id, log_id);
if let Err(e) = &res {
tracing::error!(error = display(e), "update_matching");
panic!("update_matching error: {}", e);
}
})
.update_with(&node_id, |prog_entry| prog_entry.update_matching(log_id))
.expect("it should always update existing progress");

tracing::debug!(
Expand Down Expand Up @@ -213,33 +207,19 @@ where C: RaftTypeConfig
/// Update progress when replicated data(logs or snapshot) does not match follower/learner state
/// and is rejected.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_conflicting(&mut self, target: C::NodeId, inflight_id: u64, conflict: LogId<C::NodeId>) {
pub(crate) fn update_conflicting(&mut self, target: C::NodeId, conflict: LogId<C::NodeId>) {
// TODO(2): test it?

let prog_entry = self.leader.progress.get_mut(&target).unwrap();

debug_assert_eq!(
prog_entry.inflight.get_id(),
Some(inflight_id),
"inflight({:?}) id should match: {}",
prog_entry.inflight,
inflight_id
);

prog_entry.update_conflicting(inflight_id, conflict.index).unwrap();
prog_entry.update_conflicting(conflict.index);
}

/// Update replication progress when a response is received.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_progress(
&mut self,
target: C::NodeId,
request_id: u64,
repl_res: Result<ReplicationResult<C>, String>,
) {
pub(crate) fn update_progress(&mut self, target: C::NodeId, repl_res: Result<ReplicationResult<C>, String>) {
tracing::debug!(
target = display(target),
request_id = display(request_id),
result = display(repl_res.display()),
progress = display(&self.leader.progress),
"{}",
Expand All @@ -249,29 +229,17 @@ where C: RaftTypeConfig
match repl_res {
Ok(p) => match p.0 {
Ok(matching) => {
self.update_matching(target, request_id, matching);
self.update_matching(target, matching);
}
Err(conflict) => {
self.update_conflicting(target, request_id, conflict);
self.update_conflicting(target, conflict);
}
},
Err(err_str) => {
tracing::warn!(
request_id = display(request_id),
result = display(&err_str),
"update progress error"
);
tracing::warn!(result = display(&err_str), "update progress error");

// Reset inflight state and it will retry.
let p = self.leader.progress.get_mut(&target).unwrap();

debug_assert!(
p.inflight.is_my_id(request_id),
"inflight({:?}) id should match: {}",
p.inflight,
request_id
);

p.inflight = Inflight::None;
}
};
Expand Down Expand Up @@ -328,10 +296,12 @@ where C: RaftTypeConfig

#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn send_to_target(output: &mut EngineOutput<C>, target: &C::NodeId, inflight: &Inflight<C>) {
output.push_command(Command::Replicate {
target: *target,
req: *inflight,
});
let req = match inflight {
Inflight::None => unreachable!("no data to send"),
Inflight::Logs { log_id_range } => Replicate::logs(*log_id_range),
Inflight::Snapshot { last_log_id } => Replicate::snapshot(*last_log_id),
};
output.push_command(Command::Replicate { target: *target, req });
}

/// Try to run a pending purge job, if no tasks are using the logs to be purged.
Expand Down Expand Up @@ -403,8 +373,7 @@ where C: RaftTypeConfig
// TODO: It should be self.state.last_log_id() but None is ok.
prog_entry.inflight = Inflight::logs(None, upto);

let inflight_id = prog_entry.inflight.get_id().unwrap();
self.update_matching(id, inflight_id, upto);
self.update_matching(id, upto);
}
}

Expand Down
Loading
Loading