Skip to content

Commit

Permalink
fix: only return shutdown error on cluster shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Aug 26, 2024
1 parent 6f1f269 commit 74467e2
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 14 deletions.
11 changes: 7 additions & 4 deletions crates/curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl<C: Command, CE: CommandExecutor<C>, RC: RoleChange> CurpNode<C, CE, RC> {
resp_tx: Arc<ResponseSender>,
bypassed: bool,
) -> Result<(), CurpError> {
if self.curp.is_shutdown() {
if self.curp.is_cluster_shutdown() {
return Err(CurpError::shutting_down());
}
self.curp.check_leader_transfer()?;
Expand Down Expand Up @@ -206,7 +206,7 @@ impl<C: Command, CE: CommandExecutor<C>, RC: RoleChange> CurpNode<C, CE, RC> {

/// Handle `Record` requests
pub(super) fn record(&self, req: &RecordRequest) -> Result<RecordResponse, CurpError> {
if self.curp.is_shutdown() {
if self.curp.is_cluster_shutdown() {
return Err(CurpError::shutting_down());
}
let id = req.propose_id();
Expand All @@ -218,7 +218,7 @@ impl<C: Command, CE: CommandExecutor<C>, RC: RoleChange> CurpNode<C, CE, RC> {

/// Handle `Record` requests
pub(super) fn read_index(&self) -> Result<ReadIndexResponse, CurpError> {
if self.curp.is_shutdown() {
if self.curp.is_cluster_shutdown() {
return Err(CurpError::shutting_down());
}
Ok(ReadIndexResponse {
Expand Down Expand Up @@ -383,9 +383,12 @@ impl<C: Command, CE: CommandExecutor<C>, RC: RoleChange> CurpNode<C, CE, RC> {
// NOTE: The leader may shutdown itself in configuration change.
// We must first check this situation.
self.curp.check_leader_transfer()?;
if self.curp.is_shutdown() {
if self.curp.is_cluster_shutdown() {
return Err(CurpError::shutting_down());
}
if self.curp.is_node_shutdown() {
return Err(CurpError::node_not_exist());
}
if !self.curp.is_leader() {
let (leader_id, term, _) = self.curp.leader();
return Err(CurpError::redirect(leader_id, term));
Expand Down
9 changes: 7 additions & 2 deletions crates/curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1374,8 +1374,13 @@ impl<C: Command, RC: RoleChange> RawCurp<C, RC> {
)
}

/// Check if the cluster is shutting down
pub(super) fn is_shutdown(&self) -> bool {
/// Check if the current node is shutting down
pub(super) fn is_node_shutdown(&self) -> bool {
self.task_manager.is_node_shutdown()
}

/// Check if the current node is shutting down
pub(super) fn is_cluster_shutdown(&self) -> bool {
self.task_manager.is_shutdown()
}

Expand Down
28 changes: 20 additions & 8 deletions crates/utils/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ impl TaskManager {
self.state.load(Ordering::Acquire) != 0
}

/// Check if the cluster is shutdown
#[must_use]
#[inline]
pub fn is_node_shutdown(&self) -> bool {
self.state.load(Ordering::Acquire) == 1
}

/// Check if the cluster is shutdown
#[must_use]
#[inline]
pub fn is_cluster_shutdown(&self) -> bool {
self.state.load(Ordering::Acquire) == 2
}

/// Get shutdown listener
///
/// Returns `None` if the cluster has been shutdowned
Expand Down Expand Up @@ -167,9 +181,8 @@ impl TaskManager {
}

/// Inner shutdown task
async fn inner_shutdown(tasks: Arc<DashMap<TaskName, Task>>, state: Arc<AtomicU8>) {
async fn inner_shutdown(tasks: Arc<DashMap<TaskName, Task>>) {
let mut queue = Self::root_tasks_queue(&tasks);
state.store(1, Ordering::Release);
while let Some(v) = queue.pop_front() {
let Some((_name, mut task)) = tasks.remove(&v) else {
continue;
Expand Down Expand Up @@ -205,8 +218,8 @@ impl TaskManager {
#[inline]
pub async fn shutdown(&self, wait: bool) {
let tasks = Arc::clone(&self.tasks);
let state = Arc::clone(&self.state);
let h = tokio::spawn(Self::inner_shutdown(tasks, state));
self.state.store(1, Ordering::Release);
let h = tokio::spawn(Self::inner_shutdown(tasks));
if wait {
h.await
.unwrap_or_else(|e| unreachable!("shutdown task should not panic: {e}"));
Expand All @@ -217,11 +230,10 @@ impl TaskManager {
#[inline]
pub fn cluster_shutdown(&self) {
let tasks = Arc::clone(&self.tasks);
let state = Arc::clone(&self.state);
let tracker = Arc::clone(&self.cluster_shutdown_tracker);
self.state.store(2, Ordering::Release);
let _ig = tokio::spawn(async move {
info!("cluster shutdown start");
state.store(2, Ordering::Release);
_ = tasks
.get(&TaskName::SyncFollower)
.map(|n| n.notifier.notify_waiters());
Expand All @@ -232,7 +244,7 @@ impl TaskManager {
tracker.notify.notified().await;
}
info!("cluster shutdown check passed, start shutdown");
Self::inner_shutdown(tasks, state).await;
Self::inner_shutdown(tasks).await;
});
}

Expand Down Expand Up @@ -430,7 +442,7 @@ mod test {
}
drop(record_tx);
tokio::time::sleep(Duration::from_secs(1)).await;
TaskManager::inner_shutdown(Arc::clone(&tm.tasks), Arc::clone(&tm.state)).await;
TaskManager::inner_shutdown(Arc::clone(&tm.tasks)).await;
let mut shutdown_order = vec![];
while let Some(name) = record_rx.recv().await {
shutdown_order.push(name);
Expand Down

0 comments on commit 74467e2

Please sign in to comment.