From 0352e787b2604e35f968886c9782baf435b25bbc Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Thu, 9 Jan 2025 21:12:42 +0530 Subject: [PATCH] **Fix jdc sigterm signalling issue** This restructures the start method to avoid blocking when the client is disconnected from the upstream. Currently, when a termination signal is sent while the client is disconnected from the upstream, the blocking nature of initialize_jd halts the main runtime and prevents it from executing the select block, which listens for termination signals and channel responses. **Modifications** 1. Add a new shutdown field, which can later be used in integration tests to terminate instances 2. Change the argument types of the initialize_jd and initialize_jd_as_solo_miner methods so they can be moved into spawned tasks. 3. Spawn the blocking process as a separate task --- roles/jd-client/src/lib/mod.rs | 163 ++++++++++++++++++--------- 1 file changed, 91 insertions(+), 72 deletions(-) diff --git a/roles/jd-client/src/lib/mod.rs b/roles/jd-client/src/lib/mod.rs index 467ac52b54..caa13229b7 100644 --- a/roles/jd-client/src/lib/mod.rs +++ b/roles/jd-client/src/lib/mod.rs @@ -20,7 +20,7 @@ use std::{ str::FromStr, sync::Arc, }; -use tokio::task::AbortHandle; +use tokio::{sync::Notify, task::AbortHandle}; use tracing::{error, info}; @@ -57,111 +57,131 @@ pub static IS_NEW_TEMPLATE_HANDLED: AtomicBool = AtomicBool::new(true); pub struct JobDeclaratorClient { /// Configuration of the proxy server [`JobDeclaratorClient`] is connected to. 
config: ProxyConfig, + shutdown: Arc, } impl JobDeclaratorClient { pub fn new(config: ProxyConfig) -> Self { - Self { config } + Self { + config, + shutdown: Arc::new(Notify::new()), + } } pub async fn start(self) { let mut upstream_index = 0; - let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); // Channel used to manage failed tasks let (tx_status, rx_status) = unbounded(); let task_collector = Arc::new(Mutex::new(vec![])); - let proxy_config = &self.config; + tokio::spawn({ + let shutdown_signal = self.shutdown.clone(); + async move { + if tokio::signal::ctrl_c().await.is_ok() { + info!("Interrupt received"); + shutdown_signal.notify_one(); + } + } + }); - loop { + let proxy_config = self.config; + 'outer: loop { let task_collector = task_collector.clone(); let tx_status = tx_status.clone(); + let proxy_config = proxy_config.clone(); if let Some(upstream) = proxy_config.upstreams.get(upstream_index) { - self.initialize_jd(tx_status.clone(), task_collector.clone(), upstream.clone()) - .await; + let tx_status = tx_status.clone(); + let task_collector = task_collector.clone(); + let upstream = upstream.clone(); + tokio::spawn(async move { + Self::initialize_jd(proxy_config, tx_status, task_collector, upstream).await; + }); } else { - self.initialize_jd_as_solo_miner(tx_status.clone(), task_collector.clone()) + let tx_status = tx_status.clone(); + let task_collector = task_collector.clone(); + tokio::spawn(async move { + Self::initialize_jd_as_solo_miner( + proxy_config, + tx_status.clone(), + task_collector.clone(), + ) .await; + }); } // Check all tasks if is_finished() is true, if so exit loop { - let task_status = select! 
{ - task_status = rx_status.recv().fuse() => task_status, - interrupt_signal = interrupt_signal_future => { - match interrupt_signal { - Ok(()) => { - info!("Interrupt received"); - }, - Err(err) => { - error!("Unable to listen for interrupt signal: {}", err); - // we also shut down in case of error - }, - } - std::process::exit(0); - } - }; - let task_status: status::Status = task_status.unwrap(); - - match task_status.state { - // Should only be sent by the downstream listener - status::State::DownstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - task_collector - .safe_lock(|s| { - for handle in s { - handle.abort(); + select! { + task_status = rx_status.recv().fuse() => { + if let Ok(task_status) = task_status { + match task_status.state { + // Should only be sent by the downstream listener + status::State::DownstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + task_collector + .safe_lock(|s| { + for handle in s { + handle.abort(); + } + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + break; } - }) - .unwrap(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - break; - } - status::State::UpstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - task_collector - .safe_lock(|s| { - for handle in s { - handle.abort(); + status::State::UpstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + task_collector + .safe_lock(|s| { + for handle in s { + handle.abort(); + } + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + break; } - }) - .unwrap(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - break; - } - status::State::UpstreamRogue => { - error!("Changin Pool"); - 
tokio::time::sleep(std::time::Duration::from_secs(1)).await; - task_collector - .safe_lock(|s| { - for handle in s { - handle.abort(); + status::State::UpstreamRogue => { + error!("Changing Pool"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + task_collector + .safe_lock(|s| { + for handle in s { + handle.abort(); + } + }) + .unwrap(); + upstream_index += 1; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + break; } - }) - .unwrap(); - upstream_index += 1; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - break; - } - status::State::Healthy(msg) => { - info!("HEALTHY message: {}", msg); + status::State::Healthy(msg) => { + info!("HEALTHY message: {}", msg); + } + } + } else { + info!("Received unknown task. Shutting down."); + break 'outer; + } + }, + _ = self.shutdown.notified().fuse() => { + info!("Shutting down gracefully..."); + std::process::exit(0); } - } + }; } } } async fn initialize_jd_as_solo_miner( - &self, + proxy_config: ProxyConfig, tx_status: async_channel::Sender>, task_collector: Arc>>, ) { - let proxy_config = &self.config; let timeout = proxy_config.timeout; - let miner_tx_out = proxy_config::get_coinbase_output(proxy_config).unwrap(); + let miner_tx_out = proxy_config::get_coinbase_output(&proxy_config).unwrap(); // When Downstream receive a share that meets bitcoin target it transformit in a // SubmitSolution and send it to the TemplateReceiver @@ -211,12 +231,11 @@ impl JobDeclaratorClient { } async fn initialize_jd( - &self, + proxy_config: ProxyConfig, tx_status: async_channel::Sender>, task_collector: Arc>>, upstream_config: proxy_config::Upstream, ) { - let proxy_config = &self.config; let timeout = proxy_config.timeout; let test_only_do_not_send_solution_to_tp = proxy_config .test_only_do_not_send_solution_to_tp