Skip to content

Commit

Permalink
fix jdc sigterm signalling issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Shourya742 committed Jan 5, 2025
1 parent 1f2c5e8 commit 9585a3d
Showing 1 changed file with 37 additions and 22 deletions.
59 changes: 37 additions & 22 deletions roles/jd-client/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
str::FromStr,
sync::Arc,
};
use tokio::task::AbortHandle;
use tokio::{sync::Notify, task::AbortHandle};

use tracing::{error, info};

Expand Down Expand Up @@ -57,48 +57,65 @@ pub static IS_NEW_TEMPLATE_HANDLED: AtomicBool = AtomicBool::new(true);
pub struct JobDeclaratorClient {
/// Configuration of the proxy server [`JobDeclaratorClient`] is connected to.
config: ProxyConfig,
shutdown: Arc<Notify>,
}

impl JobDeclaratorClient {
pub fn new(config: ProxyConfig) -> Self {
Self { config }
Self {
config,
shutdown: Arc::new(Notify::new()),
}
}

pub async fn start(self) {
let mut upstream_index = 0;
let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse());

// Channel used to manage failed tasks
let (tx_status, rx_status) = unbounded();

let task_collector = Arc::new(Mutex::new(vec![]));

let proxy_config = &self.config;
tokio::spawn({
let shutdown_signal = self.shutdown.clone();
async move {
if tokio::signal::ctrl_c().await.is_ok() {
info!("Interrupt received");
shutdown_signal.notify_one();
}
}
});

let proxy_config = self.config;
loop {
let task_collector = task_collector.clone();
let tx_status = tx_status.clone();
let proxy_config = proxy_config.clone();
if let Some(upstream) = proxy_config.upstreams.get(upstream_index) {
self.initialize_jd(tx_status.clone(), task_collector.clone(), upstream.clone())
.await;
let tx_status = tx_status.clone();
let task_collector = task_collector.clone();
let upstream = upstream.clone();
tokio::spawn(async move {
Self::initialize_jd(proxy_config, tx_status, task_collector, upstream).await;
});
} else {
self.initialize_jd_as_solo_miner(tx_status.clone(), task_collector.clone())
let tx_status = tx_status.clone();
let task_collector = task_collector.clone();
tokio::spawn(async move {
Self::initialize_jd_as_solo_miner(
proxy_config,
tx_status.clone(),
task_collector.clone(),
)
.await;
});
}
// Check all tasks if is_finished() is true, if so exit
loop {
let task_status = select! {
task_status = rx_status.recv().fuse() => task_status,
interrupt_signal = interrupt_signal_future => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
_ = self.shutdown.notified().fuse() => {
info!("Shutting down gracefully...");
std::process::exit(0);
}
};
Expand Down Expand Up @@ -155,13 +172,12 @@ impl JobDeclaratorClient {
}

async fn initialize_jd_as_solo_miner(
&self,
proxy_config: ProxyConfig,
tx_status: async_channel::Sender<status::Status<'static>>,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
) {
let proxy_config = &self.config;
let timeout = proxy_config.timeout;
let miner_tx_out = proxy_config::get_coinbase_output(proxy_config).unwrap();
let miner_tx_out = proxy_config::get_coinbase_output(&proxy_config).unwrap();

// When Downstream receive a share that meets bitcoin target it transformit in a
// SubmitSolution and send it to the TemplateReceiver
Expand Down Expand Up @@ -211,12 +227,11 @@ impl JobDeclaratorClient {
}

async fn initialize_jd(
&self,
proxy_config: ProxyConfig,
tx_status: async_channel::Sender<status::Status<'static>>,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
upstream_config: proxy_config::Upstream,
) {
let proxy_config = &self.config;
let timeout = proxy_config.timeout;
let test_only_do_not_send_solution_to_tp = proxy_config
.test_only_do_not_send_solution_to_tp
Expand Down

0 comments on commit 9585a3d

Please sign in to comment.