From 908b5c0ef0a4bed49945481093771ada4edd4c82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 24 Jul 2024 14:39:37 +0800 Subject: [PATCH] Feature: add absolute timestamp `RaftMetrics::last_quorum_acked` `RaftMetrics::last_quorum_acked` is the absolute timestamp of the most recent time point that is accepted by a quorum via `AppendEntries` RPC. This field is a wrapped `Instant` type: `SerdeInstant` which support serde for `Instant`. This field is added as a replacement of `millis_since_quorum_ack`, which is a relative time. `SerdeInstant` serialize `Instant` into a string formatted as "%Y-%m-%dT%H:%M:%S%.9f%z", e.g., "2024-07-24T04:07:32.567025000+0000". Note: Serialization and deserialization are not perfectly accurate and can be indeterministic, resulting in minor variations each time. These deviations(could be smaller or greater) are typically less than a microsecond (10^-6 seconds). --- openraft/src/core/raft_core.rs | 5 + openraft/src/error.rs | 2 - openraft/src/metrics/mod.rs | 2 + openraft/src/metrics/raft_metrics.rs | 73 +++++++- openraft/src/metrics/serde_instant.rs | 201 +++++++++++++++++++++ openraft/src/metrics/wait_test.rs | 2 + scripts/check.kdl | 3 - tests/tests/fixtures/mod.rs | 13 +- tests/tests/metrics/t10_leader_last_ack.rs | 114 ++++++++++++ 9 files changed, 397 insertions(+), 18 deletions(-) create mode 100644 openraft/src/metrics/serde_instant.rs diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 082dbc450..535d31af0 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -60,6 +60,7 @@ use crate::metrics::RaftDataMetrics; use crate::metrics::RaftMetrics; use crate::metrics::RaftServerMetrics; use crate::metrics::ReplicationMetrics; +use crate::metrics::SerdeInstant; use crate::network::v2::RaftNetworkV2; use crate::network::RPCOption; use crate::network::RPCTypes; @@ -553,6 +554,7 @@ where let membership_config = st.membership_state.effective().stored_membership().clone(); let current_leader = self.current_leader(); + #[allow(deprecated)] let m = RaftMetrics { running_state: Ok(()), id: self.id, @@ -569,6 +571,7 @@ where state: st.server_state, current_leader, millis_since_quorum_ack, + last_quorum_acked: last_quorum_acked.map(SerdeInstant::new), membership_config: membership_config.clone(), heartbeat: heartbeat.clone(), @@ -576,12 +579,14 @@ where replication: replication.clone(), }; + #[allow(deprecated)] let data_metrics = RaftDataMetrics { last_log: st.last_log_id().copied(), last_applied: st.io_applied().copied(), snapshot: st.io_snapshot_last_log_id().copied(), purged: st.io_purged().copied(), millis_since_quorum_ack, + last_quorum_acked: last_quorum_acked.map(SerdeInstant::new), replication, heartbeat, }; diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 01deaffe9..4ce978b9e 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -232,8 +232,6 @@ where C: RaftTypeConfig #[error(transparent)] Closed(#[from] ReplicationClosed), - // TODO(xp): two sub type: StorageError / TransportError - // TODO(xp): a sub error for just append_entries() #[error(transparent)] StorageError(#[from] StorageError), diff --git a/openraft/src/metrics/mod.rs b/openraft/src/metrics/mod.rs index 297aadc91..f62f337a4 100644 --- a/openraft/src/metrics/mod.rs +++ b/openraft/src/metrics/mod.rs @@ -32,6 +32,7 @@ mod raft_metrics; mod wait; mod metric_display; +mod serde_instant; mod wait_condition; #[cfg(test)] mod wait_test; @@ -42,6 +43,7 @@ pub use metric::Metric; pub use raft_metrics::RaftDataMetrics; pub use raft_metrics::RaftMetrics; pub use raft_metrics::RaftServerMetrics; +pub use serde_instant::SerdeInstant; pub use wait::Wait; pub use wait::WaitError; pub(crate) use wait_condition::Condition; diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index 1226fc07d..552e6c55c 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -4,10 +4,12 @@ use std::sync::Arc; use crate::core::ServerState; use crate::display_ext::DisplayBTreeMapOptValue; use crate::display_ext::DisplayOption; -use crate::display_ext::DisplayOptionExt; use crate::error::Fatal; use crate::metrics::HeartbeatMetrics; use crate::metrics::ReplicationMetrics; +use crate::metrics::SerdeInstant; +use crate::type_config::alias::InstantOf; +use crate::Instant; use crate::LogId; use crate::RaftTypeConfig; use crate::StoredMembership; @@ -69,8 +71,27 @@ pub struct RaftMetrics { /// synchronization with the cluster. /// A longer duration without acknowledgment may suggest a higher probability of the leader /// being partitioned from the cluster. + /// + /// Use `last_quorum_acked` instead, which is absolute timestamp. + /// This value relates to the time when metrics is reported, which may behind the current time + /// by an unknown duration(although it should be very small). + #[deprecated(since = "0.10.0", note = "use `last_quorum_acked` instead.")] pub millis_since_quorum_ack: Option, + /// For a leader, it is the most recently acknowledged timestamp by a quorum. + /// + /// It is `None` if this node is not leader, or the leader is not yet acknowledged by a quorum. + /// Being acknowledged means receiving a reply of + /// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`). + /// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count, + /// because a node will not maintain a lease for a vote with `committed == false`. + /// + /// This timestamp can be used by the application to assess the likelihood that the leader has + /// lost synchronization with the cluster. + /// An older value may suggest a higher probability of the leader being partitioned from the + /// cluster. + pub last_quorum_acked: Option>>, + /// The current membership config of the cluster. pub membership_config: Arc>, @@ -98,7 +119,7 @@ where C: RaftTypeConfig write!( f, - "id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}(since_last_ack:{} ms)", + "id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}", self.id, self.state, self.current_term, @@ -106,9 +127,19 @@ where C: RaftTypeConfig DisplayOption(&self.last_log_index), DisplayOption(&self.last_applied), DisplayOption(&self.current_leader), - DisplayOption(&self.millis_since_quorum_ack), )?; + if let Some(quorum_acked) = &self.last_quorum_acked { + write!( + f, + "(quorum_acked_time:{}, {:?} ago)", + quorum_acked, + quorum_acked.elapsed() + )?; + } else { + write!(f, "(quorum_acked_time:None)")?; + } + write!(f, ", ")?; write!( f, @@ -129,6 +160,7 @@ impl RaftMetrics where C: RaftTypeConfig { pub fn new_initial(id: C::NodeId) -> Self { + #[allow(deprecated)] Self { running_state: Ok(()), id, @@ -143,6 +175,7 @@ where C: RaftTypeConfig state: ServerState::Follower, current_leader: None, millis_since_quorum_ack: None, + last_quorum_acked: None, membership_config: Arc::new(StoredMembership::default()), replication: None, heartbeat: None, @@ -172,8 +205,23 @@ pub struct RaftDataMetrics { /// synchronization with the cluster. /// A longer duration without acknowledgment may suggest a higher probability of the leader /// being partitioned from the cluster. + #[deprecated(since = "0.10.0", note = "use `last_quorum_acked` instead.")] pub millis_since_quorum_ack: Option, + /// For a leader, it is the most recently acknowledged timestamp by a quorum. + /// + /// It is `None` if this node is not leader, or the leader is not yet acknowledged by a quorum. + /// Being acknowledged means receiving a reply of + /// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`). + /// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count, + /// because a node will not maintain a lease for a vote with `committed == false`. + /// + /// This timestamp can be used by the application to assess the likelihood that the leader has + /// lost synchronization with the cluster. + /// An older value may suggest a higher probability of the leader being partitioned from the + /// cluster. + pub last_quorum_acked: Option>>, + pub replication: Option>, /// Heartbeat metrics. It is Some() only when this node is leader. @@ -194,12 +242,27 @@ where C: RaftTypeConfig write!( f, - "last_log:{}, last_applied:{}, snapshot:{}, purged:{}, quorum_acked(leader):{} ms before, replication:{{{}}}, heartbeat:{{{}}}", + "last_log:{}, last_applied:{}, snapshot:{}, purged:{}", DisplayOption(&self.last_log), DisplayOption(&self.last_applied), DisplayOption(&self.snapshot), DisplayOption(&self.purged), - self.millis_since_quorum_ack.display(), + )?; + + if let Some(quorum_acked) = &self.last_quorum_acked { + write!( + f, + ", quorum_acked_time:({}, {:?} ago)", + quorum_acked, + quorum_acked.elapsed() + )?; + } else { + write!(f, ", quorum_acked_time:None")?; + } + + write!( + f, + ", replication:{{{}}}, heartbeat:{{{}}}", DisplayOption(&self.replication.as_ref().map(DisplayBTreeMapOptValue)), DisplayOption(&self.heartbeat.as_ref().map(DisplayBTreeMapOptValue)), )?; diff --git a/openraft/src/metrics/serde_instant.rs b/openraft/src/metrics/serde_instant.rs new file mode 100644 index 000000000..afaa53d24 --- /dev/null +++ b/openraft/src/metrics/serde_instant.rs @@ -0,0 +1,201 @@ +use std::fmt; +use std::fmt::Formatter; +use std::ops::Deref; + +use crate::display_ext::DisplayInstantExt; +use crate::Instant; + +/// A wrapper for [`Instant`] that supports serialization and deserialization. +/// +/// This struct serializes an `Instant` into a string formatted as "%Y-%m-%dT%H:%M:%S%.9f%z", e.g., +/// "2024-07-24T04:07:32.567025000+0000". +/// +/// Note: Serialization and deserialization are not perfectly accurate and can be indeterministic, +/// resulting in minor variations each time. These deviations(could be smaller or greater) are +/// typically less than a microsecond (10^-6 seconds). +#[derive(Debug, Clone, Copy)] +#[derive(PartialEq, Eq)] +#[derive(PartialOrd, Ord)] +pub struct SerdeInstant +where I: Instant +{ + inner: I, +} + +impl Deref for SerdeInstant +where I: Instant +{ + type Target = I; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl From for SerdeInstant +where I: Instant +{ + fn from(inner: I) -> Self { + Self { inner } + } +} + +impl fmt::Display for SerdeInstant +where I: Instant +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.inner.display().fmt(f) + } +} + +impl SerdeInstant +where I: Instant +{ + pub fn new(inner: I) -> Self { + Self { inner } + } + + pub fn into_inner(self) -> I { + self.inner + } +} + +#[cfg(feature = "serde")] +mod serde_impl { + use std::fmt; + use std::marker::PhantomData; + use std::time::SystemTime; + + use chrono::DateTime; + use chrono::Utc; + use serde::de; + use serde::de::Visitor; + use serde::Deserialize; + use serde::Deserializer; + use serde::Serialize; + use serde::Serializer; + + use super::SerdeInstant; + use crate::Instant; + + impl SerdeInstant + where I: Instant + { + const SERDE_FMT: &'static str = "%Y-%m-%dT%H:%M:%S%.9f%z"; + } + + impl Serialize for SerdeInstant + where I: Instant + { + fn serialize(&self, serializer: S) -> Result + where S: Serializer { + // Convert Instant to SystemTime + let system_time = { + let sys_now = SystemTime::now(); + let now = I::now(); + + if now >= self.inner { + let d = now - self.inner; + sys_now - d + } else { + let d = self.inner - now; + sys_now + d + } + }; + + // Convert `SystemTime` to `DateTime` + let datetime: DateTime = system_time.into(); + + // Format the datetime to the desired string format + let datetime_str = datetime.format(Self::SERDE_FMT).to_string(); + + // Serialize the datetime string + serializer.serialize_str(&datetime_str) + } + } + + impl<'de, I> Deserialize<'de> for SerdeInstant + where I: Instant + { + fn deserialize(deserializer: D) -> Result + where D: Deserializer<'de> { + struct InstantVisitor(PhantomData); + + impl<'de, II: Instant> Visitor<'de> for InstantVisitor { + type Value = SerdeInstant; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + // formatter.write_str("a string representing a datetime in the format %Y-%m-%dT%H:%M:%S%.6f%z") + write!( + formatter, + "a string representing a datetime in the format {}", + SerdeInstant::::SERDE_FMT + ) + } + + fn visit_str(self, value: &str) -> Result + where E: de::Error { + // Parse the datetime string back to `DateTime` + // 2024-07-19T09:30:46.635735000 + let datetime = + DateTime::parse_from_str(value, SerdeInstant::::SERDE_FMT).map_err(de::Error::custom)?; + + // Convert `DateTime` to `SystemTime` + let system_time: SystemTime = datetime.with_timezone(&Utc).into(); + + // Calculate the `Instant` from the current time + let sys_now = SystemTime::now(); + let now = II::now(); + let instant = if system_time > sys_now { + now + (system_time.duration_since(sys_now).unwrap()) + } else { + now - (sys_now.duration_since(system_time).unwrap()) + }; + Ok(SerdeInstant { inner: instant }) + } + } + + deserializer.deserialize_str(InstantVisitor::(Default::default())) + } + } + + #[cfg(test)] + mod tests { + use std::time::Duration; + + use super::SerdeInstant; + use crate::engine::testing::UTConfig; + use crate::type_config::alias::InstantOf; + use crate::type_config::TypeConfigExt; + + #[test] + fn test_serde_instant() { + let now = UTConfig::<()>::now(); + let serde_instant = SerdeInstant::new(now); + let json = serde_json::to_string(&serde_instant).unwrap(); + let deserialized: SerdeInstant> = serde_json::from_str(&json).unwrap(); + + println!("Now: {:?}", now); + println!("Des: {:?}", *deserialized); + // Convert Instant to SerdeInstant is inaccurate. + if now > *deserialized { + assert!((now - *deserialized) < Duration::from_millis(500)); + } else { + assert!((*deserialized - now) < Duration::from_millis(500)); + } + + // Test serialization format + + let timestamp = r#""2024-07-24T04:07:32.567025000+0000""#; + let deserialized: SerdeInstant> = serde_json::from_str(timestamp).unwrap(); + let serialized = serde_json::to_string(&deserialized).unwrap(); + + assert_eq!( + timestamp[0..24], + serialized[0..24], + "compare upto milli seconds: {}", + ×tamp[0..24] + ); + } + } +} diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index 81df2b466..b380d70ec 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -249,6 +249,7 @@ pub(crate) type InitResult = (RaftMetrics, Wait, WatchSenderOf() -> InitResult where C: RaftTypeConfig { + #[allow(deprecated)] let init = RaftMetrics { running_state: Ok(()), id: NodeIdOf::::default(), @@ -261,6 +262,7 @@ where C: RaftTypeConfig { current_leader: None, millis_since_quorum_ack: None, + last_quorum_acked: None, membership_config: Arc::new(StoredMembership::new(None, Membership::new(vec![btreeset! {}], None))), heartbeat: None, diff --git a/scripts/check.kdl b/scripts/check.kdl index ac6245c1d..e1327a832 100755 --- a/scripts/check.kdl +++ b/scripts/check.kdl @@ -21,17 +21,14 @@ layout { pane { command "cargo" args "test" "--lib" - close_on_exit true } pane { command "cargo" args "test" "--test" "*" - close_on_exit true } pane { command "cargo" args "clippy" "--no-deps" "--all-targets" "--" "-D" "warnings" - close_on_exit true } } // status-bar diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 99cdaa26b..8feb604f2 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -132,8 +132,10 @@ pub fn init_global_tracing(app_name: &str, dir: &str, level: &str) -> WorkerGuar } pub fn set_panic_hook() { - std::panic::set_hook(Box::new(|panic| { + let prev_hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |panic| { log_panic(panic); + prev_hook(panic); })); } @@ -150,22 +152,17 @@ pub fn log_panic(panic: &PanicInfo) { } }; - eprintln!("{}", panic); - if let Some(location) = panic.location() { tracing::error!( - message = %panic, + message = %panic.to_string().replace("\n", " "), backtrace = %backtrace, panic.file = location.file(), panic.line = location.line(), panic.column = location.column(), ); - eprintln!("{}:{}:{}", location.file(), location.line(), location.column()); } else { - tracing::error!(message = %panic, backtrace = %backtrace); + tracing::error!(message = %panic.to_string().replace("\n", " "), backtrace = %backtrace); } - - eprintln!("{}", backtrace); } #[derive(Debug, Clone, Copy)] diff --git a/tests/tests/metrics/t10_leader_last_ack.rs b/tests/tests/metrics/t10_leader_last_ack.rs index 33c0b0eb5..d5edc5cfa 100644 --- a/tests/tests/metrics/t10_leader_last_ack.rs +++ b/tests/tests/metrics/t10_leader_last_ack.rs @@ -14,6 +14,7 @@ use crate::fixtures::RaftRouter; /// from RaftMetrics and RaftServerMetrics. #[tracing::instrument] #[test_harness::test(harness = ut_harness)] +#[allow(deprecated)] async fn leader_last_ack_3_nodes() -> Result<()> { let heartbeat_interval = 50; // ms let config = Arc::new( @@ -100,6 +101,7 @@ async fn leader_last_ack_3_nodes() -> Result<()> { let got = n0 .wait(timeout()) .metrics( + #[allow(deprecated)] |x| x.millis_since_quorum_ack < Some(100), "millis_since_quorum_ack refreshed again", ) @@ -114,6 +116,104 @@ async fn leader_last_ack_3_nodes() -> Result<()> { /// from RaftMetrics and RaftServerMetrics. #[tracing::instrument] #[test_harness::test(harness = ut_harness)] +async fn leader_last_ack_3_nodes_abs_time() -> Result<()> { + let heartbeat_interval = 50; // ms + let config = Arc::new( + Config { + enable_heartbeat: false, + heartbeat_interval, + enable_elect: false, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?; + + let n0 = router.get_raft_handle(&0)?; + let last_acked = n0.metrics().borrow().last_quorum_acked; + assert!(last_acked.as_deref() <= Some(&TypeConfig::now())); + + { + let last_acked = n0.data_metrics().borrow().last_quorum_acked; + assert!(last_acked.as_deref() <= Some(&TypeConfig::now())); + } + + tracing::info!(log_index, "--- sleep 500 ms, the `last_quorum_acked` should not change"); + { + TypeConfig::sleep(Duration::from_millis(500)).await; + + let acked2 = n0.metrics().borrow().last_quorum_acked; + println!("greater: {:?}", acked2); + assert_eq!(acked2, last_acked); + } + + let n0 = router.get_raft_handle(&0)?; + + tracing::info!(log_index, "--- heartbeat; last_quorum_acked refreshes"); + { + let now = TypeConfig::now(); + + n0.trigger().heartbeat().await?; + n0.wait(timeout()) + .metrics( + |x| x.last_quorum_acked.as_deref() >= Some(&now), + "last_quorum_acked refreshed", + ) + .await?; + } + + tracing::info!(log_index, "--- sleep and heartbeat again; last_quorum_acked refreshes"); + { + TypeConfig::sleep(Duration::from_millis(500)).await; + + let now = TypeConfig::now(); + n0.trigger().heartbeat().await?; + + n0.wait(timeout()) + .metrics( + |x| x.last_quorum_acked.as_deref() >= Some(&now), + "last_quorum_acked refreshed again", + ) + .await?; + } + + tracing::info!(log_index, "--- remove node 1 and node 2"); + { + router.remove_node(1); + router.remove_node(2); + } + + tracing::info!( + log_index, + "--- sleep and heartbeat again; last_quorum_acked does not refresh" + ); + { + TypeConfig::sleep(Duration::from_millis(500)).await; + + let now = TypeConfig::now(); + n0.trigger().heartbeat().await?; + + let got = n0 + .wait(timeout()) + .metrics( + |x| x.last_quorum_acked.as_deref() >= Some(&now), + "last_quorum_acked refreshed again", + ) + .await; + assert!(got.is_err(), "last_quorum_acked does not refresh"); + } + + Ok(()) +} + +/// Get the last timestamp when a leader is acknowledged by a quorum, +/// from RaftMetrics and RaftServerMetrics. +#[tracing::instrument] +#[test_harness::test(harness = ut_harness)] +#[allow(deprecated)] async fn leader_last_ack_1_node() -> Result<()> { let config = Arc::new( Config { @@ -139,6 +239,20 @@ async fn leader_last_ack_1_node() -> Result<()> { assert_eq!(millis, Some(0), "it is always acked for single leader"); } + let last_acked = n0.metrics().borrow().last_quorum_acked; + assert!( + last_acked.unwrap().elapsed() < Duration::from_millis(100), + "it is always acked for single leader" + ); + + { + let last_acked = n0.metrics().borrow().last_quorum_acked; + assert!( + last_acked.unwrap().elapsed() < Duration::from_millis(100), + "it is always acked for single leader" + ); + } + Ok(()) }