Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove sleep_secs method when waiting the cluster to shutdown #786

Merged
merged 2 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion crates/curp/tests/it/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use engine::{
Engine, EngineType, MemorySnapshotAllocator, RocksSnapshotAllocator, Snapshot,
SnapshotAllocator,
};
use futures::{future::join_all, Future};
use futures::{future::join_all, stream::FuturesUnordered, Future};
use itertools::Itertools;
use tokio::{
net::TcpListener,
runtime::{Handle, Runtime},
sync::{mpsc, watch},
task::{block_in_place, JoinHandle},
time::timeout,
};
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, ServerTlsConfig};
Expand All @@ -49,6 +50,17 @@ pub use commandpb::{
ProposeResponse,
};

/// `BOTTOM_TASKS` are tasks which not dependent on other tasks in the task group.
/// `CurpGroup` uses `BOTTOM_TASKS` to detect whether the curp group is closed or not.
const BOTTOM_TASKS: [TaskName; 3] = [
TaskName::WatchTask,
TaskName::ConfChange,
TaskName::LogPersist,
];

/// The default shutdown timeout used in `wait_for_targets_shutdown`
pub(crate) const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(7);

pub struct CurpNode {
pub id: ServerId,
pub addr: String,
Expand Down Expand Up @@ -332,6 +344,44 @@ impl CurpGroup {
.all(|node| node.task_manager.is_finished())
}

pub async fn wait_for_node_shutdown(&self, node_id: u64, duration: Duration) {
let node = self
.nodes
.get(&node_id)
.expect("{node_id} should exist in nodes");
let res = std::iter::once(node);
timeout(duration, Self::wait_for_targets_shutdown(res))
.await
.expect("wait for group to shutdown timeout");
assert!(
node.task_manager.is_finished(),
"The target node({node_id}) is not finished yet"
);
}

pub async fn wait_for_group_shutdown(&self, duration: Duration) {
timeout(
duration,
Self::wait_for_targets_shutdown(self.nodes.values()),
)
.await
.expect("wait for group to shutdown timeout");
assert!(self.is_finished(), "The group is not finished yet");
}

async fn wait_for_targets_shutdown(targets: impl Iterator<Item = &CurpNode>) {
let listeners = targets
.flat_map(|node| {
BOTTOM_TASKS
.iter()
.map(|task| node.task_manager.get_shutdown_listener(task.to_owned()))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let waiters: Vec<_> = listeners.iter().map(|l| l.wait()).collect();
futures::future::join_all(waiters.into_iter()).await;
}

async fn stop(&mut self) {
debug!("curp group stopping");

Expand Down
26 changes: 13 additions & 13 deletions crates/curp/tests/it/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use utils::{config::ClientConfig, timestamp};

use crate::common::curp_group::{
commandpb::ProposeId, CurpGroup, FetchClusterRequest, ProposeRequest, ProposeResponse,
DEFAULT_SHUTDOWN_TIMEOUT,
};

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -297,8 +298,9 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
));

let collection = collection_task.await.unwrap();
sleep_secs(1).await; // wait for the cluster to shutdown
assert!(group.is_finished());
group
.wait_for_group_shutdown(DEFAULT_SHUTDOWN_TIMEOUT)
.await;

let group = CurpGroup::new_rocks(3, tmp_path).await;
let client = group.new_client().await;
Expand Down Expand Up @@ -419,8 +421,9 @@ async fn shutdown_rpc_should_shutdown_the_cluster_when_client_has_wrong_leader()
.unwrap();
client.propose_shutdown().await.unwrap();

sleep_secs(7).await; // wait for the cluster to shutdown
assert!(group.is_finished());
group
.wait_for_group_shutdown(DEFAULT_SHUTDOWN_TIMEOUT)
.await;
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -550,8 +553,9 @@ async fn shutdown_rpc_should_shutdown_the_cluster_when_client_has_wrong_cluster(
.await;
client.propose_shutdown().await.unwrap();

sleep_secs(7).await; // wait for the cluster to shutdown
assert!(group.is_finished());
group
.wait_for_group_shutdown(DEFAULT_SHUTDOWN_TIMEOUT)
.await;
}

#[tokio::test(flavor = "multi_thread")]
Expand All @@ -577,13 +581,9 @@ async fn propose_conf_change_rpc_should_work_when_client_has_wrong_cluster() {
let members = client.propose_conf_change(changes).await.unwrap();
assert_eq!(members.len(), 3);
assert!(members.iter().all(|m| m.id != node_id));
sleep_secs(7).await;
assert!(group
.nodes
.get(&node_id)
.unwrap()
.task_manager
.is_finished());
group
.wait_for_node_shutdown(node_id, DEFAULT_SHUTDOWN_TIMEOUT)
.await;
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
5 changes: 5 additions & 0 deletions crates/utils/src/task_manager/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
// \ / | \ /
// WATCH_TASK CONF_CHANGE LOG_PERSIST

// NOTE: In integration tests, we use bottom tasks, like `WatchTask`, `ConfChange`, and `LogPersist`,
// which are not dependent on other tasks to detect the curp group is closed or not. If you want
// to refactor the task group, don't forget to modify the `BOTTOM_TASKS` in `crates/curp/tests/it/common/curp_group.rs`
// to prevent the integration tests from failing.

/// Generate enum with iterator
macro_rules! enum_with_iter {
( $($variant:ident),* $(,)? ) => {
Expand Down
Loading