From 51074394a56fe11dd3cffc95617c99e24844e2fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 13 Aug 2024 10:36:03 +0800 Subject: [PATCH] Refactor: Remove `request_id` from replication The `request_id` was used to track request/response pairs between `RaftCore` and `ReplicationCore` in a one-request, one-response mode. To support streaming replication, which requires a one-request, many-response mode, `request_id` has been removed in this commit. --- openraft/src/core/raft_core.rs | 59 ++------- .../src/docs/protocol/snapshot_replication.md | 2 +- openraft/src/engine/command.rs | 4 +- .../leader_handler/append_entries_test.rs | 9 +- .../append_membership_test.rs | 13 +- .../engine/handler/replication_handler/mod.rs | 63 +++------ .../update_matching_test.rs | 19 ++- .../vote_handler/become_leader_test.rs | 5 +- .../src/engine/tests/handle_vote_resp_test.rs | 5 +- openraft/src/engine/tests/startup_test.rs | 8 +- openraft/src/progress/entry/mod.rs | 41 +----- openraft/src/progress/entry/tests.rs | 30 ++--- openraft/src/progress/inflight/mod.rs | 107 ++------------- openraft/src/progress/inflight/tests.rs | 63 ++------- openraft/src/replication/callbacks.rs | 1 + openraft/src/replication/mod.rs | 125 +++++++----------- openraft/src/replication/request.rs | 109 ++++----------- openraft/src/replication/request_id.rs | 43 ------ openraft/src/replication/response.rs | 6 +- 19 files changed, 180 insertions(+), 532 deletions(-) delete mode 100644 openraft/src/replication/request_id.rs diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index a6ee487ab..369ce554c 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -37,7 +37,6 @@ use crate::core::sm; use crate::core::ServerState; use crate::display_ext::DisplayInstantExt; use crate::display_ext::DisplayOptionExt; -use crate::display_ext::DisplayResultExt; use crate::display_ext::DisplaySlice; use crate::display_ext::DisplaySliceExt; use crate::engine::Command; @@ -68,7 +67,6 @@ use crate::network::RPCOption; use crate::network::RPCTypes; use crate::network::RaftNetworkFactory; use crate::progress::entry::ProgressEntry; -use crate::progress::Inflight; use crate::progress::Progress; use crate::quorum::QuorumSet; use crate::raft::message::TransferLeaderRequest; @@ -81,8 +79,6 @@ use crate::raft::VoteResponse; use crate::raft_state::io_state::io_id::IOId; use crate::raft_state::LogStateReader; use crate::replication::request::Replicate; -use crate::replication::request_id::RequestId; -use crate::replication::response::ReplicationResult; use crate::replication::ReplicationCore; use crate::replication::ReplicationHandle; use crate::replication::ReplicationSessionId; @@ -1426,7 +1422,11 @@ where // If vote or membership changes, ignore the message. // There is chance delayed message reports a wrong state. if self.does_replication_session_match(&progress.session_id, "ReplicationProgress") { - self.handle_replication_progress(progress.target, progress.request_id, progress.result); + tracing::debug!(progress = display(&progress), "recv Notification::ReplicationProgress"); + + // replication_handler() won't panic because: + // The leader is still valid because progress.session_id.leader_vote does not change. + self.engine.replication_handler().update_progress(progress.target, progress.result); } } @@ -1442,9 +1442,9 @@ where sending_time = display(sending_time.display()), "HeartbeatProgress" ); - if self.engine.leader.is_some() { - self.engine.replication_handler().update_leader_clock(target, sending_time); - } + // replication_handler() won't panic because: + // The leader is still valid because progress.session_id.leader_vote does not change. + self.engine.replication_handler().update_leader_clock(target, sending_time); } } @@ -1551,33 +1551,6 @@ where self.engine.elect(); } - #[tracing::instrument(level = "debug", skip_all)] - fn handle_replication_progress( - &mut self, - target: C::NodeId, - request_id: u64, - result: Result, String>, - ) { - tracing::debug!( - target = display(target), - request_id = display(request_id), - result = display(result.display()), - "handle_replication_progress" - ); - - #[allow(clippy::collapsible_if)] - if tracing::enabled!(Level::DEBUG) { - if !self.replications.contains_key(&target) { - tracing::warn!("leader has removed target: {}", target); - }; - } - - // A leader may have stepped down. - if self.engine.leader.is_some() { - self.engine.replication_handler().update_progress(target, request_id, result); - } - } - /// If a message is sent by a previous server state but is received by current server state, /// it is a stale message and should be just ignored. fn does_vote_match(&self, sender_vote: &Vote, msg: impl fmt::Display) -> bool { @@ -1790,21 +1763,7 @@ where } Command::Replicate { req, target } => { let node = self.replications.get(&target).expect("replication to target node exists"); - - match req { - Inflight::None => { - let _ = node.tx_repl.send(Replicate::Heartbeat); - } - Inflight::Logs { id, log_id_range } => { - let _ = node.tx_repl.send(Replicate::logs(RequestId::new_append_entries(id), log_id_range)); - } - Inflight::Snapshot { id, last_log_id } => { - // unwrap: The replication channel must not be dropped or it is a bug. - node.tx_repl.send(Replicate::snapshot(RequestId::new_snapshot(id), last_log_id)).map_err( - |_e| StorageError::read_snapshot(None, AnyError::error("replication channel closed")), - )?; - } - } + let _ = node.tx_repl.send(req); } Command::BroadcastTransferLeader { req } => self.broadcast_transfer_leader(req).await, diff --git a/openraft/src/docs/protocol/snapshot_replication.md b/openraft/src/docs/protocol/snapshot_replication.md index 40b169b7e..0a8637564 100644 --- a/openraft/src/docs/protocol/snapshot_replication.md +++ b/openraft/src/docs/protocol/snapshot_replication.md @@ -112,7 +112,7 @@ Installing snapshot includes two steps: The final step is to purge logs up to [`snapshot_meta.last_log_id`]. This step is necessary because: -- 1) A local log that is <= [`snapshot_meta.last_log_id`] may conflict with the leader, and can not be used anymore. +- 1) A local log that is `<=` [`snapshot_meta.last_log_id`] may conflict with the leader, and can not be used anymore. - 2) There may be a hole in the logs, if `snapshot_last_log_id > local_last_log_id`: diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index fd962573f..a7f234c4c 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -11,7 +11,6 @@ use crate::engine::CommandKind; use crate::error::Infallible; use crate::error::InitializeError; use crate::error::InstallSnapshotError; -use crate::progress::Inflight; use crate::raft::message::TransferLeaderRequest; use crate::raft::AppendEntriesResponse; use crate::raft::InstallSnapshotResponse; @@ -19,6 +18,7 @@ use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::raft_state::IOId; +use crate::replication::request::Replicate; use crate::replication::ReplicationSessionId; use crate::type_config::alias::OneshotSenderOf; use crate::vote::CommittedVote; @@ -96,7 +96,7 @@ where C: RaftTypeConfig }, /// Replicate log entries or snapshot to a target. - Replicate { target: C::NodeId, req: Inflight }, + Replicate { target: C::NodeId, req: Replicate }, /// Broadcast transfer Leader message to all other nodes. BroadcastTransferLeader { req: TransferLeaderRequest }, diff --git a/openraft/src/engine/handler/leader_handler/append_entries_test.rs b/openraft/src/engine/handler/leader_handler/append_entries_test.rs index 7eacf45d6..a8acd5a6f 100644 --- a/openraft/src/engine/handler/leader_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/leader_handler/append_entries_test.rs @@ -14,10 +14,11 @@ use crate::engine::Command; use crate::engine::Engine; use crate::engine::ReplicationProgress; use crate::entry::RaftEntry; +use crate::log_id_range::LogIdRange; use crate::progress::entry::ProgressEntry; -use crate::progress::Inflight; use crate::raft_state::IOId; use crate::raft_state::LogStateReader; +use crate::replication::request::Replicate; use crate::testing::blank_ent; use crate::testing::log_id; use crate::type_config::TypeConfigExt; @@ -153,11 +154,11 @@ fn test_leader_append_entries_normal() -> anyhow::Result<()> { }, Command::Replicate { target: 2, - req: Inflight::logs(None, Some(log_id(3, 1, 6))).with_id(1), + req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6)))), }, Command::Replicate { target: 3, - req: Inflight::logs(None, Some(log_id(3, 1, 6))).with_id(1), + req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6)))), }, ], eng.output.take_commands() @@ -285,7 +286,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> { }, Command::Replicate { target: 2, - req: Inflight::logs(None, Some(log_id(3, 1, 6))).with_id(1), + req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6)))) }, ], eng.output.take_commands() diff --git a/openraft/src/engine/handler/replication_handler/append_membership_test.rs b/openraft/src/engine/handler/replication_handler/append_membership_test.rs index ee0497663..af6ffd182 100644 --- a/openraft/src/engine/handler/replication_handler/append_membership_test.rs +++ b/openraft/src/engine/handler/replication_handler/append_membership_test.rs @@ -155,32 +155,27 @@ fn test_leader_append_membership_update_learner_process() -> anyhow::Result<()> if let Some(l) = &mut eng.leader.as_mut() { assert_eq!( &ProgressEntry::new(Some(log_id(1, 1, 4))) - .with_inflight(Inflight::logs(Some(log_id(1, 1, 4)), Some(log_id(5, 1, 10))).with_id(1)) - .with_curr_inflight_id(1), + .with_inflight(Inflight::logs(Some(log_id(1, 1, 4)), Some(log_id(5, 1, 10)))), l.progress.get(&4), "learner-4 progress should be transferred to voter progress" ); assert_eq!( &ProgressEntry::new(Some(log_id(1, 1, 3))) - .with_inflight(Inflight::logs(Some(log_id(1, 1, 3)), Some(log_id(5, 1, 10))).with_id(1)) - .with_curr_inflight_id(1), + .with_inflight(Inflight::logs(Some(log_id(1, 1, 3)), Some(log_id(5, 1, 10)))), l.progress.get(&3), "voter-3 progress should be transferred to learner progress" ); assert_eq!( &ProgressEntry::new(Some(log_id(1, 1, 5))) - .with_inflight(Inflight::logs(Some(log_id(1, 1, 5)), Some(log_id(5, 1, 10))).with_id(1)) - .with_curr_inflight_id(1), + .with_inflight(Inflight::logs(Some(log_id(1, 1, 5)), Some(log_id(5, 1, 10)))), l.progress.get(&5), "learner-5 has previous value" ); assert_eq!( - &ProgressEntry::empty(11) - .with_inflight(Inflight::logs(None, Some(log_id(5, 1, 10))).with_id(1)) - .with_curr_inflight_id(1), + &ProgressEntry::empty(11).with_inflight(Inflight::logs(None, Some(log_id(5, 1, 10)))), l.progress.get(&6) ); } else { diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index dbd1b07b5..bb438a7a3 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -13,6 +13,7 @@ use crate::progress::Progress; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::raft_state::LogStateReader; +use crate::replication::request::Replicate; use crate::replication::response::ReplicationResult; use crate::type_config::alias::InstantOf; use crate::EffectiveMembership; @@ -145,10 +146,9 @@ where C: RaftTypeConfig /// Update progress when replicated data(logs or snapshot) matches on follower/learner and is /// accepted. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_matching(&mut self, node_id: C::NodeId, inflight_id: u64, log_id: Option>) { + pub(crate) fn update_matching(&mut self, node_id: C::NodeId, log_id: Option>) { tracing::debug!( node_id = display(node_id), - inflight_id = display(inflight_id), log_id = display(log_id.display()), "{}", func_name!() @@ -161,13 +161,7 @@ where C: RaftTypeConfig let quorum_accepted = *self .leader .progress - .update_with(&node_id, |prog_entry| { - let res = prog_entry.update_matching(inflight_id, log_id); - if let Err(e) = &res { - tracing::error!(error = display(e), "update_matching"); - panic!("update_matching error: {}", e); - } - }) + .update_with(&node_id, |prog_entry| prog_entry.update_matching(log_id)) .expect("it should always update existing progress"); tracing::debug!( @@ -213,33 +207,19 @@ where C: RaftTypeConfig /// Update progress when replicated data(logs or snapshot) does not match follower/learner state /// and is rejected. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_conflicting(&mut self, target: C::NodeId, inflight_id: u64, conflict: LogId) { + pub(crate) fn update_conflicting(&mut self, target: C::NodeId, conflict: LogId) { // TODO(2): test it? let prog_entry = self.leader.progress.get_mut(&target).unwrap(); - debug_assert_eq!( - prog_entry.inflight.get_id(), - Some(inflight_id), - "inflight({:?}) id should match: {}", - prog_entry.inflight, - inflight_id - ); - - prog_entry.update_conflicting(inflight_id, conflict.index).unwrap(); + prog_entry.update_conflicting(conflict.index); } /// Update replication progress when a response is received. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_progress( - &mut self, - target: C::NodeId, - request_id: u64, - repl_res: Result, String>, - ) { + pub(crate) fn update_progress(&mut self, target: C::NodeId, repl_res: Result, String>) { tracing::debug!( target = display(target), - request_id = display(request_id), result = display(repl_res.display()), progress = display(&self.leader.progress), "{}", @@ -249,29 +229,17 @@ where C: RaftTypeConfig match repl_res { Ok(p) => match p.0 { Ok(matching) => { - self.update_matching(target, request_id, matching); + self.update_matching(target, matching); } Err(conflict) => { - self.update_conflicting(target, request_id, conflict); + self.update_conflicting(target, conflict); } }, Err(err_str) => { - tracing::warn!( - request_id = display(request_id), - result = display(&err_str), - "update progress error" - ); + tracing::warn!(result = display(&err_str), "update progress error"); // Reset inflight state and it will retry. let p = self.leader.progress.get_mut(&target).unwrap(); - - debug_assert!( - p.inflight.is_my_id(request_id), - "inflight({:?}) id should match: {}", - p.inflight, - request_id - ); - p.inflight = Inflight::None; } }; @@ -328,10 +296,12 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn send_to_target(output: &mut EngineOutput, target: &C::NodeId, inflight: &Inflight) { - output.push_command(Command::Replicate { - target: *target, - req: *inflight, - }); + let req = match inflight { + Inflight::None => unreachable!("no data to send"), + Inflight::Logs { log_id_range } => Replicate::logs(*log_id_range), + Inflight::Snapshot { last_log_id } => Replicate::snapshot(*last_log_id), + }; + output.push_command(Command::Replicate { target: *target, req }); } /// Try to run a pending purge job, if no tasks are using the logs to be purged. @@ -403,8 +373,7 @@ where C: RaftTypeConfig // TODO: It should be self.state.last_log_id() but None is ok. prog_entry.inflight = Inflight::logs(None, upto); - let inflight_id = prog_entry.inflight.get_id().unwrap(); - self.update_matching(id, inflight_id, upto); + self.update_matching(id, upto); } } diff --git a/openraft/src/engine/handler/replication_handler/update_matching_test.rs b/openraft/src/engine/handler/replication_handler/update_matching_test.rs index 8efb64989..7e543ae1b 100644 --- a/openraft/src/engine/handler/replication_handler/update_matching_test.rs +++ b/openraft/src/engine/handler/replication_handler/update_matching_test.rs @@ -50,7 +50,7 @@ fn test_update_matching_no_leader() -> anyhow::Result<()> { let res = std::panic::catch_unwind(move || { let mut eng = eng(); - eng.replication_handler().update_matching(3, 0, Some(log_id(1, 1, 2))); + eng.replication_handler().update_matching(3, Some(log_id(1, 1, 2))); }); tracing::info!("res: {:?}", res); assert!(res.is_err()); @@ -65,25 +65,22 @@ fn test_update_matching() -> anyhow::Result<()> { eng.output.take_commands(); let mut rh = eng.replication_handler(); - let inflight_id_1 = { + { let prog_entry = rh.leader.progress.get_mut(&1).unwrap(); prog_entry.inflight = Inflight::logs(Some(log_id(2, 1, 3)), Some(log_id(2, 1, 4))); - prog_entry.inflight.get_id().unwrap() }; - let inflight_id_2 = { + { let prog_entry = rh.leader.progress.get_mut(&2).unwrap(); prog_entry.inflight = Inflight::logs(Some(log_id(1, 1, 0)), Some(log_id(2, 1, 4))); - prog_entry.inflight.get_id().unwrap() }; - let inflight_id_3 = { + { let prog_entry = rh.leader.progress.get_mut(&3).unwrap(); prog_entry.inflight = Inflight::logs(Some(log_id(1, 1, 1)), Some(log_id(2, 1, 4))); - prog_entry.inflight.get_id().unwrap() }; // progress: None, None, (1,2) { - rh.update_matching(3, inflight_id_3, Some(log_id(1, 1, 2))); + rh.update_matching(3, Some(log_id(1, 1, 2))); assert_eq!(None, rh.state.committed()); assert_eq!(0, rh.output.take_commands().len()); } @@ -91,7 +88,7 @@ fn test_update_matching() -> anyhow::Result<()> { // progress: None, (2,1), (1,2); quorum-ed: (1,2), not at leader vote, not committed { rh.output.clear_commands(); - rh.update_matching(2, inflight_id_2, Some(log_id(2, 1, 1))); + rh.update_matching(2, Some(log_id(2, 1, 1))); assert_eq!(None, rh.state.committed()); assert_eq!(0, rh.output.take_commands().len()); } @@ -99,7 +96,7 @@ fn test_update_matching() -> anyhow::Result<()> { // progress: None, (2,1), (2,3); committed: (2,1) { rh.output.clear_commands(); - rh.update_matching(3, inflight_id_3, Some(log_id(2, 1, 3))); + rh.update_matching(3, Some(log_id(2, 1, 3))); assert_eq!(Some(&log_id(2, 1, 1)), rh.state.committed()); assert_eq!( vec![ @@ -121,7 +118,7 @@ fn test_update_matching() -> anyhow::Result<()> { // progress: (2,4), (2,1), (2,3); committed: (1,3) { rh.output.clear_commands(); - rh.update_matching(1, inflight_id_1, Some(log_id(2, 1, 4))); + rh.update_matching(1, Some(log_id(2, 1, 4))); assert_eq!(Some(&log_id(2, 1, 3)), rh.state.committed()); assert_eq!( vec![ diff --git a/openraft/src/engine/handler/vote_handler/become_leader_test.rs b/openraft/src/engine/handler/vote_handler/become_leader_test.rs index fe6968e11..684a266a2 100644 --- a/openraft/src/engine/handler/vote_handler/become_leader_test.rs +++ b/openraft/src/engine/handler/vote_handler/become_leader_test.rs @@ -10,8 +10,9 @@ use crate::engine::Command; use crate::engine::Engine; use crate::engine::ReplicationProgress; use crate::entry::RaftEntry; +use crate::log_id_range::LogIdRange; use crate::progress::entry::ProgressEntry; -use crate::progress::Inflight; +use crate::replication::request::Replicate; use crate::testing::log_id; use crate::type_config::alias::EntryOf; use crate::type_config::TypeConfigExt; @@ -65,7 +66,7 @@ fn test_become_leader() -> anyhow::Result<()> { }, Command::Replicate { target: 0, - req: Inflight::logs(None, Some(log_id(2, 1, 0))).with_id(1), + req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 1, 0)))) } ]); diff --git a/openraft/src/engine/tests/handle_vote_resp_test.rs b/openraft/src/engine/tests/handle_vote_resp_test.rs index 357c5557c..01e3dd647 100644 --- a/openraft/src/engine/tests/handle_vote_resp_test.rs +++ b/openraft/src/engine/tests/handle_vote_resp_test.rs @@ -12,10 +12,11 @@ use crate::engine::Engine; use crate::engine::LogIdList; use crate::engine::ReplicationProgress; use crate::entry::RaftEntry; +use crate::log_id_range::LogIdRange; use crate::progress::entry::ProgressEntry; -use crate::progress::Inflight; use crate::raft::VoteResponse; use crate::raft_state::IOId; +use crate::replication::request::Replicate; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; @@ -218,7 +219,7 @@ fn test_handle_vote_resp_equal_vote() -> anyhow::Result<()> { }, Command::Replicate { target: 2, - req: Inflight::logs(None, Some(log_id(2, 1, 1))).with_id(1), + req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 1, 1)))) }, ], eng.output.take_commands() diff --git a/openraft/src/engine/tests/startup_test.rs b/openraft/src/engine/tests/startup_test.rs index 487d6bc6a..b73402036 100644 --- a/openraft/src/engine/tests/startup_test.rs +++ b/openraft/src/engine/tests/startup_test.rs @@ -10,8 +10,10 @@ use crate::engine::Engine; use crate::engine::LogIdList; use crate::engine::ReplicationProgress; use crate::entry::RaftEntry; +use crate::log_id_range::LogIdRange; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; +use crate::replication::request::Replicate; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; @@ -69,7 +71,6 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> { Command::RebuildReplicationStreams { targets: vec![ReplicationProgress(3, ProgressEntry { matching: None, - curr_inflight_id: 0, inflight: Inflight::None, searching_end: 4 })] @@ -80,7 +81,7 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> { }, Command::Replicate { target: 3, - req: Inflight::logs(None, Some(log_id(2, 2, 4))).with_id(1) + req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 2, 4)))) } ], eng.output.take_commands() @@ -118,14 +119,13 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> { Command::RebuildReplicationStreams { targets: vec![ReplicationProgress(3, ProgressEntry { matching: None, - curr_inflight_id: 0, inflight: Inflight::None, searching_end: 7 })] }, Command::Replicate { target: 3, - req: Inflight::logs(None, Some(log_id(1, 2, 6))).with_id(1) + req: Replicate::logs(LogIdRange::new(None, Some(log_id(1, 2, 6)))) } ], eng.output.take_commands() diff --git a/openraft/src/progress/entry/mod.rs b/openraft/src/progress/entry/mod.rs index 09b40ea2b..3b522f317 100644 --- a/openraft/src/progress/entry/mod.rs +++ b/openraft/src/progress/entry/mod.rs @@ -8,7 +8,6 @@ use validit::Validate; use crate::display_ext::DisplayOptionExt; use crate::progress::inflight::Inflight; -use crate::progress::inflight::InflightError; use crate::raft_state::LogStateReader; use crate::LogId; use crate::LogIdOptionExt; @@ -23,8 +22,6 @@ where C: RaftTypeConfig /// The id of the last matching log on the target following node. pub(crate) matching: Option>, - pub(crate) curr_inflight_id: u64, - /// The data being transmitted in flight. /// /// A non-none inflight expects a response when the data was successfully sent or failed. @@ -41,7 +38,6 @@ where C: RaftTypeConfig pub(crate) fn new(matching: Option>) -> Self { Self { matching, - curr_inflight_id: 0, inflight: Inflight::None, searching_end: matching.next_index(), } @@ -53,19 +49,11 @@ where C: RaftTypeConfig pub(crate) fn empty(end: u64) -> Self { Self { matching: None, - curr_inflight_id: 0, inflight: Inflight::None, searching_end: end, } } - // This method is only used by tests. - #[allow(dead_code)] - pub(crate) fn with_curr_inflight_id(mut self, v: u64) -> Self { - self.curr_inflight_id = v; - self - } - // This method is only used by tests. #[allow(dead_code)] pub(crate) fn with_inflight(mut self, inflight: Inflight) -> Self { @@ -89,27 +77,20 @@ where C: RaftTypeConfig } } - pub(crate) fn update_matching( - &mut self, - request_id: u64, - matching: Option>, - ) -> Result<(), InflightError> { + pub(crate) fn update_matching(&mut self, matching: Option>) { tracing::debug!( self = display(&self), - request_id = display(request_id), matching = display(matching.display()), "update_matching" ); - self.inflight.ack(request_id, matching)?; + self.inflight.ack(matching); debug_assert!(matching >= self.matching); self.matching = matching; let matching_next = self.matching.next_index(); self.searching_end = std::cmp::max(self.searching_end, matching_next); - - Ok(()) } /// Update conflicting log index. @@ -124,15 +105,10 @@ where C: RaftTypeConfig /// To allow a follower to clean its data, enable feature flag [`loosen-follower-log-revert`] . /// /// [`loosen-follower-log-revert`]: crate::docs::feature_flags#feature_flag_loosen_follower_log_revert - pub(crate) fn update_conflicting(&mut self, request_id: u64, conflict: u64) -> Result<(), InflightError> { - tracing::debug!( - self = debug(&self), - request_id = display(request_id), - conflict = display(conflict), - "update_conflict" - ); + pub(crate) fn update_conflicting(&mut self, conflict: u64) { + tracing::debug!(self = debug(&self), conflict = display(conflict), "update_conflict"); - self.inflight.conflict(request_id, conflict)?; + self.inflight.conflict(conflict); debug_assert!(conflict < self.searching_end); self.searching_end = conflict; @@ -162,7 +138,6 @@ where C: RaftTypeConfig conflict ); } - Ok(()) } /// Initialize a replication action: sending log entries or sending snapshot. @@ -200,9 +175,8 @@ where C: RaftTypeConfig // The log the follower needs is purged. // Replicate by snapshot. if self.searching_end < purge_upto_next { - self.curr_inflight_id += 1; let snapshot_last = log_state.snapshot_last_log_id(); - self.inflight = Inflight::snapshot(snapshot_last.copied()).with_id(self.curr_inflight_id); + self.inflight = Inflight::snapshot(snapshot_last.copied()); return Ok(&self.inflight); } @@ -223,8 +197,7 @@ where C: RaftTypeConfig let prev = log_state.prev_log_id(start); let last = log_state.prev_log_id(end); - self.curr_inflight_id += 1; - self.inflight = Inflight::logs(prev, last).with_id(self.curr_inflight_id); + self.inflight = Inflight::logs(prev, last); Ok(&self.inflight) } diff --git a/openraft/src/progress/entry/tests.rs b/openraft/src/progress/entry/tests.rs index db4b1ece3..f528f4884 100644 --- a/openraft/src/progress/entry/tests.rs +++ b/openraft/src/progress/entry/tests.rs @@ -42,12 +42,12 @@ fn test_update_matching() -> anyhow::Result<()> { { let mut pe = ProgressEntry::::empty(20); pe.inflight = inflight_logs(5, 10); - pe.update_matching(pe.inflight.id(), Some(log_id(6)))?; + pe.update_matching(Some(log_id(6))); assert_eq!(inflight_logs(6, 10), pe.inflight); assert_eq!(Some(log_id(6)), pe.matching); assert_eq!(20, pe.searching_end); - pe.update_matching(pe.inflight.id(), Some(log_id(10)))?; + pe.update_matching(Some(log_id(10))); assert_eq!(Inflight::None, pe.inflight); assert_eq!(Some(log_id(10)), pe.matching); assert_eq!(20, pe.searching_end); @@ -59,7 +59,7 @@ fn test_update_matching() -> anyhow::Result<()> { pe.matching = Some(log_id(6)); pe.inflight = inflight_logs(5, 20); - pe.update_matching(pe.inflight.id(), Some(log_id(20)))?; + pe.update_matching(Some(log_id(20))); assert_eq!(21, pe.searching_end); } @@ -71,7 +71,7 @@ fn test_update_conflicting() -> anyhow::Result<()> { let mut pe = ProgressEntry::::empty(20); pe.matching = Some(log_id(3)); pe.inflight = inflight_logs(5, 10); - pe.update_conflicting(pe.inflight.id(), 5)?; + pe.update_conflicting(5); assert_eq!(Inflight::None, pe.inflight); assert_eq!(&Some(log_id(3)), pe.borrow()); assert_eq!(5, pe.searching_end); @@ -145,7 +145,7 @@ impl LogStateReader for LogState { #[test] fn test_next_send() -> anyhow::Result<()> { - // There is already inflight data, return it in an Err + // There is already inflight data, return it in an Error { let mut pe = ProgressEntry::::empty(20); pe.inflight = inflight_logs(10, 11); @@ -165,7 +165,7 @@ fn test_next_send() -> anyhow::Result<()> { pe.matching = Some(log_id(4)); let res = pe.next_send(&LogState::new(6, 10, 20), 100); - assert_eq!(Ok(&Inflight::snapshot(Some(log_id(10))).with_id(1)), res); + assert_eq!(Ok(&Inflight::snapshot(Some(log_id(10)))), res); } { // matching,end @@ -179,7 +179,7 @@ fn test_next_send() -> anyhow::Result<()> { pe.matching = Some(log_id(4)); let res = pe.next_send(&LogState::new(6, 10, 20), 100); - assert_eq!(Ok(&Inflight::snapshot(Some(log_id(10))).with_id(1)), res); + assert_eq!(Ok(&Inflight::snapshot(Some(log_id(10)))), res); } { @@ -194,7 +194,7 @@ fn test_next_send() -> anyhow::Result<()> { pe.matching = Some(log_id(4)); let res = pe.next_send(&LogState::new(6, 10, 20), 100); - assert_eq!(Ok(&inflight_logs(6, 20).with_id(1)), res); + assert_eq!(Ok(&inflight_logs(6, 20)), res); } { @@ -209,7 +209,7 @@ fn test_next_send() -> anyhow::Result<()> { pe.matching = Some(log_id(4)); let res = pe.next_send(&LogState::new(6, 10, 20), 100); - assert_eq!(Ok(&inflight_logs(6, 20).with_id(1)), res); + assert_eq!(Ok(&inflight_logs(6, 20)), res); } //----------- @@ -226,7 +226,7 @@ fn test_next_send() -> anyhow::Result<()> { pe.matching = Some(log_id(6)); let res = pe.next_send(&LogState::new(6, 10, 20), 100); - assert_eq!(Ok(&inflight_logs(6, 20).with_id(1)), res); + assert_eq!(Ok(&inflight_logs(6, 20)), res); } { @@ -241,7 +241,7 @@ fn test_next_send() -> anyhow::Result<()> { pe.matching = Some(log_id(6)); let res = pe.next_send(&LogState::new(6, 10, 20), 100); - assert_eq!(Ok(&inflight_logs(6, 20).with_id(1)), res); + assert_eq!(Ok(&inflight_logs(6, 20)), res); } { @@ -256,7 +256,7 @@ fn test_next_send() -> anyhow::Result<()> { pe.matching = Some(log_id(6)); let res = pe.next_send(&LogState::new(6, 10, 20), 100); - assert_eq!(Ok(&inflight_logs(6, 20).with_id(1)), res); + assert_eq!(Ok(&inflight_logs(6, 20)), res); } { @@ -271,7 +271,7 @@ fn test_next_send() -> anyhow::Result<()> { pe.matching = Some(log_id(7)); let res = pe.next_send(&LogState::new(6, 10, 20), 100); - assert_eq!(Ok(&inflight_logs(7, 20).with_id(1)), res); + assert_eq!(Ok(&inflight_logs(7, 20)), res); } { @@ -286,7 +286,7 @@ fn test_next_send() -> anyhow::Result<()> { pe.matching = Some(log_id(7)); let res = pe.next_send(&LogState::new(6, 10, 20), 100); - assert_eq!(Ok(&inflight_logs(7, 20).with_id(1)), res); + assert_eq!(Ok(&inflight_logs(7, 20)), res); } { @@ -317,7 +317,7 @@ fn test_next_send() -> anyhow::Result<()> { pe.matching = Some(log_id(7)); let res = pe.next_send(&LogState::new(6, 10, 20), 5); - assert_eq!(Ok(&inflight_logs(7, 12).with_id(1)), res); + assert_eq!(Ok(&inflight_logs(7, 12)), res); } Ok(()) } diff --git a/openraft/src/progress/inflight/mod.rs b/openraft/src/progress/inflight/mod.rs index f42db7772..42b7afef3 100644 --- a/openraft/src/progress/inflight/mod.rs +++ b/openraft/src/progress/inflight/mod.rs @@ -13,19 +13,6 @@ use crate::LogId; use crate::LogIdOptionExt; use crate::RaftTypeConfig; -#[derive(Debug)] -#[derive(thiserror::Error)] -pub(crate) enum InflightError { - #[error("got invalid request id, mine={mine:?}, received={received}")] - InvalidRequestId { mine: Option, received: u64 }, -} - -impl InflightError { - pub(crate) fn invalid_request_id(mine: Option, received: u64) -> Self { - Self::InvalidRequestId { mine, received } - } -} - /// The inflight data being transmitting from leader to a follower/learner. /// /// If inflight data is non-None, it's waiting for responses from a follower/learner. @@ -39,15 +26,11 @@ where C: RaftTypeConfig /// Being replicating a series of logs. Logs { - id: u64, - log_id_range: LogIdRange, }, /// Being replicating a snapshot. Snapshot { - id: u64, - /// The last log id snapshot includes. /// /// It is None, if the snapshot is empty. @@ -73,9 +56,9 @@ where C: RaftTypeConfig fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { Inflight::None => write!(f, "None"), - Inflight::Logs { id, log_id_range: r } => write!(f, "Logs(id={}):{}", id, r), - Inflight::Snapshot { id, last_log_id } => { - write!(f, "Snapshot(id={}):{}", id, last_log_id.display()) + Inflight::Logs { log_id_range: r } => write!(f, "Logs:{}", r), + Inflight::Snapshot { last_log_id } => { + write!(f, "Snapshot:{}", last_log_id.display()) } } } @@ -91,7 +74,6 @@ where C: RaftTypeConfig Self::None } else { Self::Logs { - id: 0, log_id_range: LogIdRange::new(prev, last), } } @@ -100,58 +82,10 @@ where C: RaftTypeConfig /// Create inflight state for sending snapshot. pub(crate) fn snapshot(snapshot_last_log_id: Option>) -> Self { Self::Snapshot { - id: 0, last_log_id: snapshot_last_log_id, } } - /// Set a request id for identifying response. - pub(crate) fn with_id(self, id: u64) -> Self { - match self { - Inflight::None => Inflight::None, - Inflight::Logs { id: _, log_id_range } => Inflight::Logs { id, log_id_range }, - Inflight::Snapshot { id: _, last_log_id } => Inflight::Snapshot { id, last_log_id }, - } - } - - pub(crate) fn is_my_id(&self, res_id: u64) -> bool { - match self { - Inflight::None => false, - Inflight::Logs { id, .. } => *id == res_id, - Inflight::Snapshot { id, .. } => *id == res_id, - } - } - - pub(crate) fn assert_my_id(&self, res_id: u64) -> Result<(), InflightError> { - match self { - Inflight::None => return Err(InflightError::invalid_request_id(None, res_id)), - Inflight::Logs { id, .. } => { - if *id != res_id { - return Err(InflightError::invalid_request_id(Some(*id), res_id)); - } - } - Inflight::Snapshot { id, .. } => { - if *id != res_id { - return Err(InflightError::invalid_request_id(Some(*id), res_id)); - } - } - } - Ok(()) - } - - pub(crate) fn get_id(&self) -> Option { - match self { - Inflight::None => None, - Inflight::Logs { id, .. } => Some(*id), - Inflight::Snapshot { id, .. } => Some(*id), - } - } - - #[allow(unused)] - pub(crate) fn id(&self) -> u64 { - self.get_id().unwrap() - } - pub(crate) fn is_none(&self) -> bool { &Inflight::None == self } @@ -169,58 +103,39 @@ where C: RaftTypeConfig } /// Update inflight state when log upto `upto` is acknowledged by a follower/learner. - pub(crate) fn ack(&mut self, request_id: u64, upto: Option>) -> Result<(), InflightError> { - let res = self.assert_my_id(request_id); - if let Err(e) = &res { - tracing::error!("inflight ack error: {}", e); - return res; - } - + pub(crate) fn ack(&mut self, upto: Option>) { match self { Inflight::None => { unreachable!("no inflight data") } - Inflight::Logs { id, log_id_range } => { + Inflight::Logs { log_id_range } => { *self = { debug_assert!(upto >= log_id_range.prev); debug_assert!(upto <= log_id_range.last); - Inflight::logs(upto, log_id_range.last).with_id(*id) + Inflight::logs(upto, log_id_range.last) } } - Inflight::Snapshot { id: _, last_log_id } => { + Inflight::Snapshot { last_log_id } => { debug_assert_eq!(&upto, last_log_id); *self = Inflight::None; } } - - Ok(()) } /// Update inflight state when a conflicting log id is responded by a follower/learner. - pub(crate) fn conflict(&mut self, request_id: u64, conflict: u64) -> Result<(), InflightError> { - let res = self.assert_my_id(request_id); - if let Err(e) = &res { - tracing::error!("inflight ack error: {}", e); - return res; - } - + pub(crate) fn conflict(&mut self, conflict: u64) { match self { Inflight::None => { unreachable!("no inflight data") } - Inflight::Logs { - id: _, - log_id_range: logs, - } => { + Inflight::Logs { log_id_range: logs } => { // if prev_log_id==None, it will never conflict debug_assert_eq!(Some(conflict), logs.prev.index()); *self = Inflight::None } - Inflight::Snapshot { id: _, last_log_id: _ } => { + Inflight::Snapshot { last_log_id: _ } => { unreachable!("sending snapshot should not conflict"); } - }; - - Ok(()) + } } } diff --git a/openraft/src/progress/inflight/tests.rs b/openraft/src/progress/inflight/tests.rs index 2c1c0b343..b0d4e04ec 100644 --- a/openraft/src/progress/inflight/tests.rs +++ b/openraft/src/progress/inflight/tests.rs @@ -19,7 +19,6 @@ fn test_inflight_create() -> anyhow::Result<()> { let l = Inflight::::logs(Some(log_id(5)), Some(log_id(10))); assert_eq!( Inflight::Logs { - id: 0, log_id_range: LogIdRange::new(Some(log_id(5)), Some(log_id(10))) }, l @@ -34,7 +33,6 @@ fn test_inflight_create() -> anyhow::Result<()> { let l = Inflight::::snapshot(Some(log_id(10))); assert_eq!( Inflight::Snapshot { - id: 0, last_log_id: Some(log_id(10)) }, l @@ -58,44 +56,28 @@ fn test_inflight_is_xxx() -> anyhow::Result<()> { Ok(()) } -#[test] -fn test_inflight_ack_with_invalid_request_id() -> anyhow::Result<()> { - let mut f = Inflight::::None; - let res = f.ack(1, Some(log_id(4))); - assert!(res.is_err(), "Inflight::None can not ack"); - - let mut f = Inflight::::logs(Some(log_id(5)), Some(log_id(10))); - let res = f.ack(100, Some(log_id(4))); - assert!(res.is_err(), "invalid request id for log"); - - let mut f = Inflight::::snapshot(Some(log_id(5))); - let res = f.ack(100, Some(log_id(4))); - assert!(res.is_err(), "invalid request id for snapshot"); - Ok(()) -} - #[test] fn test_inflight_ack() -> anyhow::Result<()> { // Update matching when transmitting by logs { let mut f = Inflight::::logs(Some(log_id(5)), Some(log_id(10))); - f.ack(f.id(), Some(log_id(5)))?; + f.ack(Some(log_id(5))); assert_eq!(Inflight::::logs(Some(log_id(5)), Some(log_id(10))), f); - f.ack(f.id(), Some(log_id(6)))?; + f.ack(Some(log_id(6))); assert_eq!(Inflight::::logs(Some(log_id(6)), Some(log_id(10))), f); - f.ack(f.id(), Some(log_id(9)))?; + f.ack(Some(log_id(9))); assert_eq!(Inflight::::logs(Some(log_id(9)), Some(log_id(10))), f); - f.ack(f.id(), Some(log_id(10)))?; + f.ack(Some(log_id(10))); assert_eq!(Inflight::::None, f); { let res = std::panic::catch_unwind(|| { let mut f = Inflight::::logs(Some(log_id(5)), Some(log_id(10))); - f.ack(f.id(), Some(log_id(4))).unwrap(); + f.ack(Some(log_id(4))); }); tracing::info!("res: {:?}", res); assert!(res.is_err(), "non-matching ack < prev_log_id"); @@ -104,7 +86,7 @@ fn test_inflight_ack() -> anyhow::Result<()> { { let res = std::panic::catch_unwind(|| { let mut f = Inflight::::logs(Some(log_id(5)), Some(log_id(10))); - f.ack(f.id(), Some(log_id(11))).unwrap(); + f.ack(Some(log_id(11))); }); tracing::info!("res: {:?}", res); assert!(res.is_err(), "non-matching ack > prev_log_id"); @@ -115,14 +97,14 @@ fn test_inflight_ack() -> anyhow::Result<()> { { { let mut f = Inflight::::snapshot(Some(log_id(5))); - f.ack(f.id(), Some(log_id(5)))?; + f.ack(Some(log_id(5))); assert_eq!(Inflight::::None, f, "valid ack"); } { let res = std::panic::catch_unwind(|| { let mut f = Inflight::::snapshot(Some(log_id(5))); - f.ack(f.id(), Some(log_id(4))).unwrap(); + f.ack(Some(log_id(4))); }); tracing::info!("res: {:?}", res); assert!(res.is_err(), "non-matching ack != snapshot.last_log_id"); @@ -132,27 +114,18 @@ fn test_inflight_ack() -> anyhow::Result<()> { Ok(()) } -#[test] -fn test_inflight_ack_inherit_request_id() -> anyhow::Result<()> { - let mut f = Inflight::::logs(Some(log_id(5)), Some(log_id(10))).with_id(10); - - f.ack(f.id(), Some(log_id(5)))?; - assert_eq!(Some(10), f.get_id()); - Ok(()) -} - #[test] fn test_inflight_conflict() -> anyhow::Result<()> { { let mut f = Inflight::::logs(Some(log_id(5)), Some(log_id(10))); - f.conflict(f.id(), 5)?; + f.conflict(5); assert_eq!(Inflight::::None, f, "valid conflict"); } { let res = std::panic::catch_unwind(|| { let mut f = Inflight::::logs(Some(log_id(5)), Some(log_id(10))); - f.conflict(f.id(), 4).unwrap(); + f.conflict(4); }); tracing::info!("res: {:?}", res); assert!(res.is_err(), "non-matching conflict < prev_log_id"); @@ -161,7 +134,7 @@ fn test_inflight_conflict() -> anyhow::Result<()> { { let res = std::panic::catch_unwind(|| { let mut f = Inflight::::logs(Some(log_id(5)), Some(log_id(10))); - f.conflict(f.id(), 6).unwrap(); + f.conflict(6); }); tracing::info!("res: {:?}", res); assert!(res.is_err(), "non-matching conflict > prev_log_id"); @@ -170,7 +143,7 @@ fn test_inflight_conflict() -> anyhow::Result<()> { { let res = std::panic::catch_unwind(|| { let mut f = Inflight::::snapshot(Some(log_id(5))); - f.conflict(f.id(), 5).unwrap(); + f.conflict(5); }); tracing::info!("res: {:?}", res); assert!(res.is_err(), "conflict is not expected by Inflight::Snapshot"); @@ -178,22 +151,10 @@ fn test_inflight_conflict() -> anyhow::Result<()> { Ok(()) } -#[test] -fn test_inflight_conflict_invalid_request_id() -> anyhow::Result<()> { - let mut f = Inflight::::None; - let res = f.conflict(1, 5); - assert!(res.is_err(), "conflict is not expected by Inflight::None"); - - let mut f = Inflight::::logs(Some(log_id(5)), Some(log_id(10))); - let res = f.conflict(100, 5); - assert!(res.is_err(), "conflict with invalid request id"); - Ok(()) -} #[test] fn test_inflight_validate() -> anyhow::Result<()> { let r = Inflight::Logs { - id: 0, log_id_range: LogIdRange::::new(Some(log_id(5)), Some(log_id(4))), }; let res = r.validate(); diff --git a/openraft/src/replication/callbacks.rs b/openraft/src/replication/callbacks.rs index 0f8efc82b..dc94883a1 100644 --- a/openraft/src/replication/callbacks.rs +++ b/openraft/src/replication/callbacks.rs @@ -9,6 +9,7 @@ use crate::SnapshotMeta; /// Callback payload when a snapshot transmission finished, successfully or not. #[derive(Debug)] +#[derive(PartialEq, Eq)] pub(crate) struct SnapshotCallback { // TODO: Remove `start_time`. // Because sending snapshot is a long lasting process, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 580f90ec5..fbbe46e8f 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -4,7 +4,6 @@ pub(crate) mod callbacks; pub(crate) mod hint; mod replication_session_id; pub(crate) mod request; -pub(crate) mod request_id; pub(crate) mod response; use std::sync::Arc; @@ -14,7 +13,6 @@ use anyerror::AnyError; use futures::future::FutureExt; pub(crate) use replication_session_id::ReplicationSessionId; use request::Data; -use request::DataWithId; use request::Replicate; pub(crate) use response::Progress; use response::ReplicationResult; @@ -44,7 +42,6 @@ use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::replication::callbacks::SnapshotCallback; use crate::replication::hint::ReplicationHint; -use crate::replication::request_id::RequestId; use crate::storage::RaftLogReader; use crate::storage::RaftLogStorage; use crate::storage::Snapshot; @@ -226,20 +223,20 @@ where tracing::debug!(replication_data = display(&d), "{} send replication RPC", func_name!()); - let request_id = d.request_id(); + // If an RPC response is expected by RaftCore + let need_notify = d.has_payload(); let res = match d { - Data::Heartbeat => { + Data::Committed => { let m = &self.matching; - // request_id==None will be ignored by RaftCore. - let d = DataWithId::new(RequestId::new_heartbeat(), LogIdRange::new(*m, *m)); + let d = LogIdRange::new(*m, *m); - log_data = Some(d.clone()); - self.send_log_entries(d).await + log_data = Some(d); + self.send_log_entries(d, false).await } Data::Logs(log) => { - log_data = Some(log.clone()); - self.send_log_entries(log).await + log_data = Some(log); + self.send_log_entries(log, true).await } Data::Snapshot(snap) => self.stream_snapshot(snap).await, Data::SnapshotCallback(resp) => self.handle_snapshot_callback(resp), @@ -306,7 +303,12 @@ where if retry { debug_assert!(self.next_action.is_some(), "next_action must be Some"); } else { - self.send_progress_error(request_id, err); + // If there is no id, it is a heartbeat and do not need to notify RaftCore + if need_notify { + self.send_progress_error(err); + } else { + tracing::warn!("heartbeat RPC failed, do not send any response to RaftCore"); + }; } } }; @@ -359,22 +361,20 @@ where /// configured heartbeat interval. /// /// If an RPC is made but not completely finished, it returns the next action expected to do. + /// + /// `has_payload` indicates if there are any data(AppendEntries) to send, or it is a heartbeat. + /// `has_payload` decides if it needs to send back notification to RaftCore. #[tracing::instrument(level = "debug", skip_all)] async fn send_log_entries( &mut self, - log_ids: DataWithId>, + log_ids: LogIdRange, + has_payload: bool, ) -> Result>, ReplicationError> { - let request_id = log_ids.request_id(); - - tracing::debug!( - request_id = display(request_id), - log_id_range = display(log_ids.data()), - "send_log_entries", - ); + tracing::debug!(log_id_range = display(log_ids), "send_log_entries",); // Series of logs to send, and the last log id to send let (logs, sending_range) = { - let rng = log_ids.data(); + let rng = &log_ids; // The log index start and end to send. let (start, end) = { @@ -462,18 +462,24 @@ where self.notify_heartbeat_progress(leader_time); let matching = sending_range.last; - self.notify_progress(log_ids.request_id(), ReplicationResult(Ok(matching))); - - Ok(self.next_action_to_send(matching, log_ids)) + if has_payload { + self.notify_progress(ReplicationResult(Ok(matching))); + Ok(self.next_action_to_send(matching, log_ids)) + } else { + Ok(None) + } } AppendEntriesResponse::PartialSuccess(matching) => { Self::debug_assert_partial_success(&sending_range, &matching); self.notify_heartbeat_progress(leader_time); - self.notify_progress(log_ids.request_id(), ReplicationResult(Ok(matching))); - - Ok(self.next_action_to_send(matching, log_ids)) + if has_payload { + self.notify_progress(ReplicationResult(Ok(matching))); + Ok(self.next_action_to_send(matching, log_ids)) + } else { + Ok(None) + } } AppendEntriesResponse::HigherVote(vote) => { debug_assert!( @@ -497,7 +503,9 @@ where // Conflict is also a successful replication RPC, because the leadership is acknowledged. self.notify_heartbeat_progress(leader_time); - self.notify_progress(request_id, ReplicationResult(Err(conflict))); + if has_payload { + self.notify_progress(ReplicationResult(Err(conflict))); + } Ok(None) } @@ -506,15 +514,10 @@ where /// Send the error result to RaftCore. /// RaftCore will then submit another replication command. - fn send_progress_error(&mut self, request_id: RequestId, err: RPCError) { - // If there is no id, it is a heartbeat and do not need to notify RaftCore - let Some(request_id) = request_id.request_id() else { - return; - }; + fn send_progress_error(&mut self, err: RPCError) { let _ = self.tx_raft_core.send(Notification::ReplicationProgress { progress: Progress { target: self.target, - request_id, result: Err(err.to_string()), session_id: self.session_id, }, @@ -536,9 +539,8 @@ where } /// Notify RaftCore with the success replication result(log matching or conflict). - fn notify_progress(&mut self, request_id: RequestId, replication_result: ReplicationResult) { + fn notify_progress(&mut self, replication_result: ReplicationResult) { tracing::debug!( - request_id = display(request_id), target = display(self.target), curr_matching = display(self.matching.display()), result = display(&replication_result), @@ -556,17 +558,10 @@ where } } - // If there is no request id, it is a heartbeat RPC, - // no need to notify RaftCore with progress. - let Some(request_id) = request_id.request_id() else { - return; - }; - let _ = self.tx_raft_core.send({ Notification::ReplicationProgress { progress: Progress { session_id: self.session_id, - request_id, target: self.target, result: Ok(replication_result), }, @@ -696,13 +691,7 @@ where // If there is no action, fill in an heartbeat action to send committed index. if self.next_action.is_none() { - self.next_action = Some(Data::new_heartbeat()); - } - } - Replicate::Heartbeat => { - // Never overwrite action with payload. - if self.next_action.is_none() { - self.next_action = Some(Data::new_heartbeat()); + self.next_action = Some(Data::new_committed()); } } Replicate::Data(d) => { @@ -739,11 +728,9 @@ where #[tracing::instrument(level = "info", skip_all)] async fn stream_snapshot( &mut self, - snapshot_req: DataWithId>>, + _snapshot_req: Option>, ) -> Result>, ReplicationError> { - let request_id = snapshot_req.request_id(); - - tracing::info!(request_id = display(request_id), "{}", func_name!()); + tracing::info!("{}", func_name!()); let snapshot = self.snapshot_reader.get_snapshot().await.map_err(|reason| { tracing::warn!(error = display(&reason), "failed to get snapshot from state machine"); @@ -751,8 +738,7 @@ where })?; tracing::info!( - "received snapshot: request_id={}; meta:{}", - request_id, + "received snapshot: meta:{}", snapshot.as_ref().map(|x| &x.meta).display() ); @@ -770,7 +756,6 @@ where let (tx_cancel, rx_cancel) = C::oneshot(); let jh = C::spawn(Self::send_snapshot( - request_id, self.snapshot_network.clone(), *self.session_id.vote_ref(), snapshot, @@ -788,7 +773,6 @@ where } async fn send_snapshot( - request_id: RequestId, network: Arc>, vote: Vote, snapshot: Snapshot, @@ -813,7 +797,7 @@ where } if let Some(tx_noty) = weak_tx.upgrade() { - let data = Data::new_snapshot_callback(request_id, start_time, meta, res); + let data = Data::new_snapshot_callback(start_time, meta, res); let send_res = tx_noty.send(Replicate::new_data(data)); if send_res.is_err() { tracing::warn!("weak_tx failed to send snapshot result to ReplicationCore"); @@ -825,23 +809,21 @@ where fn handle_snapshot_callback( &mut self, - callback: DataWithId>, + callback: SnapshotCallback, ) -> Result>, ReplicationError> { tracing::debug!( - request_id = debug(callback.request_id()), - response = display(callback.data()), + response = display(&callback), matching = display(self.matching.display()), "handle_snapshot_response" ); self.snapshot_state = None; - let request_id = callback.request_id(); let SnapshotCallback { start_time, result, snapshot_meta, - } = callback.into_data(); + } = callback; let resp = result?; @@ -855,22 +837,15 @@ where } self.notify_heartbeat_progress(start_time); - self.notify_progress(request_id, ReplicationResult(Ok(snapshot_meta.last_log_id))); + self.notify_progress(ReplicationResult(Ok(snapshot_meta.last_log_id))); Ok(None) } /// If there are more logs to send, it returns a new `Some(Data::Logs)` to send. - fn next_action_to_send( - &mut self, - matching: Option>, - log_ids: DataWithId>, - ) -> Option> { - if matching < log_ids.data().last { - Some(Data::new_logs( - log_ids.request_id(), - LogIdRange::new(matching, log_ids.data().last), - )) + fn next_action_to_send(&mut self, matching: Option>, log_ids: LogIdRange) -> Option> { + if matching < log_ids.last { + Some(Data::new_logs(LogIdRange::new(matching, log_ids.last))) } else { None } diff --git a/openraft/src/replication/request.rs b/openraft/src/replication/request.rs index de0034620..fe744f6f8 100644 --- a/openraft/src/replication/request.rs +++ b/openraft/src/replication/request.rs @@ -4,15 +4,13 @@ use crate::type_config::alias::LogIdOf; /// A replication request sent by RaftCore leader state to replication stream. #[derive(Debug)] +#[derive(PartialEq, Eq)] pub(crate) enum Replicate where C: RaftTypeConfig { /// Inform replication stream to forward the committed log id to followers/learners. Committed(Option>), - /// Send an empty AppendEntries RPC as heartbeat. - Heartbeat, - /// Send a chunk of data, e.g., logs or snapshot. Data(Data), } @@ -20,12 +18,12 @@ where C: RaftTypeConfig impl Replicate where C: RaftTypeConfig { - pub(crate) fn logs(id: RequestId, log_id_range: LogIdRange) -> Self { - Self::Data(Data::new_logs(id, log_id_range)) + pub(crate) fn logs(log_id_range: LogIdRange) -> Self { + Self::Data(Data::new_logs(log_id_range)) } - pub(crate) fn snapshot(id: RequestId, last_log_id: Option>) -> Self { - Self::Data(Data::new_snapshot(id, last_log_id)) + pub(crate) fn snapshot(last_log_id: Option>) -> Self { + Self::Data(Data::new_snapshot(last_log_id)) } pub(crate) fn new_data(data: Data) -> Self { @@ -37,7 +35,6 @@ impl fmt::Display for Replicate { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Committed(c) => write!(f, "Committed({})", c.display()), - Self::Heartbeat => write!(f, "Heartbeat"), Self::Data(d) => write!(f, "Data({})", d), } } @@ -48,7 +45,6 @@ use crate::error::StreamingError; use crate::log_id_range::LogIdRange; use crate::raft::SnapshotResponse; use crate::replication::callbacks::SnapshotCallback; -use crate::replication::request_id::RequestId; use crate::type_config::alias::InstantOf; use crate::LogId; use crate::RaftTypeConfig; @@ -59,13 +55,14 @@ use crate::SnapshotMeta; /// It defines what data to send to a follower/learner and an id to identify who is sending this /// data. /// Thd data is either a series of logs or a snapshot. +#[derive(PartialEq, Eq)] pub(crate) enum Data where C: RaftTypeConfig { - Heartbeat, - Logs(DataWithId>), - Snapshot(DataWithId>>), - SnapshotCallback(DataWithId>), + Committed, + Logs(LogIdRange), + Snapshot(Option>), + SnapshotCallback(SnapshotCallback), } impl fmt::Debug for Data @@ -73,20 +70,12 @@ where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Data::Heartbeat => { - write!(f, "Data::Heartbeat") + Data::Committed => { + write!(f, "Data::Committed") } - Self::Logs(l) => f - .debug_struct("Data::Logs") - .field("request_id", &l.request_id()) - .field("log_id_range", &l.data) - .finish(), - Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("request_id", &s.request_id()).finish(), - Self::SnapshotCallback(resp) => f - .debug_struct("Data::SnapshotCallback") - .field("request_id", &resp.request_id()) - .field("callback", &resp.data) - .finish(), + Self::Logs(l) => f.debug_struct("Data::Logs").field("log_id_range", l).finish(), + Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("last_log_id", s).finish(), + Self::SnapshotCallback(resp) => f.debug_struct("Data::SnapshotCallback").field("callback", resp).finish(), } } } @@ -94,21 +83,17 @@ where C: RaftTypeConfig impl fmt::Display for Data { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Data::Heartbeat => { - write!(f, "Heartbeat") + Data::Committed => { + write!(f, "Committed") } Self::Logs(l) => { - write!(f, "Logs{{request_id: {}, log_id_range: {}}}", l.request_id, l.data) + write!(f, "Logs{{log_id_range: {}}}", l) } Self::Snapshot(s) => { - write!(f, "Snapshot{{request_id: {}}}", s.request_id) + write!(f, "Snapshot{{last_log_id:{}}}", s.display()) } Self::SnapshotCallback(l) => { - write!( - f, - "SnapshotCallback{{request_id: {}, callback: {}}}", - l.request_id, l.data - ) + write!(f, "SnapshotCallback{{callback: {}}}", l) } } } @@ -117,71 +102,33 @@ impl fmt::Display for Data { impl Data where C: RaftTypeConfig { - pub(crate) fn new_heartbeat() -> Self { - Self::Heartbeat + pub(crate) fn new_committed() -> Self { + Self::Committed } - pub(crate) fn new_logs(request_id: RequestId, log_id_range: LogIdRange) -> Self { - Self::Logs(DataWithId::new(request_id, log_id_range)) + pub(crate) fn new_logs(log_id_range: LogIdRange) -> Self { + Self::Logs(log_id_range) } - pub(crate) fn new_snapshot(request_id: RequestId, last_log_id: Option>) -> Self { - Self::Snapshot(DataWithId::new(request_id, last_log_id)) + pub(crate) fn new_snapshot(last_log_id: Option>) -> Self { + Self::Snapshot(last_log_id) } pub(crate) fn new_snapshot_callback( - request_id: RequestId, start_time: InstantOf, snapshot_meta: SnapshotMeta, result: Result, StreamingError>, ) -> Self { - Self::SnapshotCallback(DataWithId::new( - request_id, - SnapshotCallback::new(start_time, snapshot_meta, result), - )) - } - - pub(crate) fn request_id(&self) -> RequestId { - match self { - Self::Heartbeat => RequestId::new_heartbeat(), - Self::Logs(l) => l.request_id(), - Self::Snapshot(s) => s.request_id(), - Self::SnapshotCallback(r) => r.request_id(), - } + Self::SnapshotCallback(SnapshotCallback::new(start_time, snapshot_meta, result)) } /// Return true if the data includes any payload, i.e., not a heartbeat. pub(crate) fn has_payload(&self) -> bool { match self { - Self::Heartbeat => false, + Self::Committed => false, Self::Logs(_) => true, Self::Snapshot(_) => true, Self::SnapshotCallback(_) => true, } } } - -#[derive(Clone)] -pub(crate) struct DataWithId { - /// The id of this replication request. - request_id: RequestId, - data: T, -} - -impl DataWithId { - pub(crate) fn new(request_id: RequestId, data: T) -> Self { - Self { request_id, data } - } - - pub(crate) fn request_id(&self) -> RequestId { - self.request_id - } - - pub(crate) fn data(&self) -> &T { - &self.data - } - - pub(crate) fn into_data(self) -> T { - self.data - } -} diff --git a/openraft/src/replication/request_id.rs b/openraft/src/replication/request_id.rs deleted file mode 100644 index ead225072..000000000 --- a/openraft/src/replication/request_id.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::fmt; - -/// The request id of a replication action. -/// -/// HeartBeat has not payload and does not need a request id. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub(crate) enum RequestId { - HeartBeat, - AppendEntries { id: u64 }, - Snapshot { id: u64 }, -} - -impl RequestId { - pub(crate) fn new_heartbeat() -> Self { - Self::HeartBeat - } - - pub(crate) fn new_append_entries(id: u64) -> Self { - Self::AppendEntries { id } - } - - pub(crate) fn new_snapshot(id: u64) -> Self { - Self::Snapshot { id } - } - - pub(crate) fn request_id(&self) -> Option { - match self { - Self::HeartBeat => None, - Self::AppendEntries { id } => Some(*id), - Self::Snapshot { id } => Some(*id), - } - } -} - -impl fmt::Display for RequestId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::HeartBeat => write!(f, "HeartBeat"), - Self::AppendEntries { id } => write!(f, "AppendEntries({})", id), - Self::Snapshot { id } => write!(f, "Snapshot({})", id), - } - } -} diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index fae9ccd65..991950005 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -17,9 +17,6 @@ where C: RaftTypeConfig /// The ID of the target node for which the match index is to be updated. pub(crate) target: C::NodeId, - /// The id of the subject that submit this replication action. - pub(crate) request_id: u64, - /// The request by this leader has been successfully handled by the target node, /// or an error in string. /// @@ -46,9 +43,8 @@ where C: RaftTypeConfig fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "replication::Progress: target={}, request_id: {}, result: {}, session_id: {}", + "replication::Progress: target={}, result: {}, session_id: {}", self.target, - self.request_id, self.result.display(), self.session_id )