Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add absolute timestamp RaftMetrics::last_quorum_acked #94

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
use crate::metrics::RaftServerMetrics;
use crate::metrics::ReplicationMetrics;
use crate::metrics::SerdeInstant;
use crate::network::v2::RaftNetworkV2;
use crate::network::RPCOption;
use crate::network::RPCTypes;
Expand Down Expand Up @@ -553,6 +554,7 @@ where
let membership_config = st.membership_state.effective().stored_membership().clone();
let current_leader = self.current_leader();

#[allow(deprecated)]
let m = RaftMetrics {
running_state: Ok(()),
id: self.id,
Expand All @@ -569,19 +571,22 @@ where
state: st.server_state,
current_leader,
millis_since_quorum_ack,
last_quorum_acked: last_quorum_acked.map(SerdeInstant::new),
membership_config: membership_config.clone(),
heartbeat: heartbeat.clone(),

// --- replication ---
replication: replication.clone(),
};

#[allow(deprecated)]
let data_metrics = RaftDataMetrics {
last_log: st.last_log_id().copied(),
last_applied: st.io_applied().copied(),
snapshot: st.io_snapshot_last_log_id().copied(),
purged: st.io_purged().copied(),
millis_since_quorum_ack,
last_quorum_acked: last_quorum_acked.map(SerdeInstant::new),
replication,
heartbeat,
};
Expand Down
2 changes: 0 additions & 2 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ where C: RaftTypeConfig
#[error(transparent)]
Closed(#[from] ReplicationClosed),

// TODO(xp): two sub type: StorageError / TransportError
// TODO(xp): a sub error for just append_entries()
#[error(transparent)]
StorageError(#[from] StorageError<C>),

Expand Down
2 changes: 2 additions & 0 deletions openraft/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod raft_metrics;
mod wait;

mod metric_display;
mod serde_instant;
mod wait_condition;
#[cfg(test)]
mod wait_test;
Expand All @@ -42,6 +43,7 @@ pub use metric::Metric;
pub use raft_metrics::RaftDataMetrics;
pub use raft_metrics::RaftMetrics;
pub use raft_metrics::RaftServerMetrics;
pub use serde_instant::SerdeInstant;
pub use wait::Wait;
pub use wait::WaitError;
pub(crate) use wait_condition::Condition;
Expand Down
73 changes: 68 additions & 5 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use std::sync::Arc;
use crate::core::ServerState;
use crate::display_ext::DisplayBTreeMapOptValue;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::error::Fatal;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::ReplicationMetrics;
use crate::metrics::SerdeInstant;
use crate::type_config::alias::InstantOf;
use crate::Instant;
use crate::LogId;
use crate::RaftTypeConfig;
use crate::StoredMembership;
Expand Down Expand Up @@ -69,8 +71,27 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
/// synchronization with the cluster.
/// A longer duration without acknowledgment may suggest a higher probability of the leader
/// being partitioned from the cluster.
///
/// Use `last_quorum_acked` instead, which is an absolute timestamp.
/// This value is relative to the time when the metrics were reported, which may be behind the
/// current time by an unknown duration (although it should be very small).
#[deprecated(since = "0.10.0", note = "use `last_quorum_acked` instead.")]
pub millis_since_quorum_ack: Option<u64>,

/// For a leader, it is the most recent timestamp acknowledged by a quorum.
///
/// It is `None` if this node is not the leader, or the leader has not yet been acknowledged by a
/// quorum.
/// Being acknowledged means receiving a reply of
/// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`).
/// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count,
/// because a node will not maintain a lease for a vote with `committed == false`.
///
/// This timestamp can be used by the application to assess the likelihood that the leader has
/// lost synchronization with the cluster.
/// An older value may suggest a higher probability of the leader being partitioned from the
/// cluster.
pub last_quorum_acked: Option<SerdeInstant<InstantOf<C>>>,

/// The current membership config of the cluster.
pub membership_config: Arc<StoredMembership<C>>,

Expand Down Expand Up @@ -98,17 +119,27 @@ where C: RaftTypeConfig

write!(
f,
"id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}(since_last_ack:{} ms)",
"id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}",
self.id,
self.state,
self.current_term,
self.vote,
DisplayOption(&self.last_log_index),
DisplayOption(&self.last_applied),
DisplayOption(&self.current_leader),
DisplayOption(&self.millis_since_quorum_ack),
)?;

if let Some(quorum_acked) = &self.last_quorum_acked {
write!(
f,
"(quorum_acked_time:{}, {:?} ago)",
quorum_acked,
quorum_acked.elapsed()
)?;
} else {
write!(f, "(quorum_acked_time:None)")?;
}

write!(f, ", ")?;
write!(
f,
Expand All @@ -129,6 +160,7 @@ impl<C> RaftMetrics<C>
where C: RaftTypeConfig
{
pub fn new_initial(id: C::NodeId) -> Self {
#[allow(deprecated)]
Self {
running_state: Ok(()),
id,
Expand All @@ -143,6 +175,7 @@ where C: RaftTypeConfig
state: ServerState::Follower,
current_leader: None,
millis_since_quorum_ack: None,
last_quorum_acked: None,
membership_config: Arc::new(StoredMembership::default()),
replication: None,
heartbeat: None,
Expand Down Expand Up @@ -172,8 +205,23 @@ pub struct RaftDataMetrics<C: RaftTypeConfig> {
/// synchronization with the cluster.
/// A longer duration without acknowledgment may suggest a higher probability of the leader
/// being partitioned from the cluster.
#[deprecated(since = "0.10.0", note = "use `last_quorum_acked` instead.")]
pub millis_since_quorum_ack: Option<u64>,

/// For a leader, it is the most recent timestamp acknowledged by a quorum.
///
/// It is `None` if this node is not the leader, or the leader has not yet been acknowledged by a
/// quorum.
/// Being acknowledged means receiving a reply of
/// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`).
/// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count,
/// because a node will not maintain a lease for a vote with `committed == false`.
///
/// This timestamp can be used by the application to assess the likelihood that the leader has
/// lost synchronization with the cluster.
/// An older value may suggest a higher probability of the leader being partitioned from the
/// cluster.
pub last_quorum_acked: Option<SerdeInstant<InstantOf<C>>>,

pub replication: Option<ReplicationMetrics<C>>,

/// Heartbeat metrics. It is Some() only when this node is leader.
Expand All @@ -194,12 +242,27 @@ where C: RaftTypeConfig

write!(
f,
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, quorum_acked(leader):{} ms before, replication:{{{}}}, heartbeat:{{{}}}",
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}",
DisplayOption(&self.last_log),
DisplayOption(&self.last_applied),
DisplayOption(&self.snapshot),
DisplayOption(&self.purged),
self.millis_since_quorum_ack.display(),
)?;

if let Some(quorum_acked) = &self.last_quorum_acked {
write!(
f,
", quorum_acked_time:({}, {:?} ago)",
quorum_acked,
quorum_acked.elapsed()
)?;
} else {
write!(f, ", quorum_acked_time:None")?;
}

write!(
f,
", replication:{{{}}}, heartbeat:{{{}}}",
DisplayOption(&self.replication.as_ref().map(DisplayBTreeMapOptValue)),
DisplayOption(&self.heartbeat.as_ref().map(DisplayBTreeMapOptValue)),
)?;
Expand Down
Loading
Loading