From 6a95deb54ca4c058e72d645c0204bb6528c8fca6 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Sun, 5 Jan 2025 09:27:25 +0530 Subject: [PATCH] move task_status block inside the select macro --- roles/jd-client/src/lib/mod.rs | 109 ++++++++++++++++----------------- 1 file changed, 54 insertions(+), 55 deletions(-) diff --git a/roles/jd-client/src/lib/mod.rs b/roles/jd-client/src/lib/mod.rs index 970b894501..4f932329e6 100644 --- a/roles/jd-client/src/lib/mod.rs +++ b/roles/jd-client/src/lib/mod.rs @@ -112,66 +112,65 @@ impl JobDeclaratorClient { } // Check all tasks if is_finished() is true, if so exit loop { - let task_status = select! { - task_status = rx_status.recv().fuse() => task_status, + select! { + task_status = rx_status.recv().fuse() => { + if let Ok(task_status) = task_status { + match task_status.state { + // Should only be sent by the downstream listener + status::State::DownstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + task_collector + .safe_lock(|s| { + for handle in s { + handle.abort(); + } + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + break; + } + status::State::UpstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + task_collector + .safe_lock(|s| { + for handle in s { + handle.abort(); + } + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + break; + } + status::State::UpstreamRogue => { + error!("Changin Pool"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + task_collector + .safe_lock(|s| { + for handle in s { + handle.abort(); + } + }) + .unwrap(); + upstream_index += 1; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + break; + } + status::State::Healthy(msg) => { + info!("HEALTHY message: {}", msg); + } + } + } else { + info!("Closing channels"); + break 'outer; + } + }, _ = self.shutdown.notified().fuse() => { info!("Shutting down gracefully..."); std::process::exit(0); } }; - // let task_status: status::Status = task_status.unwrap(); - - if let Ok(task_status) = task_status { - match task_status.state { - // Should only be sent by the downstream listener - status::State::DownstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - task_collector - .safe_lock(|s| { - for handle in s { - handle.abort(); - } - }) - .unwrap(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - break; - } - status::State::UpstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - task_collector - .safe_lock(|s| { - for handle in s { - handle.abort(); - } - }) - .unwrap(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - break; - } - status::State::UpstreamRogue => { - error!("Changin Pool"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - task_collector - .safe_lock(|s| { - for handle in s { - handle.abort(); - } - }) - .unwrap(); - upstream_index += 1; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - break; - } - status::State::Healthy(msg) => { - info!("HEALTHY message: {}", msg); - } - } - } else { - info!("Closing channels"); - break 'outer; - } } } }