Skip to content

Commit

Permalink
Merge pull request #961 from drmingdrmer/49-refine-wait
Browse files Browse the repository at this point in the history
Feature: add `Wait::voter_ids()` and deprecate `Wait::members()`
  • Loading branch information
drmingdrmer authored Dec 7, 2023
2 parents 10c7337 + 3d4cda9 commit b84e414
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 21 deletions.
56 changes: 39 additions & 17 deletions openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ where
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn current_leader(&self, leader_id: NID, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
|x| x.current_leader == Some(leader_id),
&format!("{} .current_leader -> {}", msg.to_string(), leader_id),
|m| m.current_leader == Some(leader_id),
&format!("{} .current_leader == {}", msg.to_string(), leader_id),
)
.await
}
Expand All @@ -133,13 +133,13 @@ where
pub async fn log(&self, want_log_index: Option<u64>, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
|x| x.last_log_index == want_log_index,
&format!("{} .last_log_index -> {:?}", msg.to_string(), want_log_index),
&format!("{} .last_log_index == {:?}", msg.to_string(), want_log_index),
)
.await?;

self.metrics(
|x| x.last_applied.index() == want_log_index,
&format!("{} .last_applied -> {:?}", msg.to_string(), want_log_index),
&format!("{} .last_applied == {:?}", msg.to_string(), want_log_index),
)
.await
}
Expand Down Expand Up @@ -169,7 +169,7 @@ where
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn log_index(&self, index: Option<u64>, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
|x| x.last_log_index == index,
|m| m.last_log_index == index,
&format!("{} .last_log_index == {:?}", msg.to_string(), index),
)
.await
Expand All @@ -183,7 +183,7 @@ where
msg: impl ToString,
) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
|x| x.last_log_index >= index,
|m| m.last_log_index >= index,
&format!("{} .last_log_index >= {:?}", msg.to_string(), index),
)
.await
Expand All @@ -197,7 +197,7 @@ where
msg: impl ToString,
) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
|x| x.last_applied.index() == index,
|m| m.last_applied.index() == index,
&format!("{} .last_applied.index == {:?}", msg.to_string(), index),
)
.await
Expand All @@ -222,39 +222,61 @@ where
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn state(&self, want_state: ServerState, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
|x| x.state == want_state,
&format!("{} .state -> {:?}", msg.to_string(), want_state),
|m| m.state == want_state,
&format!("{} .state == {:?}", msg.to_string(), want_state),
)
.await
}

/// Wait for `membership` to become the expected node id set or timeout.
#[deprecated(note = "use `voter_ids()` instead, deprecated since 0.9.0")]
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn members(
&self,
want_members: BTreeSet<NID>,
msg: impl ToString,
) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
|x| {
let got = x.membership_config.membership().voter_ids().collect::<BTreeSet<_>>();
|m| {
let got = m.membership_config.membership().voter_ids().collect::<BTreeSet<_>>();
want_members == got
},
&format!("{} .members -> {:?}", msg.to_string(), want_members),
)
.await
}

/// Wait for `snapshot` to become `want_snapshot` or timeout.
/// Block until membership contains exact the expected `voter_ids` or timeout.
#[tracing::instrument(level = "trace", skip_all, fields(msg=msg.to_string().as_str()))]
pub async fn voter_ids(
&self,
voter_ids: impl IntoIterator<Item = NID>,
msg: impl ToString,
) -> Result<RaftMetrics<NID, N>, WaitError> {
let want = voter_ids.into_iter().collect::<BTreeSet<_>>();

tracing::debug!("block until voter_ids == {:?}", want);

self.metrics(
|m| {
let got = m.membership_config.membership().voter_ids().collect();
want == got
},
&format!("{} .members == {:?}", msg.to_string(), want),
)
.await
}

/// Wait for `snapshot` to become `snapshot_last_log_id` or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn snapshot(
&self,
want_snapshot: LogId<NID>,
snapshot_last_log_id: LogId<NID>,
msg: impl ToString,
) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
|x| x.snapshot == Some(want_snapshot),
&format!("{} .snapshot -> {}", msg.to_string(), want_snapshot),
|m| m.snapshot == Some(snapshot_last_log_id),
&format!("{} .snapshot == {}", msg.to_string(), snapshot_last_log_id),
)
.await
}
Expand All @@ -263,8 +285,8 @@ where
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn purged(&self, want: Option<LogId<NID>>, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
|x| x.purged == want,
&format!("{} .purged -> {}", msg.to_string(), DisplayOption(&want)),
|m| m.purged == want,
&format!("{} .purged == {}", msg.to_string(), DisplayOption(&want)),
)
.await
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn test_wait() -> anyhow::Result<()> {
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.members(btreeset![1, 2], "members").await?;
let got = w.voter_ids([1, 2], "members").await?;
h.await?;

assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/append_entries/t11_append_updates_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn append_updates_membership() -> Result<()> {
assert!(resp.is_success());
assert!(!resp.is_conflict());

r0.wait(timeout()).members(btreeset! {1,2,3,4}, "append-entries update membership").await?;
r0.wait(timeout()).voter_ids([1, 2, 3, 4], "append-entries update membership").await?;
}

tracing::info!("--- delete inconsistent logs update membership");
Expand All @@ -84,7 +84,7 @@ async fn append_updates_membership() -> Result<()> {
assert!(resp.is_success());
assert!(!resp.is_conflict());

r0.wait(timeout()).members(btreeset! {1,2}, "deleting inconsistent logs updates membership").await?;
r0.wait(timeout()).voter_ids([1, 2], "deleting inconsistent logs updates membership").await?;
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn snapshot_to_unreachable_node_should_not_block() -> Result<()> {
);
{
n0.change_membership(btreeset! {0}, true).await?;
n0.wait(timeout()).members(btreeset! {0}, "change membership to {{0}}").await?;
n0.wait(timeout()).voter_ids([0], "change membership to {{0}}").await?;
}

Ok(())
Expand Down

0 comments on commit b84e414

Please sign in to comment.