diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index f7c5ea23c..7eab434be 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -121,8 +121,8 @@ where #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn current_leader(&self, leader_id: NID, msg: impl ToString) -> Result, WaitError> { self.metrics( - |x| x.current_leader == Some(leader_id), - &format!("{} .current_leader -> {}", msg.to_string(), leader_id), + |m| m.current_leader == Some(leader_id), + &format!("{} .current_leader == {}", msg.to_string(), leader_id), ) .await } @@ -133,13 +133,13 @@ where pub async fn log(&self, want_log_index: Option, msg: impl ToString) -> Result, WaitError> { self.metrics( |x| x.last_log_index == want_log_index, - &format!("{} .last_log_index -> {:?}", msg.to_string(), want_log_index), + &format!("{} .last_log_index == {:?}", msg.to_string(), want_log_index), ) .await?; self.metrics( |x| x.last_applied.index() == want_log_index, - &format!("{} .last_applied -> {:?}", msg.to_string(), want_log_index), + &format!("{} .last_applied == {:?}", msg.to_string(), want_log_index), ) .await } @@ -169,7 +169,7 @@ where #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn log_index(&self, index: Option, msg: impl ToString) -> Result, WaitError> { self.metrics( - |x| x.last_log_index == index, + |m| m.last_log_index == index, &format!("{} .last_log_index == {:?}", msg.to_string(), index), ) .await @@ -183,7 +183,7 @@ where msg: impl ToString, ) -> Result, WaitError> { self.metrics( - |x| x.last_log_index >= index, + |m| m.last_log_index >= index, &format!("{} .last_log_index >= {:?}", msg.to_string(), index), ) .await @@ -197,7 +197,7 @@ where msg: impl ToString, ) -> Result, WaitError> { self.metrics( - |x| x.last_applied.index() == index, + |m| m.last_applied.index() == index, &format!("{} .last_applied.index == {:?}", msg.to_string(), index), ) .await @@ -222,13 +222,14 @@ where #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn state(&self, want_state: ServerState, msg: impl ToString) -> Result, WaitError> { self.metrics( - |x| x.state == want_state, - &format!("{} .state -> {:?}", msg.to_string(), want_state), + |m| m.state == want_state, + &format!("{} .state == {:?}", msg.to_string(), want_state), ) .await } /// Wait for `membership` to become the expected node id set or timeout. + #[deprecated(note = "use `voter_ids()` instead, deprecated since 0.9.0")] #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn members( &self, @@ -236,8 +237,8 @@ where msg: impl ToString, ) -> Result, WaitError> { self.metrics( - |x| { - let got = x.membership_config.membership().voter_ids().collect::>(); + |m| { + let got = m.membership_config.membership().voter_ids().collect::>(); want_members == got }, &format!("{} .members -> {:?}", msg.to_string(), want_members), @@ -245,16 +246,37 @@ where .await } - /// Wait for `snapshot` to become `want_snapshot` or timeout. + /// Block until membership contains exact the expected `voter_ids` or timeout. + #[tracing::instrument(level = "trace", skip_all, fields(msg=msg.to_string().as_str()))] + pub async fn voter_ids( + &self, + voter_ids: impl IntoIterator, + msg: impl ToString, + ) -> Result, WaitError> { + let want = voter_ids.into_iter().collect::>(); + + tracing::debug!("block until voter_ids == {:?}", want); + + self.metrics( + |m| { + let got = m.membership_config.membership().voter_ids().collect(); + want == got + }, + &format!("{} .members == {:?}", msg.to_string(), want), + ) + .await + } + + /// Wait for `snapshot` to become `snapshot_last_log_id` or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn snapshot( &self, - want_snapshot: LogId, + snapshot_last_log_id: LogId, msg: impl ToString, ) -> Result, WaitError> { self.metrics( - |x| x.snapshot == Some(want_snapshot), - &format!("{} .snapshot -> {}", msg.to_string(), want_snapshot), + |m| m.snapshot == Some(snapshot_last_log_id), + &format!("{} .snapshot == {}", msg.to_string(), snapshot_last_log_id), ) .await } @@ -263,8 +285,8 @@ where #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn purged(&self, want: Option>, msg: impl ToString) -> Result, WaitError> { self.metrics( - |x| x.purged == want, - &format!("{} .purged -> {}", msg.to_string(), DisplayOption(&want)), + |m| m.purged == want, + &format!("{} .purged == {}", msg.to_string(), DisplayOption(&want)), ) .await } diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index 233d3a392..7b22028ba 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -99,7 +99,7 @@ async fn test_wait() -> anyhow::Result<()> { let rst = tx.send(update); assert!(rst.is_ok()); }); - let got = w.members(btreeset![1, 2], "members").await?; + let got = w.voter_ids([1, 2], "members").await?; h.await?; assert_eq!( diff --git a/tests/tests/append_entries/t11_append_updates_membership.rs b/tests/tests/append_entries/t11_append_updates_membership.rs index 6d90734f3..d33f3fb4d 100644 --- a/tests/tests/append_entries/t11_append_updates_membership.rs +++ b/tests/tests/append_entries/t11_append_updates_membership.rs @@ -68,7 +68,7 @@ async fn append_updates_membership() -> Result<()> { assert!(resp.is_success()); assert!(!resp.is_conflict()); - r0.wait(timeout()).members(btreeset! {1,2,3,4}, "append-entries update membership").await?; + r0.wait(timeout()).voter_ids([1, 2, 3, 4], "append-entries update membership").await?; } tracing::info!("--- delete inconsistent logs update membership"); @@ -84,7 +84,7 @@ async fn append_updates_membership() -> Result<()> { assert!(resp.is_success()); assert!(!resp.is_conflict()); - r0.wait(timeout()).members(btreeset! {1,2}, "deleting inconsistent logs updates membership").await?; + r0.wait(timeout()).voter_ids([1, 2], "deleting inconsistent logs updates membership").await?; } Ok(()) diff --git a/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs b/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs index bf39d7837..f239f26fb 100644 --- a/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs +++ b/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs @@ -52,7 +52,7 @@ async fn snapshot_to_unreachable_node_should_not_block() -> Result<()> { ); { n0.change_membership(btreeset! {0}, true).await?; - n0.wait(timeout()).members(btreeset! {0}, "change membership to {{0}}").await?; + n0.wait(timeout()).voter_ids([0], "change membership to {{0}}").await?; } Ok(())