Skip to content

Commit

Permalink
Merge pull request #960 from drmingdrmer/47-metrics-wait
Browse files Browse the repository at this point in the history
Feature: Add new methods to `openraft::metrics::Wait`
  • Loading branch information
drmingdrmer authored Dec 7, 2023
2 parents 196bad3 + d160752 commit 10c7337
Show file tree
Hide file tree
Showing 38 changed files with 182 additions and 77 deletions.
2 changes: 1 addition & 1 deletion cluster_benchmark/tests/benchmark/bench_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async fn do_bench(bench_config: &BenchConfig) -> anyhow::Result<()> {
handles.push(h)
}

leader.wait(timeout()).log_at_least(Some(total), "commit all written logs").await?;
leader.wait(timeout()).applied_index_at_least(Some(total), "commit all written logs").await?;

let elapsed = now.elapsed();

Expand Down
2 changes: 1 addition & 1 deletion cluster_benchmark/tests/benchmark/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Router {

for (id, s) in rafts.iter_mut() {
tracing::info!(log_index, "--- wait init log: {}, index: {}", id, log_index);
s.wait(timeout()).log(Some(log_index), "init").await?;
s.wait(timeout()).applied_index(Some(log_index), "init").await?;
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use proc_macro::TokenStream;

/// This macro either emits `#[async_trait::async_trait]` or `#[asnc_trait::async_trait(?Send)]`
/// This macro either emits `#[async_trait::async_trait]` or `#[async_trait::async_trait(?Send)]`
/// based on the activated feature set.
///
/// This assumes that the `[async_trait](https://crates.io/crates/async-trait)` crate is imported
Expand Down
55 changes: 55 additions & 0 deletions openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ where
}

/// Wait until applied exactly `want_log`(inclusive) logs or timeout.
#[deprecated(note = "use `log_index()` and `applied_index()` instead, deprecated since 0.9.0")]
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn log(&self, want_log_index: Option<u64>, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
Expand All @@ -144,6 +145,7 @@ where
}

/// Wait until applied at least `want_log`(inclusive) logs or timeout.
#[deprecated(note = "use `log_index_at_least()` and `applied_index_at_least()` instead, deprecated since 0.9.0")]
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn log_at_least(
&self,
Expand All @@ -163,6 +165,59 @@ where
.await
}

/// Wait for the last log index to equal `index` exactly, or give up on timeout.
///
/// `msg` is prefixed to the textual description of the awaited condition that is
/// handed to `metrics()`.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn log_index(&self, index: Option<u64>, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
    let condition = format!("{} .last_log_index == {:?}", msg.to_string(), index);
    self.metrics(|m| m.last_log_index == index, &condition).await
}

/// Wait for the last log index to reach `index` or go beyond it, or give up on timeout.
///
/// The comparison is done on `Option<u64>` values, so `None` orders before every `Some(_)`.
/// `msg` is prefixed to the textual description of the awaited condition that is
/// handed to `metrics()`.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn log_index_at_least(
    &self,
    index: Option<u64>,
    msg: impl ToString,
) -> Result<RaftMetrics<NID, N>, WaitError> {
    let condition = format!("{} .last_log_index >= {:?}", msg.to_string(), index);
    self.metrics(|m| m.last_log_index >= index, &condition).await
}

/// Wait for the index of the last applied log to equal `index` exactly, or give up on timeout.
///
/// `msg` is prefixed to the textual description of the awaited condition that is
/// handed to `metrics()`.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn applied_index(
    &self,
    index: Option<u64>,
    msg: impl ToString,
) -> Result<RaftMetrics<NID, N>, WaitError> {
    let condition = format!("{} .last_applied.index == {:?}", msg.to_string(), index);
    self.metrics(|m| m.last_applied.index() == index, &condition).await
}

/// Wait for the index of the last applied log to reach `index` or go beyond it, or give up
/// on timeout.
///
/// The wait is satisfied only when `last_log_index >= index` holds as well; both conditions
/// are checked against the same metrics value.
/// `msg` is prefixed to the textual description that is handed to `metrics()`; that
/// description mentions only the applied-index condition.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn applied_index_at_least(
    &self,
    index: Option<u64>,
    msg: impl ToString,
) -> Result<RaftMetrics<NID, N>, WaitError> {
    let condition = format!("{} .last_applied.index >= {:?}", msg.to_string(), index);
    self.metrics(
        |metrics| metrics.last_log_index >= index && metrics.last_applied.index() >= index,
        &condition,
    )
    .await
}

/// Wait for `state` to become `want_state` or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn state(&self, want_state: ServerState, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
Expand Down
38 changes: 33 additions & 5 deletions openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn test_wait() -> anyhow::Result<()> {
}

{
// wait for log
// wait for applied log
let (init, w, tx) = init_wait_test::<u64, ()>();

let h = tokio::spawn(async move {
Expand All @@ -52,10 +52,10 @@ async fn test_wait() -> anyhow::Result<()> {
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.log(Some(3), "log").await?;
let got_least2 = w.log_at_least(Some(2), "log").await?;
let got_least3 = w.log_at_least(Some(3), "log").await?;
let got_least4 = w.log_at_least(Some(4), "log").await;
let got = w.applied_index(Some(3), "log").await?;
let got_least2 = w.applied_index_at_least(Some(2), "log").await?;
let got_least3 = w.applied_index_at_least(Some(3), "log").await?;
let got_least4 = w.applied_index_at_least(Some(4), "log").await;
h.await?;

assert_eq!(Some(3), got.last_log_index);
Expand Down Expand Up @@ -173,6 +173,34 @@ async fn test_wait() -> anyhow::Result<()> {
Ok(())
}

// Exercises `log_index()` and `log_index_at_least()` against a single metrics update
// that sets `last_log_index` to `Some(3)`.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_wait_log_index() -> anyhow::Result<()> {
// wait for the last log index (not the applied index)
// NOTE(review): presumably `tx` publishes the metrics that `w` observes — confirm against
// `init_wait_test`.
let (init, w, tx) = init_wait_test::<u64, ()>();

// Send the update from a separate task after a short delay, so the waits below are
// already pending when it is sent.
let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.last_log_index = Some(3);
let rst = tx.send(update);
assert!(rst.is_ok());
});

// The exact wait and the "at least 2" / "at least 3" waits are expected to succeed once
// `last_log_index` is `Some(3)`.
let got = w.log_index(Some(3), "log").await?;
let got_least2 = w.log_index_at_least(Some(2), "log").await?;
let got_least3 = w.log_index_at_least(Some(3), "log").await?;
// No `?` here: the only update sets the index to 3, so waiting for 4 is expected to
// return an error, which is asserted below.
let got_least4 = w.log_index_at_least(Some(4), "log").await;
h.await?;

// Every successful wait returns metrics whose `last_log_index` is the value that was sent.
assert_eq!(Some(3), got.last_log_index);
assert_eq!(Some(3), got_least2.last_log_index);
assert_eq!(Some(3), got_least3.last_log_index);

assert!(got_least4.is_err());

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_wait_vote() -> anyhow::Result<()> {
let (init, w, tx) = init_wait_test::<u64, ()>();
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/append_entries/t11_append_inconsistent_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async fn append_inconsistent_log() -> Result<()> {
router
.wait(&0, Some(Duration::from_millis(2000)))
// The leader appends at least one blank log. There may be more than one transient leader.
.log_at_least(Some(log_index), "sync log to node 0")
.applied_index_at_least(Some(log_index), "sync log to node 0")
.await?;

let logs = sto0.get_log_entries(60..=60).await?;
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/append_entries/t60_enable_heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn enable_heartbeat() -> Result<()> {
// no new log will be sent.
router
.wait(&node_id, timeout())
.log_at_least(Some(log_index), format!("node {} emit heartbeat log", node_id))
.applied_index_at_least(Some(log_index), format!("node {} emit heartbeat log", node_id))
.await?;

// leader lease is extended.
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/append_entries/t61_heartbeat_reject_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ async fn heartbeat_reject_vote() -> Result<()> {
{
// TODO: this part can be removed when blank-log heartbeat is removed.
sleep(Duration::from_millis(1500)).await;
router.wait(&1, timeout()).log(Some(log_index), "no log is written").await?;
router.wait(&1, timeout()).applied_index(Some(log_index), "no log is written").await?;
}

tracing::info!(log_index, "--- disable heartbeat, vote request will be granted");
{
node0.runtime_config().heartbeat(false);
sleep(Duration::from_millis(1500)).await;

router.wait(&1, timeout()).log(Some(log_index), "no log is written").await?;
router.wait(&1, timeout()).applied_index(Some(log_index), "no log is written").await?;

let res = node1.vote(VoteRequest::new(Vote::new(10, 2), Some(log_id(10, 1, 10)))).await?;
assert!(res.vote_granted, "vote is granted after leader lease expired");
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/append_entries/t61_large_heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn large_heartbeat() -> Result<()> {
router.client_request_many(0, "foo", 10).await?;
log_index += 10;

router.wait(&1, Some(Duration::from_millis(3_000))).log(Some(log_index), "").await?;
router.wait(&1, Some(Duration::from_millis(3_000))).applied_index(Some(log_index), "").await?;

Ok(())
}
8 changes: 4 additions & 4 deletions tests/tests/append_entries/t90_issue_216_stale_last_log_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ async fn stale_last_log_id() -> Result<()> {
log_index += n_ops as u64;
}

router.wait(&1, Some(Duration::from_millis(1000))).log(Some(log_index), "").await?;
router.wait(&2, Some(Duration::from_millis(1000))).log(Some(log_index), "").await?;
router.wait(&3, Some(Duration::from_millis(1000))).log(Some(log_index), "").await?;
router.wait(&4, Some(Duration::from_millis(1000))).log(Some(log_index), "").await?;
router.wait(&1, Some(Duration::from_millis(1000))).applied_index(Some(log_index), "").await?;
router.wait(&2, Some(Duration::from_millis(1000))).applied_index(Some(log_index), "").await?;
router.wait(&3, Some(Duration::from_millis(1000))).applied_index(Some(log_index), "").await?;
router.wait(&4, Some(Duration::from_millis(1000))).applied_index(Some(log_index), "").await?;

Ok(())
}
10 changes: 8 additions & 2 deletions tests/tests/client_api/t12_trigger_purge_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ async fn trigger_purge_log() -> anyhow::Result<()> {
log_index += router.client_request_many(0, "0", 10).await?;

for id in [0, 1, 2] {
router.wait(&id, timeout()).log(Some(log_index), format_args!("node-{} write logs", id)).await?;
router
.wait(&id, timeout())
.applied_index(Some(log_index), format_args!("node-{} write logs", id))
.await?;
}
}

Expand All @@ -53,7 +56,10 @@ async fn trigger_purge_log() -> anyhow::Result<()> {
log_index += router.client_request_many(0, "0", 10).await?;

for id in [0, 1, 2] {
router.wait(&id, timeout()).log(Some(log_index), format_args!("node-{} write logs", id)).await?;
router
.wait(&id, timeout())
.applied_index(Some(log_index), format_args!("node-{} write logs", id))
.await?;
}
}

Expand Down
4 changes: 2 additions & 2 deletions tests/tests/client_api/t13_trigger_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ async fn trigger_snapshot() -> anyhow::Result<()> {
router.client_request_many(0, "0", 10).await?;
log_index += 10;

router.wait(&0, timeout()).log(Some(log_index), "node-0 write logs").await?;
router.wait(&1, timeout()).log(Some(log_index), "node-1 write logs").await?;
router.wait(&0, timeout()).applied_index(Some(log_index), "node-0 write logs").await?;
router.wait(&1, timeout()).applied_index(Some(log_index), "node-1 write logs").await?;
}

tracing::info!(log_index, "--- trigger snapshot for node-0");
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ impl TypedRaftRouter {
msg: &str,
) -> anyhow::Result<()> {
for i in node_ids.iter() {
self.wait(i, timeout).log(want_log, msg).await?;
self.wait(i, timeout).applied_index(want_log, msg).await?;
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/life_cycle/t10_initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn initialization() -> anyhow::Result<()> {
log_index += 1;

for node_id in [0, 1, 2] {
router.wait(&node_id, timeout()).log(Some(log_index), "init").await?;
router.wait(&node_id, timeout()).applied_index(Some(log_index), "init").await?;
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/tests/life_cycle/t50_single_follower_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn single_follower_restart() -> anyhow::Result<()> {
router.client_request_many(0, "foo", 1).await?;
log_index += 1;

router.wait(&0, timeout()).log(Some(log_index), "node-0 works").await?;
router.wait(&0, timeout()).applied_index(Some(log_index), "node-0 works").await?;
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> {

tracing::info!(log_index, "--- a single leader should re-apply all logs");
{
router.wait(&0, timeout()).log(Some(log_index), "node-0 works").await?;
router.wait(&0, timeout()).applied_index(Some(log_index), "node-0 works").await?;
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/life_cycle/t90_issue_607_single_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn single_restart() -> anyhow::Result<()> {
router.client_request_many(0, "foo", 1).await?;
log_index += 1;

router.wait(&0, timeout()).log(Some(log_index), "node-0 works").await?;
router.wait(&0, timeout()).applied_index(Some(log_index), "node-0 works").await?;
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/log_store/t10_save_committed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn write_committed_log_id_to_log_store() -> Result<()> {
log_index += router.client_request_many(0, "0", 10).await?;

for i in [0, 1, 2] {
router.wait(&i, timeout()).log(Some(log_index), "write logs").await?;
router.wait(&i, timeout()).applied_index(Some(log_index), "write logs").await?;
}

for id in [0, 1, 2] {
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/membership/t10_learner_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn learner_restart() -> Result<()> {

// restart node-1, assert the state as expected.
let restarted = Raft::new(1, config.clone(), router.clone(), sto1, sm1).await?;
restarted.wait(timeout()).log(Some(log_index), "log after restart").await?;
restarted.wait(timeout()).applied_index(Some(log_index), "log after restart").await?;
restarted.wait(timeout()).state(ServerState::Learner, "server state after restart").await?;

Ok(())
Expand Down
12 changes: 6 additions & 6 deletions tests/tests/membership/t11_add_learner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn add_learner_basic() -> Result<()> {
log_index += 1;

assert_eq!(log_index, res.log_id.index);
router.wait(&0, timeout()).log(Some(log_index), "commit re-adding leader log").await?;
router.wait(&0, timeout()).applied_index(Some(log_index), "commit re-adding leader log").await?;
}

tracing::info!(log_index, "--- add new node node-1");
Expand Down Expand Up @@ -86,7 +86,7 @@ async fn add_learner_basic() -> Result<()> {
log_index += 1;

assert_eq!(log_index, res.log_id.index);
router.wait(&0, timeout()).log(Some(log_index), "commit re-adding node-1 log").await?;
router.wait(&0, timeout()).applied_index(Some(log_index), "commit re-adding node-1 log").await?;

let metrics = router.get_raft_handle(&0)?.metrics().borrow().clone();
let node_ids = metrics.membership_config.membership().nodes().map(|x| *x.0).collect::<Vec<_>>();
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn add_learner_non_blocking() -> Result<()> {
router.client_request_many(0, "learner_add", 100 - log_index as usize).await?;
log_index = 100;

router.wait(&0, timeout()).log(Some(log_index), "received 100 logs").await?;
router.wait(&0, timeout()).applied_index(Some(log_index), "received 100 logs").await?;

router.new_raft_node(1).await;

Expand Down Expand Up @@ -279,7 +279,7 @@ async fn check_learner_after_leader_transferred() -> Result<()> {
{
router
.wait(&orig_leader_id, timeout())
.log(Some(log_index), "old leader commits 2 membership log")
.applied_index(Some(log_index), "old leader commits 2 membership log")
.await?;
}

Expand All @@ -291,7 +291,7 @@ async fn check_learner_after_leader_transferred() -> Result<()> {
for id in [1, 3, 4] {
router
.wait(&id, timeout())
.log_at_least(
.applied_index_at_least(
Some(log_index),
"node in new cluster finally commit at least one blank leader-initialize log",
)
Expand Down Expand Up @@ -325,7 +325,7 @@ async fn check_learner_after_leader_transferred() -> Result<()> {
log_index += 1;

for i in [1, 2, 3, 4] {
router.wait(&i, timeout()).log_at_least(Some(log_index), "learner recv new log").await?;
router.wait(&i, timeout()).applied_index_at_least(Some(log_index), "learner recv new log").await?;
}
}

Expand Down
Loading

0 comments on commit 10c7337

Please sign in to comment.