Skip to content

Commit

Permalink
fix: potential panic in shutdown listener
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Aug 26, 2024
1 parent a31b2f2 commit 6f1f269
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 19 deletions.
8 changes: 7 additions & 1 deletion crates/curp/tests/it/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ impl CurpGroup {
)
.await
.expect("wait for group to shutdown timeout");
// Sleep for some duration because the tasks may not exit immediately
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(self.is_finished(), "The group is not finished yet");
}

Expand All @@ -381,7 +383,11 @@ impl CurpGroup {
.flat_map(|node| {
BOTTOM_TASKS
.iter()
.map(|task| node.task_manager.get_shutdown_listener(task.to_owned()))
.map(|task| {
node.task_manager
.get_shutdown_listener(task.to_owned())
.unwrap()
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
Expand Down
13 changes: 6 additions & 7 deletions crates/utils/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,17 @@ impl TaskManager {
}

/// Get shutdown listener
///
/// Returns `None` if the cluster has been shutdowned
#[must_use]
#[inline]
pub fn get_shutdown_listener(&self, name: TaskName) -> Listener {
let task = self
.tasks
.get(&name)
.unwrap_or_else(|| unreachable!("task {:?} should exist", name));
Listener::new(
pub fn get_shutdown_listener(&self, name: TaskName) -> Option<Listener> {
let task = self.tasks.get(&name)?;
Some(Listener::new(
Arc::clone(&self.state),
Arc::clone(&task.notifier),
Arc::clone(&self.cluster_shutdown_tracker),
)
))
}

/// Spawn a task
Expand Down
21 changes: 12 additions & 9 deletions crates/xline/src/server/lease_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub(crate) struct LeaseServer {
task_manager: Arc<TaskManager>,
}

/// A lease keep alive stream
type KeepAliveStream =
Pin<Box<dyn Stream<Item = Result<LeaseKeepAliveResponse, tonic::Status>> + Send>>;

impl LeaseServer {
/// New `LeaseServer`
pub(crate) fn new(
Expand Down Expand Up @@ -135,10 +139,11 @@ impl LeaseServer {
fn leader_keep_alive(
&self,
mut request_stream: tonic::Streaming<LeaseKeepAliveRequest>,
) -> Pin<Box<dyn Stream<Item = Result<LeaseKeepAliveResponse, tonic::Status>> + Send>> {
) -> Result<KeepAliveStream, tonic::Status> {
let shutdown_listener = self
.task_manager
.get_shutdown_listener(TaskName::LeaseKeepAlive);
.get_shutdown_listener(TaskName::LeaseKeepAlive)
.ok_or(tonic::Status::cancelled("The cluster is shutting down"))?;
let lease_storage = Arc::clone(&self.lease_storage);
let stream = try_stream! {
loop {
Expand Down Expand Up @@ -176,7 +181,7 @@ impl LeaseServer {
};
}
};
Box::pin(stream)
Ok(Box::pin(stream))
}

/// Handle keep alive at follower
Expand All @@ -185,13 +190,11 @@ impl LeaseServer {
&self,
mut request_stream: tonic::Streaming<LeaseKeepAliveRequest>,
leader_addrs: &[String],
) -> Result<
Pin<Box<dyn Stream<Item = Result<LeaseKeepAliveResponse, tonic::Status>> + Send>>,
tonic::Status,
> {
) -> Result<KeepAliveStream, tonic::Status> {
let shutdown_listener = self
.task_manager
.get_shutdown_listener(TaskName::LeaseKeepAlive);
.get_shutdown_listener(TaskName::LeaseKeepAlive)
.ok_or(tonic::Status::cancelled("The cluster is shutting down"))?;
let endpoints = build_endpoints(leader_addrs, self.client_tls_config.as_ref())?;
let channel = tonic::transport::Channel::balance_list(endpoints.into_iter());
let mut lease_client = LeaseClient::new(channel);
Expand Down Expand Up @@ -302,7 +305,7 @@ impl Lease for LeaseServer {
let request_stream = request.into_inner();
let stream = loop {
if self.lease_storage.is_primary() {
break self.leader_keep_alive(request_stream);
break self.leader_keep_alive(request_stream)?;
}
let leader_id = self.client.fetch_leader_id(false).await?;
// Given that a candidate server may become a leader when it won the election or
Expand Down
8 changes: 6 additions & 2 deletions crates/xline/src/server/watch_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,9 @@ mod test {
.return_const(-1_i64);
let watcher = Arc::new(mock_watcher);
let next_id = Arc::new(WatchIdGenerator::new(1));
let n = task_manager.get_shutdown_listener(TaskName::WatchTask);
let n = task_manager
.get_shutdown_listener(TaskName::WatchTask)
.unwrap();
let handle = tokio::spawn(WatchServer::task(
next_id,
Arc::clone(&watcher),
Expand Down Expand Up @@ -733,7 +735,9 @@ mod test {
.return_const(-1_i64);
let watcher = Arc::new(mock_watcher);
let next_id = Arc::new(WatchIdGenerator::new(1));
let n = task_manager.get_shutdown_listener(TaskName::WatchTask);
let n = task_manager
.get_shutdown_listener(TaskName::WatchTask)
.unwrap();
let handle = tokio::spawn(WatchServer::task(
next_id,
Arc::clone(&watcher),
Expand Down

0 comments on commit 6f1f269

Please sign in to comment.