Skip to content

Commit

Permalink
Refactor: introducing CommittedVote, NonCommittedVote, and RefVote types
Browse files Browse the repository at this point in the history
- **Introduce Specialized Vote Types:**
  - Added `CommittedVote` and `NonCommittedVote` to distinguish between votes that have been
    accepted by a quorum and those that have not.
  - Introduced `RefVote` to provide a reference-based representation of votes, facilitating safer
    and more efficient comparisons.

- **Update Vote Management Logic:**
  - Replaced direct usage of `Vote` with `CommittedVote` and `NonCommittedVote` across various
    modules (`heartbeat/worker.rs`, `notification.rs`, `raft_core.rs`, etc.) to enhance type
    safety and clarity.
  - Modified comparison logic to leverage the new vote types, ensuring accurate and meaningful
    ordering based on vote status.

- **Enhance Notification Mechanisms:**
  - Updated notification structures to use the new vote types, improving the accuracy of messages
    related to vote responses and higher votes observed.
  • Loading branch information
drmingdrmer committed Oct 12, 2024
1 parent f1aba77 commit 6c91be7
Show file tree
Hide file tree
Showing 18 changed files with 226 additions and 105 deletions.
3 changes: 1 addition & 2 deletions openraft/src/core/heartbeat/worker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fmt;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -83,7 +82,7 @@ where
let option = RPCOption::new(timeout);

let payload = AppendEntriesRequest {
vote: *heartbeat.session_id.leader_vote.deref(),
vote: heartbeat.session_id.leader_vote.into_vote(),
prev_log_id: None,
leader_commit: heartbeat.committed,
entries: vec![],
Expand Down
16 changes: 8 additions & 8 deletions openraft/src/core/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::raft_state::IOId;
use crate::replication;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::InstantOf;
use crate::vote::CommittedVote;
use crate::vote::NonCommittedVote;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Vote;
Expand All @@ -22,21 +24,19 @@ where C: RaftTypeConfig
/// The candidate that sent the vote request.
///
/// A vote identifies a unique server state.
sender_vote: Vote<C::NodeId>,
candidate_vote: NonCommittedVote<C>,
},

/// Seen a higher `vote`.
/// A Leader sees a higher `vote` when replicating.
HigherVote {
/// The ID of the target node from which the new term was observed.
target: C::NodeId,

/// The higher vote observed.
higher: Vote<C::NodeId>,

/// The candidate or leader that sent the vote request.
///
/// A vote identifies a unique server state.
sender_vote: Vote<C::NodeId>,
/// The Leader that sent replication request.
leader_vote: CommittedVote<C>,
// TODO: need this?
// /// The cluster this replication works for.
// membership_log_id: Option<LogId<C::NodeId>>,
Expand Down Expand Up @@ -84,7 +84,7 @@ where C: RaftTypeConfig
Self::VoteResponse {
target,
resp,
sender_vote,
candidate_vote: sender_vote,
} => {
write!(
f,
Expand All @@ -95,7 +95,7 @@ where C: RaftTypeConfig
Self::HigherVote {
ref target,
higher: ref new_vote,
sender_vote: ref vote,
leader_vote: ref vote,
} => {
write!(
f,
Expand Down
103 changes: 67 additions & 36 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -93,14 +92,16 @@ use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::WatchSenderOf;
use crate::type_config::async_runtime::MpscUnboundedReceiver;
use crate::type_config::TypeConfigExt;
use crate::vote::vote_status::VoteStatus;
use crate::vote::CommittedVote;
use crate::vote::NonCommittedVote;
use crate::ChangeMembers;
use crate::Instant;
use crate::LogId;
use crate::Membership;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Vote;

/// A temp struct to hold the data for a node that is being applied.
#[derive(Debug)]
Expand Down Expand Up @@ -393,7 +394,7 @@ where
let send_res = core_tx.send(Notification::HigherVote {
target,
higher: vote,
sender_vote: my_vote,
leader_vote: my_vote.into_committed(),
});

if let Err(_e) = send_res {
Expand Down Expand Up @@ -566,7 +567,7 @@ where

// --- data ---
current_term: st.vote_ref().leader_id().get_term(),
vote: st.io_state().io_progress.flushed().map(|x| *x.vote_ref()).unwrap_or_default(),
vote: st.io_state().io_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(),
last_log_index: st.last_log_id().index(),
last_applied: st.io_applied().copied(),
snapshot: st.io_snapshot_last_log_id().copied(),
Expand Down Expand Up @@ -598,7 +599,7 @@ where

let server_metrics = RaftServerMetrics {
id: self.id,
vote: st.io_state().io_progress.flushed().map(|x| *x.vote_ref()).unwrap_or_default(),
vote: st.io_state().io_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(),
state: st.server_state,
current_leader,
membership_config,
Expand Down Expand Up @@ -1102,7 +1103,7 @@ where
let _ = tx.send(Notification::VoteResponse {
target,
resp,
sender_vote: vote,
candidate_vote: vote.into_non_committed(),
});
}
Err(err) => tracing::error!({error=%err, target=display(target)}, "while requesting vote"),
Expand Down Expand Up @@ -1306,7 +1307,7 @@ where
Notification::VoteResponse {
target,
resp,
sender_vote,
candidate_vote,
} => {
let now = C::now();

Expand All @@ -1319,7 +1320,7 @@ where

#[allow(clippy::collapsible_if)]
if self.engine.candidate.is_some() {
if self.does_vote_match(&sender_vote, "VoteResponse") {
if self.does_candidate_vote_match(&candidate_vote, "VoteResponse") {
self.engine.handle_vote_resp(target, resp);
}
}
Expand All @@ -1328,17 +1329,17 @@ where
Notification::HigherVote {
target,
higher,
sender_vote,
leader_vote,
} => {
tracing::info!(
target = display(target),
higher_vote = display(&higher),
sending_vote = display(&sender_vote),
sending_vote = display(&leader_vote),
"received Notification::HigherVote: {}",
func_name!()
);

if self.does_vote_match(&sender_vote, "HigherVote") {
if self.does_leader_vote_match(&leader_vote, "HigherVote") {
// Rejected vote change is ok.
let _ = self.engine.vote_handler().update_vote(&higher);
}
Expand Down Expand Up @@ -1407,7 +1408,7 @@ where
// local log wont revert when membership changes.
#[allow(clippy::collapsible_if)]
if self.engine.leader.is_some() {
if self.does_vote_match(io_id.vote_ref(), "LocalIO Notification") {
if self.does_leader_vote_match(&log_io_id.committed_vote, "LocalIO Notification") {
self.engine.replication_handler().update_local_progress(log_io_id.log_id);
}
}
Expand Down Expand Up @@ -1551,27 +1552,53 @@ where
self.engine.elect();
}

/// If a message is sent by a previous server state but is received by current server state,
/// If a message is sent by a previous Candidate but is received by current Candidate,
/// it is a stale message and should be just ignored.
fn does_vote_match(&self, sender_vote: &Vote<C::NodeId>, msg: impl fmt::Display) -> bool {
// Get the current leading vote:
// - If input `sender_vote` is committed, it is sent by a Leader. Therefore we check against current
// Leader's vote.
// - Otherwise, it is sent by a Candidate, we check against the current in progress voting state.
let my_vote = if sender_vote.is_committed() {
let l = self.engine.leader.as_ref();
l.map(|x| *x.committed_vote.deref())
fn does_candidate_vote_match(&self, candidate_vote: &NonCommittedVote<C>, msg: impl fmt::Display) -> bool {
// If it finished voting, Candidate's vote is None.
let Some(my_vote) = self.engine.candidate_ref().map(|x| *x.vote_ref()) else {
tracing::warn!(
"A message will be ignored because this node is no longer Candidate: \
msg sent by vote: {}; when ({})",
candidate_vote,
msg
);
return false;
};

if candidate_vote.leader_id() != my_vote.leader_id() {
tracing::warn!(
"A message will be ignored because vote changed: \
msg sent by vote: {}; current my vote: {}; when ({})",
candidate_vote,
my_vote,
msg
);
false
} else {
// If it finished voting, Candidate's vote is None.
let candidate = self.engine.candidate_ref();
candidate.map(|x| *x.vote_ref())
true
}
}

/// If a message is sent by a previous Leader but is received by current Leader,
/// it is a stale message and should be just ignored.
fn does_leader_vote_match(&self, leader_vote: &CommittedVote<C>, msg: impl fmt::Display) -> bool {
let Some(my_vote) = self.engine.leader.as_ref().map(|x| x.committed_vote) else {
tracing::warn!(
"A message will be ignored because this node is no longer Leader: \
msg sent by vote: {}; when ({})",
leader_vote,
msg
);
return false;
};

if Some(*sender_vote) != my_vote {
if leader_vote != &my_vote {
tracing::warn!(
"A message will be ignored because vote changed: msg sent by vote: {}; current my vote: {}; when ({})",
sender_vote,
my_vote.display(),
"A message will be ignored because vote changed: \
msg sent by vote: {}; current my vote: {}; when ({})",
leader_vote,
my_vote,
msg
);
false
Expand All @@ -1587,7 +1614,7 @@ where
session_id: &ReplicationSessionId<C>,
msg: impl fmt::Display + Copy,
) -> bool {
if !self.does_vote_match(session_id.vote_ref(), msg) {
if !self.does_leader_vote_match(&session_id.committed_vote(), msg) {
return false;
}

Expand Down Expand Up @@ -1683,7 +1710,7 @@ where
let last_log_id = *entries.last().unwrap().get_log_id();
tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);

let io_id = IOId::new_log_io(vote.into_committed(), Some(last_log_id));
let io_id = IOId::new_log_io(vote, Some(last_log_id));
let notify = Notification::LocalIO { io_id };
let callback = IOFlushed::new(notify, self.tx_notification.downgrade());

Expand All @@ -1706,12 +1733,16 @@ where

let _ = self.tx_notification.send(Notification::LocalIO { io_id: IOId::new(vote) });

let _ = self.tx_notification.send(Notification::VoteResponse {
target: self.id,
// last_log_id is not used when sending VoteRequest to local node
resp: VoteResponse::new(vote, None, true),
sender_vote: vote,
});
// If a non-committed vote is saved,
// there may be a candidate waiting for the response.
if let VoteStatus::Pending(non_committed) = vote.into_vote_status() {
let _ = self.tx_notification.send(Notification::VoteResponse {
target: self.id,
// last_log_id is not used when sending VoteRequest to local node
resp: VoteResponse::new(vote, None, true),
candidate_vote: non_committed,
});
}
}
Command::PurgeLog { upto } => {
self.log_store.purge(upto).await?;
Expand Down
7 changes: 3 additions & 4 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::ops::Deref;
use std::time::Duration;

use validit::Valid;
Expand Down Expand Up @@ -645,10 +644,10 @@ where C: RaftTypeConfig
// Before sending any log, update the vote.
// This could not fail because `internal_server_state` will be cleared
// once `state.vote` is changed to a value of other node.
let _res = self.vote_handler().update_vote(&vote);
let _res = self.vote_handler().update_vote(&vote.into_vote());
debug_assert!(_res.is_ok(), "commit vote can not fail but: {:?}", _res);

self.state.accept_io(IOId::new_log_io(vote.into_committed(), last_log_id));
self.state.accept_io(IOId::new_log_io(vote, last_log_id));

// No need to submit UpdateIOProgress command,
// IO progress is updated by the new blank log
Expand Down Expand Up @@ -752,7 +751,7 @@ where C: RaftTypeConfig
};

debug_assert!(
leader.committed_vote_ref().deref() >= self.state.vote_ref(),
leader.committed_vote_ref().into_vote() >= *self.state.vote_ref(),
"leader.vote({}) >= state.vote({})",
leader.committed_vote_ref(),
self.state.vote_ref()
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/establish_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ where C: RaftTypeConfig

if let Some(l) = self.leader.as_ref() {
#[allow(clippy::neg_cmp_op_on_partial_ord)]
if !(&vote > l.committed_vote_ref()) {
if !(vote > l.committed_vote_ref().into_vote()) {
tracing::warn!(
"vote is not greater than current existing leader vote. Do not establish new leader and quit"
);
Expand Down
6 changes: 5 additions & 1 deletion openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ where C: RaftTypeConfig
self.state.vote.disable_lease();

self.output.push_command(Command::BroadcastTransferLeader {
req: TransferLeaderRequest::new(*self.leader.committed_vote, to, self.leader.last_log_id().copied()),
req: TransferLeaderRequest::new(
self.leader.committed_vote.into_vote(),
to,
self.leader.last_log_id().copied(),
),
});
}

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ where C: RaftTypeConfig
if let Some(l) = self.leader.as_mut() {
tracing::debug!("leading vote: {}", l.committed_vote,);

if l.committed_vote.leader_id() == self.state.vote_ref().leader_id() {
if l.committed_vote.into_vote().leader_id() == self.state.vote_ref().leader_id() {
tracing::debug!(
"vote still belongs to the same leader. Just updating vote is enough: node-{}, {}",
self.config.id,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/proposer/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ where
// Thus vote.voted_for() is this node.

// Safe unwrap: voted_for() is always non-None in Openraft
let node_id = self.committed_vote.leader_id().voted_for().unwrap();
let node_id = self.committed_vote.into_vote().leader_id().voted_for().unwrap();
let now = C::now();

tracing::debug!(
Expand Down
Loading

0 comments on commit 6c91be7

Please sign in to comment.