From 0a11b9289832c384f9f1a23b039290d2f7808c87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 1 Aug 2024 11:10:24 +0800 Subject: [PATCH] Feature: Add `Raft::transfer_leader()` This methods force a node to invalidate current Leader's lease and start to elect for the next new Leader. It wont do anything if the current Leader has changed. --- openraft/src/core/raft_core.rs | 13 ++++ openraft/src/core/raft_msg/mod.rs | 14 ++++ openraft/src/metrics/raft_metrics.rs | 2 +- openraft/src/metrics/wait.rs | 2 +- openraft/src/raft/mod.rs | 70 +++++++++++++++++++ openraft/src/utime.rs | 20 ++++-- tests/tests/client_api/main.rs | 1 + tests/tests/client_api/t14_transfer_leader.rs | 65 +++++++++++++++++ 8 files changed, 180 insertions(+), 7 deletions(-) create mode 100644 tests/tests/client_api/t14_transfer_leader.rs diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 709dbb003..40bf3f504 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1187,6 +1187,19 @@ where RaftMsg::ExternalCoreRequest { req } => { req(&self.engine.state); } + RaftMsg::TransferLeader { + from: current_leader_vote, + to, + } => { + if self.engine.state.vote_ref() == ¤t_leader_vote { + tracing::info!("Transfer Leader from: {}, to {}", current_leader_vote, to); + + self.engine.state.vote.disable_lease(); + if self.id == to { + self.engine.elect(); + } + } + } RaftMsg::ExternalCommand { cmd } => { tracing::info!(cmd = debug(&cmd), "received RaftMsg::ExternalCommand: {}", func_name!()); diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index d56a65dd9..4d5b9edd1 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -95,6 +95,17 @@ where C: RaftTypeConfig req: BoxOnce<'static, RaftState>, }, + /// Transfer Leader to another node. + /// + /// If this node is `to`, reset Leader lease and start election. + /// Otherwise, just reset Leader lease so that the node `to` can become Leader. + TransferLeader { + /// The vote of the Leader that is transferring the leadership. + from: Vote, + /// The assigned node to be the next Leader. + to: C::NodeId, + }, + ExternalCommand { cmd: ExternalCommand, }, @@ -129,6 +140,9 @@ where C: RaftTypeConfig write!(f, "ChangeMembership: {:?}, retain: {}", changes, retain,) } RaftMsg::ExternalCoreRequest { .. } => write!(f, "External Request"), + RaftMsg::TransferLeader { from, to } => { + write!(f, "TransferLeader: from_leader: vote={}, to: {}", from, to) + } RaftMsg::ExternalCommand { cmd } => { write!(f, "ExternalCommand: {}", cmd) } diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index 803b6ca92..9a3e8638f 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -31,7 +31,7 @@ pub struct RaftMetrics { /// The current term of the Raft node. pub current_term: u64, - /// The last accepted vote. + /// The last flushed vote. pub vote: Vote, /// The last log index has been appended to this Raft node's log. diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index 57196381d..59057b983 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -54,7 +54,7 @@ where C: RaftTypeConfig if now >= timeout_at { return Err(WaitError::Timeout( self.timeout, - format!("{} latest: {:?}", msg.to_string(), latest), + format!("{} latest: {}", msg.to_string(), latest), )); } diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 750199342..87d03ea6e 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -58,6 +58,7 @@ use crate::core::sm; use crate::core::sm::worker; use crate::core::RaftCore; use crate::core::Tick; +use crate::display_ext::DisplayOptionExt; use crate::engine::Engine; use crate::engine::EngineConfig; use crate::error::CheckIsLeaderError; @@ -87,6 +88,7 @@ use crate::type_config::alias::WatchReceiverOf; use crate::type_config::TypeConfigExt; use crate::LogId; use crate::LogIdOptionExt; +use crate::LogIndexOptionExt; use crate::OptionalSend; use crate::RaftNetworkFactory; use crate::RaftState; @@ -630,6 +632,74 @@ where C: RaftTypeConfig Ok(rx) } + /// Submits LeaderTransfer command to this Raft node. + /// + /// If this node is the `to` node, it resets the Leader lease and triggers an election when the + /// expected log entries are flushed. + /// Otherwise, it just resets the Leader lease to allow the `to` node to become the Leader. + /// + /// To implement Leader transfer, Call this method on every node in the cluster with the same + /// arguments. + // TODO: Explain what the `from` Leader node do. + pub async fn transfer_leader( + &self, + from: Vote, + to: C::NodeId, + flushed_log: Option>, + ) -> Result<(), Fatal> { + // Reset the Leader lease at once and quit, if this is not the assigned next leader. + // Only the assigned next Leader waits for the log to be flushed. + if to != self.inner.id { + self.inner.send_msg(RaftMsg::TransferLeader { from, to }).await?; + return Ok(()); + } + + // If the next Leader is this node, wait for the log to be flushed to make sure the + // RequestVote.last_log_id is upto date. + + // Condition satisfied to become Leader + let ok = |m: &RaftMetrics| (from == m.vote && m.last_log_index.next_index() >= flushed_log.next_index()); + + // Condition failed to become Leader + let fail = |m: &RaftMetrics| !(from >= m.vote); + + let timeout = Some(Duration::from_millis(self.inner.config.election_timeout_min)); + let metrics_res = + self.wait(timeout).metrics(|st| ok(st) || fail(st), "transfer_leader await flushed log").await; + + match metrics_res { + Ok(metrics) => { + if fail(&metrics) { + tracing::warn!( + "Vote changed, give up Leader-transfer; expected vote: {}, metrics: {}", + from, + metrics + ); + return Ok(()); + } + tracing::info!( + "Leader-transfer condition satisfied, submit Leader-transfer message; \ + expected: (vote: {}, flushed_log: {})", + from, + flushed_log.display(), + ); + } + Err(err) => { + tracing::warn!( + "Leader-transfer condition fail to satisfy, still submit Leader-transfer; \ + expected: (vote: {}; flushed_log: {}), error: {}", + from, + flushed_log.display(), + err + ); + } + }; + + self.inner.send_msg(RaftMsg::TransferLeader { from, to }).await?; + + Ok(()) + } + /// Return `true` if this node is already initialized and can not be initialized again with /// [`Raft::initialize`] pub async fn is_initialized(&self) -> Result> { diff --git a/openraft/src/utime.rs b/openraft/src/utime.rs index c0cbab309..69de05552 100644 --- a/openraft/src/utime.rs +++ b/openraft/src/utime.rs @@ -14,13 +14,16 @@ pub(crate) struct Leased { data: T, last_update: Option, lease: Duration, + lease_enabled: bool, } impl fmt::Display for Leased { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let enabled = if self.lease_enabled { "enabled" } else { "disabled" }; + match self.last_update { - Some(utime) => write!(f, "{}@{}+{:?}", self.data, utime.display(), self.lease), - None => write!(f, "{}", self.data), + Some(utime) => write!(f, "{}@{}+{:?}({})", self.data, utime.display(), self.lease, enabled), + None => write!(f, "{}({})", self.data, enabled), } } } @@ -31,6 +34,7 @@ impl Default for Leased { data: T::default(), last_update: None, lease: Default::default(), + lease_enabled: true, } } } @@ -56,6 +60,7 @@ impl Leased { data, last_update: Some(now), lease, + lease_enabled: true, } } @@ -66,6 +71,7 @@ impl Leased { data, last_update: None, lease: Duration::default(), + lease_enabled: true, } } @@ -121,12 +127,14 @@ impl Leased { self.data = data; self.last_update = Some(now); self.lease = lease; + self.lease_enabled = true; } /// Reset the lease duration, so that the object expire at once. - #[allow(dead_code)] - pub(crate) fn reset_lease(&mut self) { + /// And until the next `update()`, [`Self::touch()`] wont update the lease. + pub(crate) fn disable_lease(&mut self) { self.lease = Duration::default(); + self.lease_enabled = false; } /// Checks if the value is expired based on the provided `now` timestamp. @@ -148,6 +156,8 @@ impl Leased { self.last_update.unwrap() - now, ); self.last_update = Some(now); - self.lease = lease; + if self.lease_enabled { + self.lease = lease; + } } } diff --git a/tests/tests/client_api/main.rs b/tests/tests/client_api/main.rs index e6453099b..e6d5ba0ac 100644 --- a/tests/tests/client_api/main.rs +++ b/tests/tests/client_api/main.rs @@ -14,6 +14,7 @@ mod t13_begin_receiving_snapshot; mod t13_get_snapshot; mod t13_install_full_snapshot; mod t13_trigger_snapshot; +mod t14_transfer_leader; mod t16_with_raft_state; mod t16_with_state_machine; mod t50_lagging_network_write; diff --git a/tests/tests/client_api/t14_transfer_leader.rs b/tests/tests/client_api/t14_transfer_leader.rs new file mode 100644 index 000000000..c886eb939 --- /dev/null +++ b/tests/tests/client_api/t14_transfer_leader.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; +use std::time::Duration; + +use maplit::btreeset; +use openraft::Config; +use openraft::ServerState; + +use crate::fixtures::ut_harness; +use crate::fixtures::RaftRouter; + +/// Call [`transfer_leader`](openraft::raft::Raft::transfer_leader) on every non-leader node to +/// force establish a new leader. +#[tracing::instrument] +#[test_harness::test(harness = ut_harness)] +async fn transfer_leader() -> anyhow::Result<()> { + let config = Arc::new( + Config { + election_timeout_min: 150, + election_timeout_max: 300, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let _log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?; + + let n0 = router.get_raft_handle(&0)?; + let n1 = router.get_raft_handle(&1)?; + let n2 = router.get_raft_handle(&2)?; + + let metrics = n0.metrics().borrow().clone(); + let leader_vote = metrics.vote; + let last_log_id = metrics.last_applied; + + tracing::info!("--- transfer Leader from 0 to 2"); + { + n1.transfer_leader(leader_vote, 2, last_log_id).await?; + n2.transfer_leader(leader_vote, 2, last_log_id).await?; + + n2.wait(timeout()).state(ServerState::Leader, "node-2 become leader").await?; + n0.wait(timeout()).state(ServerState::Follower, "node-0 become follower").await?; + } + + tracing::info!("--- can NOT transfer Leader from 2 to 0 with an old vote"); + { + n0.transfer_leader(leader_vote, 0, last_log_id).await?; + n1.transfer_leader(leader_vote, 0, last_log_id).await?; + + let n0_res = n0 + .wait(Some(Duration::from_millis(1_000))) + .state(ServerState::Leader, "node-0 can not become leader with old leader vote") + .await; + + assert!(n0_res.is_err()); + } + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(500)) +}