From 0079e9c0922e3bea502bbcd1daca0bc30826bf4c Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 26 Aug 2024 14:35:32 +0800 Subject: [PATCH] fix: only return shutdown error on cluster shutdown Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/curp_node.rs | 11 ++++++---- crates/curp/src/server/raw_curp/mod.rs | 9 +++++++-- crates/utils/src/task_manager/mod.rs | 28 ++++++++++++++++++-------- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs index 1b1b94cc9..4e1c5a552 100644 --- a/crates/curp/src/server/curp_node.rs +++ b/crates/curp/src/server/curp_node.rs @@ -160,7 +160,7 @@ impl, RC: RoleChange> CurpNode { resp_tx: Arc, bypassed: bool, ) -> Result<(), CurpError> { - if self.curp.is_shutdown() { + if self.curp.is_cluster_shutdown() { return Err(CurpError::shutting_down()); } self.curp.check_leader_transfer()?; @@ -206,7 +206,7 @@ impl, RC: RoleChange> CurpNode { /// Handle `Record` requests pub(super) fn record(&self, req: &RecordRequest) -> Result { - if self.curp.is_shutdown() { + if self.curp.is_cluster_shutdown() { return Err(CurpError::shutting_down()); } let id = req.propose_id(); @@ -218,7 +218,7 @@ impl, RC: RoleChange> CurpNode { /// Handle `Record` requests pub(super) fn read_index(&self) -> Result { - if self.curp.is_shutdown() { + if self.curp.is_cluster_shutdown() { return Err(CurpError::shutting_down()); } Ok(ReadIndexResponse { @@ -383,9 +383,12 @@ impl, RC: RoleChange> CurpNode { // NOTE: The leader may shutdown itself in configuration change. // We must first check this situation. 
self.curp.check_leader_transfer()?; - if self.curp.is_shutdown() { + if self.curp.is_cluster_shutdown() { return Err(CurpError::shutting_down()); } + if self.curp.is_node_shutdown() { + return Err(CurpError::node_not_exist()); + } if !self.curp.is_leader() { let (leader_id, term, _) = self.curp.leader(); return Err(CurpError::redirect(leader_id, term)); diff --git a/crates/curp/src/server/raw_curp/mod.rs b/crates/curp/src/server/raw_curp/mod.rs index fd367400f..b6b5a0fe7 100644 --- a/crates/curp/src/server/raw_curp/mod.rs +++ b/crates/curp/src/server/raw_curp/mod.rs @@ -1374,8 +1374,13 @@ impl RawCurp { ) } - /// Check if the cluster is shutting down - pub(super) fn is_shutdown(&self) -> bool { + /// Check if the current node is shutting down + pub(super) fn is_node_shutdown(&self) -> bool { + self.task_manager.is_node_shutdown() + } + + /// Check if the cluster is shutting down + pub(super) fn is_cluster_shutdown(&self) -> bool { self.task_manager.is_shutdown() } diff --git a/crates/utils/src/task_manager/mod.rs b/crates/utils/src/task_manager/mod.rs index 834949969..587613cb7 100644 --- a/crates/utils/src/task_manager/mod.rs +++ b/crates/utils/src/task_manager/mod.rs @@ -120,6 +120,20 @@ impl TaskManager { self.state.load(Ordering::Acquire) != 0 } + /// Check if the current node is shutdown + #[must_use] + #[inline] + pub fn is_node_shutdown(&self) -> bool { + self.state.load(Ordering::Acquire) == 1 + } + + /// Check if the cluster is shutdown + #[must_use] + #[inline] + pub fn is_cluster_shutdown(&self) -> bool { + self.state.load(Ordering::Acquire) == 2 + } + /// Get shutdown listener /// /// Returns `None` if the cluster has been shutdowned @@ -167,9 +181,8 @@ impl TaskManager { } /// Inner shutdown task - async fn inner_shutdown(tasks: Arc>, state: Arc) { + async fn inner_shutdown(tasks: Arc>) { let mut queue = Self::root_tasks_queue(&tasks); - state.store(1, Ordering::Release); while let Some(v) = queue.pop_front() { let Some((_name, mut task)) = 
tasks.remove(&v) else { continue; @@ -205,8 +218,8 @@ impl TaskManager { #[inline] pub async fn shutdown(&self, wait: bool) { let tasks = Arc::clone(&self.tasks); - let state = Arc::clone(&self.state); - let h = tokio::spawn(Self::inner_shutdown(tasks, state)); + self.state.store(1, Ordering::Release); + let h = tokio::spawn(Self::inner_shutdown(tasks)); if wait { h.await .unwrap_or_else(|e| unreachable!("shutdown task should not panic: {e}")); @@ -217,11 +230,10 @@ impl TaskManager { #[inline] pub fn cluster_shutdown(&self) { let tasks = Arc::clone(&self.tasks); - let state = Arc::clone(&self.state); let tracker = Arc::clone(&self.cluster_shutdown_tracker); + self.state.store(2, Ordering::Release); let _ig = tokio::spawn(async move { info!("cluster shutdown start"); - state.store(2, Ordering::Release); _ = tasks .get(&TaskName::SyncFollower) .map(|n| n.notifier.notify_waiters()); @@ -232,7 +244,7 @@ impl TaskManager { tracker.notify.notified().await; } info!("cluster shutdown check passed, start shutdown"); - Self::inner_shutdown(tasks, state).await; + Self::inner_shutdown(tasks).await; }); } @@ -430,7 +442,7 @@ mod test { } drop(record_tx); tokio::time::sleep(Duration::from_secs(1)).await; - TaskManager::inner_shutdown(Arc::clone(&tm.tasks), Arc::clone(&tm.state)).await; + TaskManager::inner_shutdown(Arc::clone(&tm.tasks)).await; let mut shutdown_order = vec![]; while let Some(name) = record_rx.recv().await { shutdown_order.push(name);