From 6f1f26903f9078f874791f006dafb41f9e6d64e3 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 26 Aug 2024 14:13:44 +0800 Subject: [PATCH] fix: potential panic in shutdown listener Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/tests/it/common/curp_group.rs | 8 +++++++- crates/utils/src/task_manager/mod.rs | 13 ++++++------- crates/xline/src/server/lease_server.rs | 21 ++++++++++++--------- crates/xline/src/server/watch_server.rs | 8 ++++++-- 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/crates/curp/tests/it/common/curp_group.rs b/crates/curp/tests/it/common/curp_group.rs index fbdab5951..8fe32ae18 100644 --- a/crates/curp/tests/it/common/curp_group.rs +++ b/crates/curp/tests/it/common/curp_group.rs @@ -373,6 +373,8 @@ impl CurpGroup { ) .await .expect("wait for group to shutdown timeout"); + // Sleep for some duration because the tasks may not exit immediately + tokio::time::sleep(Duration::from_secs(2)).await; assert!(self.is_finished(), "The group is not finished yet"); } @@ -381,7 +383,11 @@ impl CurpGroup { .flat_map(|node| { BOTTOM_TASKS .iter() - .map(|task| node.task_manager.get_shutdown_listener(task.to_owned())) + .map(|task| { + node.task_manager + .get_shutdown_listener(task.to_owned()) + .unwrap() + }) .collect::>() }) .collect::>(); diff --git a/crates/utils/src/task_manager/mod.rs b/crates/utils/src/task_manager/mod.rs index 8f177b8ee..834949969 100644 --- a/crates/utils/src/task_manager/mod.rs +++ b/crates/utils/src/task_manager/mod.rs @@ -121,18 +121,17 @@ impl TaskManager { } /// Get shutdown listener + /// + /// Returns `None` if the cluster has been shut down #[must_use] #[inline] - pub fn get_shutdown_listener(&self, name: TaskName) -> Listener { - let task = self - .tasks - .get(&name) - .unwrap_or_else(|| unreachable!("task {:?} should exist", name)); - Listener::new( + pub fn get_shutdown_listener(&self, name: TaskName) -> Option { let task = 
self.tasks.get(&name)?; + Some(Listener::new( Arc::clone(&self.state), Arc::clone(&task.notifier), Arc::clone(&self.cluster_shutdown_tracker), - ) + )) } /// Spawn a task diff --git a/crates/xline/src/server/lease_server.rs b/crates/xline/src/server/lease_server.rs index d528c1c8d..1dca749f7 100644 --- a/crates/xline/src/server/lease_server.rs +++ b/crates/xline/src/server/lease_server.rs @@ -52,6 +52,10 @@ pub(crate) struct LeaseServer { task_manager: Arc, } +/// A lease keep alive stream +type KeepAliveStream = + Pin> + Send>>; + impl LeaseServer { /// New `LeaseServer` pub(crate) fn new( @@ -135,10 +139,11 @@ impl LeaseServer { fn leader_keep_alive( &self, mut request_stream: tonic::Streaming, - ) -> Pin> + Send>> { + ) -> Result { let shutdown_listener = self .task_manager - .get_shutdown_listener(TaskName::LeaseKeepAlive); + .get_shutdown_listener(TaskName::LeaseKeepAlive) + .ok_or(tonic::Status::cancelled("The cluster is shutting down"))?; let lease_storage = Arc::clone(&self.lease_storage); let stream = try_stream! 
{ loop { @@ -176,7 +181,7 @@ impl LeaseServer { }; } }; - Box::pin(stream) + Ok(Box::pin(stream)) } /// Handle keep alive at follower @@ -185,13 +190,11 @@ impl LeaseServer { &self, mut request_stream: tonic::Streaming, leader_addrs: &[String], - ) -> Result< - Pin> + Send>>, - tonic::Status, - > { + ) -> Result { let shutdown_listener = self .task_manager - .get_shutdown_listener(TaskName::LeaseKeepAlive); + .get_shutdown_listener(TaskName::LeaseKeepAlive) + .ok_or(tonic::Status::cancelled("The cluster is shutting down"))?; let endpoints = build_endpoints(leader_addrs, self.client_tls_config.as_ref())?; let channel = tonic::transport::Channel::balance_list(endpoints.into_iter()); let mut lease_client = LeaseClient::new(channel); @@ -302,7 +305,7 @@ impl Lease for LeaseServer { let request_stream = request.into_inner(); let stream = loop { if self.lease_storage.is_primary() { - break self.leader_keep_alive(request_stream); + break self.leader_keep_alive(request_stream)?; } let leader_id = self.client.fetch_leader_id(false).await?; // Given that a candidate server may become a leader when it won the election or diff --git a/crates/xline/src/server/watch_server.rs b/crates/xline/src/server/watch_server.rs index d7cb68f60..29f67cf74 100644 --- a/crates/xline/src/server/watch_server.rs +++ b/crates/xline/src/server/watch_server.rs @@ -481,7 +481,9 @@ mod test { .return_const(-1_i64); let watcher = Arc::new(mock_watcher); let next_id = Arc::new(WatchIdGenerator::new(1)); - let n = task_manager.get_shutdown_listener(TaskName::WatchTask); + let n = task_manager + .get_shutdown_listener(TaskName::WatchTask) + .unwrap(); let handle = tokio::spawn(WatchServer::task( next_id, Arc::clone(&watcher), @@ -733,7 +735,9 @@ mod test { .return_const(-1_i64); let watcher = Arc::new(mock_watcher); let next_id = Arc::new(WatchIdGenerator::new(1)); - let n = task_manager.get_shutdown_listener(TaskName::WatchTask); + let n = task_manager + .get_shutdown_listener(TaskName::WatchTask) + 
.unwrap(); let handle = tokio::spawn(WatchServer::task( next_id, Arc::clone(&watcher),