Skip to content

Commit

Permalink
Feature: Add Raft::transfer_leader()
Browse files Browse the repository at this point in the history
This method forces a node to invalidate the current Leader's lease and start
electing the next new Leader.
It won't do anything if the current Leader has changed.
  • Loading branch information
drmingdrmer committed Aug 1, 2024
1 parent 7fa0f9b commit 0a11b92
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 7 deletions.
13 changes: 13 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,19 @@ where
RaftMsg::ExternalCoreRequest { req } => {
req(&self.engine.state);
}
RaftMsg::TransferLeader {
from: current_leader_vote,
to,
} => {
if self.engine.state.vote_ref() == &current_leader_vote {
tracing::info!("Transfer Leader from: {}, to {}", current_leader_vote, to);

self.engine.state.vote.disable_lease();
if self.id == to {
self.engine.elect();
}
}
}
RaftMsg::ExternalCommand { cmd } => {
tracing::info!(cmd = debug(&cmd), "received RaftMsg::ExternalCommand: {}", func_name!());

Expand Down
14 changes: 14 additions & 0 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ where C: RaftTypeConfig
req: BoxOnce<'static, RaftState<C>>,
},

/// Transfer Leader to another node.
///
/// If this node is `to`, reset Leader lease and start election.
/// Otherwise, just reset Leader lease so that the node `to` can become Leader.
TransferLeader {
/// The vote of the Leader that is transferring the leadership.
from: Vote<C::NodeId>,
/// The assigned node to be the next Leader.
to: C::NodeId,
},

ExternalCommand {
cmd: ExternalCommand<C>,
},
Expand Down Expand Up @@ -129,6 +140,9 @@ where C: RaftTypeConfig
write!(f, "ChangeMembership: {:?}, retain: {}", changes, retain,)
}
RaftMsg::ExternalCoreRequest { .. } => write!(f, "External Request"),
RaftMsg::TransferLeader { from, to } => {
write!(f, "TransferLeader: from_leader: vote={}, to: {}", from, to)
}
RaftMsg::ExternalCommand { cmd } => {
write!(f, "ExternalCommand: {}", cmd)
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
/// The current term of the Raft node.
pub current_term: u64,

/// The last accepted vote.
/// The last flushed vote.
pub vote: Vote<C::NodeId>,

/// The last log index has been appended to this Raft node's log.
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where C: RaftTypeConfig
if now >= timeout_at {
return Err(WaitError::Timeout(
self.timeout,
format!("{} latest: {:?}", msg.to_string(), latest),
format!("{} latest: {}", msg.to_string(), latest),
));
}

Expand Down
70 changes: 70 additions & 0 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::core::sm;
use crate::core::sm::worker;
use crate::core::RaftCore;
use crate::core::Tick;
use crate::display_ext::DisplayOptionExt;
use crate::engine::Engine;
use crate::engine::EngineConfig;
use crate::error::CheckIsLeaderError;
Expand Down Expand Up @@ -87,6 +88,7 @@ use crate::type_config::alias::WatchReceiverOf;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::LogIndexOptionExt;
use crate::OptionalSend;
use crate::RaftNetworkFactory;
use crate::RaftState;
Expand Down Expand Up @@ -630,6 +632,74 @@ where C: RaftTypeConfig
Ok(rx)
}

/// Submits LeaderTransfer command to this Raft node.
///
/// If this node is the `to` node, it resets the Leader lease and triggers an election when the
/// expected log entries are flushed.
/// Otherwise, it just resets the Leader lease to allow the `to` node to become the Leader.
///
/// To implement Leader transfer, Call this method on every node in the cluster with the same
/// arguments.
// TODO: Explain what the `from` Leader node do.
pub async fn transfer_leader(
&self,
from: Vote<C::NodeId>,
to: C::NodeId,
flushed_log: Option<LogId<C::NodeId>>,
) -> Result<(), Fatal<C>> {
// Reset the Leader lease at once and quit, if this is not the assigned next leader.
// Only the assigned next Leader waits for the log to be flushed.
if to != self.inner.id {
self.inner.send_msg(RaftMsg::TransferLeader { from, to }).await?;
return Ok(());
}

// If the next Leader is this node, wait for the log to be flushed to make sure the
// RequestVote.last_log_id is upto date.

// Condition satisfied to become Leader
let ok = |m: &RaftMetrics<C>| (from == m.vote && m.last_log_index.next_index() >= flushed_log.next_index());

// Condition failed to become Leader
let fail = |m: &RaftMetrics<C>| !(from >= m.vote);

Check failure on line 664 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / lint

the use of negated comparison operators on partially ordered types produces code that is hard to read and refactor, please consider using the `partial_cmp` method instead, to make it clear that the two values could be incomparable

let timeout = Some(Duration::from_millis(self.inner.config.election_timeout_min));
let metrics_res =
self.wait(timeout).metrics(|st| ok(st) || fail(st), "transfer_leader await flushed log").await;

match metrics_res {
Ok(metrics) => {
if fail(&metrics) {
tracing::warn!(
"Vote changed, give up Leader-transfer; expected vote: {}, metrics: {}",
from,
metrics
);
return Ok(());
}
tracing::info!(
"Leader-transfer condition satisfied, submit Leader-transfer message; \
expected: (vote: {}, flushed_log: {})",
from,
flushed_log.display(),
);
}
Err(err) => {
tracing::warn!(
"Leader-transfer condition fail to satisfy, still submit Leader-transfer; \
expected: (vote: {}; flushed_log: {}), error: {}",
from,
flushed_log.display(),
err
);
}
};

self.inner.send_msg(RaftMsg::TransferLeader { from, to }).await?;

Ok(())
}

/// Return `true` if this node is already initialized and can not be initialized again with
/// [`Raft::initialize`]
pub async fn is_initialized(&self) -> Result<bool, Fatal<C>> {
Expand Down
20 changes: 15 additions & 5 deletions openraft/src/utime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ pub(crate) struct Leased<T, I: Instant> {
data: T,
last_update: Option<I>,
lease: Duration,
lease_enabled: bool,
}

impl<T: fmt::Display, I: Instant> fmt::Display for Leased<T, I> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let enabled = if self.lease_enabled { "enabled" } else { "disabled" };

match self.last_update {
Some(utime) => write!(f, "{}@{}+{:?}", self.data, utime.display(), self.lease),
None => write!(f, "{}", self.data),
Some(utime) => write!(f, "{}@{}+{:?}({})", self.data, utime.display(), self.lease, enabled),
None => write!(f, "{}({})", self.data, enabled),
}
}
}
Expand All @@ -31,6 +34,7 @@ impl<T: Default, I: Instant> Default for Leased<T, I> {
data: T::default(),
last_update: None,
lease: Default::default(),
lease_enabled: true,
}
}
}
Expand All @@ -56,6 +60,7 @@ impl<T, I: Instant> Leased<T, I> {
data,
last_update: Some(now),
lease,
lease_enabled: true,
}
}

Expand All @@ -66,6 +71,7 @@ impl<T, I: Instant> Leased<T, I> {
data,
last_update: None,
lease: Duration::default(),
lease_enabled: true,
}
}

Expand Down Expand Up @@ -121,12 +127,14 @@ impl<T, I: Instant> Leased<T, I> {
self.data = data;
self.last_update = Some(now);
self.lease = lease;
self.lease_enabled = true;
}

/// Reset the lease duration, so that the object expires at once.
#[allow(dead_code)]
pub(crate) fn reset_lease(&mut self) {
/// And until the next `update()`, [`Self::touch()`] won't update the lease.
pub(crate) fn disable_lease(&mut self) {
self.lease = Duration::default();
self.lease_enabled = false;
}

/// Checks if the value is expired based on the provided `now` timestamp.
Expand All @@ -148,6 +156,8 @@ impl<T, I: Instant> Leased<T, I> {
self.last_update.unwrap() - now,
);
self.last_update = Some(now);
self.lease = lease;
if self.lease_enabled {
self.lease = lease;
}
}
}
1 change: 1 addition & 0 deletions tests/tests/client_api/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod t13_begin_receiving_snapshot;
mod t13_get_snapshot;
mod t13_install_full_snapshot;
mod t13_trigger_snapshot;
mod t14_transfer_leader;
mod t16_with_raft_state;
mod t16_with_state_machine;
mod t50_lagging_network_write;
Expand Down
65 changes: 65 additions & 0 deletions tests/tests/client_api/t14_transfer_leader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;
use openraft::Config;
use openraft::ServerState;

use crate::fixtures::ut_harness;
use crate::fixtures::RaftRouter;

/// Drive a Leader transfer by calling
/// [`transfer_leader`](openraft::raft::Raft::transfer_leader) on the non-leader nodes, then
/// check that a second transfer carrying the now-stale Leader vote does not elect a Leader.
#[tracing::instrument]
#[test_harness::test(harness = ut_harness)]
async fn transfer_leader() -> anyhow::Result<()> {
    let validated = Config {
        election_timeout_min: 150,
        election_timeout_max: 300,
        ..Default::default()
    }
    .validate()?;
    let config = Arc::new(validated);

    let mut router = RaftRouter::new(config.clone());

    tracing::info!("--- initializing cluster");
    let _log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;

    let n0 = router.get_raft_handle(&0)?;
    let n1 = router.get_raft_handle(&1)?;
    let n2 = router.get_raft_handle(&2)?;

    // Snapshot node-0's view once: its vote identifies the Leader being replaced, and its
    // last applied log id is what the next Leader waits for before electing.
    let (leader_vote, last_log_id) = {
        let snapshot = n0.metrics().borrow().clone();
        (snapshot.vote, snapshot.last_applied)
    };

    tracing::info!("--- transfer Leader from 0 to 2");
    {
        n1.transfer_leader(leader_vote, 2, last_log_id).await?;
        n2.transfer_leader(leader_vote, 2, last_log_id).await?;

        n2.wait(timeout()).state(ServerState::Leader, "node-2 become leader").await?;
        n0.wait(timeout()).state(ServerState::Follower, "node-0 become follower").await?;
    }

    tracing::info!("--- can NOT transfer Leader from 2 to 0 with an old vote");
    {
        n0.transfer_leader(leader_vote, 0, last_log_id).await?;
        n1.transfer_leader(leader_vote, 0, last_log_id).await?;

        // `leader_vote` is stale by now, so node-0 must not reach the Leader state in time.
        let stale_wait = Some(Duration::from_millis(1_000));
        let n0_res = n0
            .wait(stale_wait)
            .state(ServerState::Leader, "node-0 can not become leader with old leader vote")
            .await;

        assert!(n0_res.is_err());
    }

    Ok(())
}

/// Wait deadline used by the state assertions in this test: 500 milliseconds.
fn timeout() -> Option<Duration> {
    let limit = Duration::from_millis(500);
    Some(limit)
}

0 comments on commit 0a11b92

Please sign in to comment.