From b0925141afd8c21bbf22ca5e9ccccccea59a6ed4 Mon Sep 17 00:00:00 2001 From: lorban Date: Mon, 10 Jun 2024 22:56:16 +0200 Subject: [PATCH] TProxy restart if gests disconnected by upstream When gets disconnected from upstream, the TProxy kills all spawned tasks and reconnects. Add start function and put starting logic there Call this function when receive an UpstreamShutdown, after waiting a random amount of time (if there are more TProxy for the same upstream and all of them connect at the same time, it is bad) Add a cancellation token, which clones are propagated to all downstreams. If one of this clones gets cancelled, all of them get cancelled. After each spwaned task, there is a second task that checks if the token get canceled. In this case, it kills all the task above. use tokio::task in favor of async_std::task --- protocols/Cargo.lock | 2 +- roles/Cargo.lock | 2 +- roles/translator/Cargo.toml | 2 +- .../tproxy-config-local-jdc-example.toml | 4 +- .../src/lib/downstream_sv1/downstream.rs | 50 ++++- roles/translator/src/lib/proxy/bridge.rs | 43 +++- .../src/lib/upstream_sv2/upstream.rs | 48 ++++- roles/translator/src/main.rs | 189 +++++++++++++----- 8 files changed, 268 insertions(+), 72 deletions(-) diff --git a/protocols/Cargo.lock b/protocols/Cargo.lock index fbe5ddceac..31450eea42 100644 --- a/protocols/Cargo.lock +++ b/protocols/Cargo.lock @@ -744,7 +744,7 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "sv1_api" -version = "1.0.0" +version = "1.0.1" dependencies = [ "binary_sv2", "bitcoin_hashes 0.3.2", diff --git a/roles/Cargo.lock b/roles/Cargo.lock index fbaeadc6da..50dee08b42 100644 --- a/roles/Cargo.lock +++ b/roles/Cargo.lock @@ -2053,7 +2053,7 @@ dependencies = [ [[package]] name = "sv1_api" -version = "1.0.0" +version = "1.0.1" dependencies = [ "binary_sv2", "bitcoin_hashes 0.3.2", diff --git a/roles/translator/Cargo.toml b/roles/translator/Cargo.toml index 182370cbd4..6194c74d59 100644 --- a/roles/translator/Cargo.toml +++ b/roles/translator/Cargo.toml @@ -34,11 +34,11 @@ error_handling = { version = "1.0.0", path = "../../utils/error-handling" } key-utils = { version = "^1.0.0", path = "../../utils/key-utils" } tokio-util = { version = "0.7.10", features = ["codec"] } async-compat = "0.2.1" +rand = "0.8.4" [dev-dependencies] -rand = "0.8.4" sha2 = "0.10.6" [features] diff --git a/roles/translator/config-examples/tproxy-config-local-jdc-example.toml b/roles/translator/config-examples/tproxy-config-local-jdc-example.toml index ce11ca19ef..5fe4a8eebd 100644 --- a/roles/translator/config-examples/tproxy-config-local-jdc-example.toml +++ b/roles/translator/config-examples/tproxy-config-local-jdc-example.toml @@ -25,7 +25,7 @@ min_extranonce2_size = 8 # Difficulty params [downstream_difficulty_config] # hashes/s of the weakest miner that will be connecting (e.g.: 10 Th/s = 10_000_000_000_000.0) -min_individual_miner_hashrate=10_000_000.0 +min_individual_miner_hashrate=10_000_000_000_000.0 # target number of shares per minute the miner should be sending shares_per_minute = 6.0 @@ -33,4 +33,4 @@ shares_per_minute = 6.0 # interval in seconds to elapse before updating channel hashrate with the pool channel_diff_update_interval = 60 # estimated accumulated hashrate of all downstream miners (e.g.: 10 Th/s = 10_000_000_000_000.0) -channel_nominal_hashrate = 10_000_000.0 +channel_nominal_hashrate = 10_000_000_000_000.0 diff --git a/roles/translator/src/lib/downstream_sv1/downstream.rs b/roles/translator/src/lib/downstream_sv1/downstream.rs index 9e44a96fb9..f71e9b51ad 100644 --- a/roles/translator/src/lib/downstream_sv1/downstream.rs +++ b/roles/translator/src/lib/downstream_sv1/downstream.rs @@ -27,6 +27,7 @@ use futures::select; use tokio_util::codec::{FramedRead, LinesCodec}; use std::{net::SocketAddr, sync::Arc}; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; use v1::{ client_to_server::{self, Submit}, @@ -110,6 +111,7 @@ impl Downstream { host: String, difficulty_config: DownstreamDifficultyConfig, upstream_difficulty_config: Arc>, + cancellation_token: CancellationToken, ) { let stream = std::sync::Arc::new(stream); @@ -150,11 +152,12 @@ impl Downstream { let rx_shutdown_clone = rx_shutdown.clone(); let tx_shutdown_clone = tx_shutdown.clone(); let tx_status_reader = tx_status.clone(); + let cancellation_token_mining_device = cancellation_token.clone(); // Task to read from SV1 Mining Device Client socket via `socket_reader`. Depending on the // SV1 message received, a message response is sent directly back to the SV1 Downstream // role, or the message is sent upwards to the Bridge for translation into a SV2 message // and then sent to the SV2 Upstream role. - let _socket_reader_task = task::spawn(async move { + let socket_reader_task = tokio::task::spawn(async move { let reader = BufReader::new(&*socket_reader); let mut messages = FramedRead::new( async_compat::Compat::new(reader), @@ -205,15 +208,24 @@ impl Downstream { kill(&tx_shutdown_clone).await; warn!("Downstream: Shutting down sv1 downstream reader"); }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_mining_device.cancelled() => { + socket_reader_task.abort(); + warn!("Shutting down sv1 downstream reader"); + }, + } + }); let rx_shutdown_clone = rx_shutdown.clone(); let tx_shutdown_clone = tx_shutdown.clone(); let tx_status_writer = tx_status.clone(); let host_ = host.clone(); + let cancellation_token_new_sv1_message_no_transl = cancellation_token.clone(); // Task to receive SV1 message responses to SV1 messages that do NOT need translation. // These response messages are sent directly to the SV1 Downstream role. - let _socket_writer_task = task::spawn(async move { + let socket_writer_task = tokio::task::spawn(async move { loop { select! { res = receiver_outgoing.recv().fuse() => { @@ -242,11 +254,20 @@ impl Downstream { &host_ ); }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_new_sv1_message_no_transl.cancelled() => { + socket_writer_task.abort(); + warn!("Shutting down sv1 downstream writer"); + }, + } + }); let tx_status_notify = tx_status; let self_ = downstream.clone(); - let _notify_task = task::spawn(async move { + let cancellation_token_notify_task = cancellation_token.clone(); + let notify_task = tokio::task::spawn(async move { let timeout_timer = std::time::Instant::now(); let mut first_sent = false; loop { @@ -329,10 +350,19 @@ impl Downstream { &host ); }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_notify_task.cancelled() => { + notify_task.abort(); + warn!("Shutting down sv1 downstream job notifier"); + }, + } + }); } /// Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) and create a /// new `Downstream` for each connection. + #[allow(clippy::too_many_arguments)] pub fn accept_connections( downstream_addr: SocketAddr, tx_sv1_submit: Sender, @@ -341,8 +371,11 @@ impl Downstream { bridge: Arc>, downstream_difficulty_config: DownstreamDifficultyConfig, upstream_difficulty_config: Arc>, + cancellation_token: CancellationToken, ) { - task::spawn(async move { + let cancellation_token_downstream = cancellation_token.clone(); + + let task = tokio::task::spawn(async move { let downstream_listener = TcpListener::bind(downstream_addr).await.unwrap(); let mut downstream_incoming = downstream_listener.incoming(); @@ -369,6 +402,7 @@ impl Downstream { host, downstream_difficulty_config.clone(), upstream_difficulty_config.clone(), + cancellation_token_downstream.clone(), ) .await; } @@ -378,6 +412,14 @@ impl Downstream { } } }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token.cancelled() => { + task.abort(); + warn!("Shutting down accept connections task"); + }, + }; + }); } /// As SV1 messages come in, determines if the message response needs to be translated to SV2 diff --git a/roles/translator/src/lib/proxy/bridge.rs b/roles/translator/src/lib/proxy/bridge.rs index ee9ad43370..91a2d4b960 100644 --- a/roles/translator/src/lib/proxy/bridge.rs +++ b/roles/translator/src/lib/proxy/bridge.rs @@ -1,5 +1,4 @@ use async_channel::{Receiver, Sender}; -use async_std::task; use roles_logic_sv2::{ channel_logic::channel_factory::{ExtendedChannelKind, ProxyExtendedChannelFactory, Share}, mining_sv2::{ @@ -22,6 +21,7 @@ use super::super::{ }; use error_handling::handle_result; use roles_logic_sv2::{channel_logic::channel_factory::OnNewShare, Error as RolesLogicError}; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; /// Bridge between the SV2 `Upstream` and SV1 `Downstream` responsible for the following messaging @@ -64,6 +64,7 @@ pub struct Bridge { last_p_hash: Option>, target: Arc>>, last_job_id: u32, + cancellation_token: CancellationToken, } impl Bridge { @@ -79,6 +80,7 @@ impl Bridge { extranonces: ExtendedExtranonce, target: Arc>>, up_id: u32, + cancellation_token: CancellationToken, ) -> Arc> { let ids = Arc::new(Mutex::new(GroupId::new())); let share_per_min = 1.0; @@ -107,6 +109,7 @@ impl Bridge { last_p_hash: None, target, last_job_id: 0, + cancellation_token, })) } @@ -162,10 +165,12 @@ impl Bridge { /// Receives a `DownstreamMessages` message from the `Downstream`, handles based on the /// variant received. fn handle_downstream_messages(self_: Arc>) { + let cancellation_token_handle_downstream = + self_.safe_lock(|b| b.cancellation_token.clone()).unwrap(); let (rx_sv1_downstream, tx_status) = self_ .safe_lock(|s| (s.rx_sv1_downstream.clone(), s.tx_status.clone())) .unwrap(); - task::spawn(async move { + let task = tokio::task::spawn(async move { loop { let msg = handle_result!(tx_status, rx_sv1_downstream.clone().recv().await); @@ -185,6 +190,14 @@ impl Bridge { }; } }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_handle_downstream.cancelled() => { + task.abort(); + warn!("Shutting down handle_result task"); + }, + } + }); } /// receives a `SetDownstreamTarget` and updates the downstream target for the channel #[allow(clippy::result_large_err)] @@ -367,6 +380,8 @@ impl Bridge { /// corresponding `job_id` has already been received. If this is not the case, an error has /// occurred on the Upstream pool role and the connection will close. fn handle_new_prev_hash(self_: Arc>) { + let cancellation_token_handle_prev_hash = + self_.safe_lock(|b| b.cancellation_token.clone()).unwrap(); let (tx_sv1_notify, rx_sv2_set_new_prev_hash, tx_status) = self_ .safe_lock(|s| { ( @@ -377,7 +392,7 @@ impl Bridge { }) .unwrap(); debug!("Starting handle_new_prev_hash task"); - task::spawn(async move { + let task = tokio::task::spawn(async move { loop { // Receive `SetNewPrevHash` from `Upstream` let sv2_set_new_prev_hash: SetNewPrevHash = @@ -397,6 +412,14 @@ impl Bridge { ) } }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_handle_prev_hash.cancelled() => { + task.abort(); + warn!("Shutting down handle_new_prev_hash"); + } + } + }); } async fn handle_new_extended_mining_job_( @@ -460,6 +483,8 @@ impl Bridge { /// `SetNewPrevHash` `job_id`, an error has occurred on the Upstream pool role and the /// connection will close. fn handle_new_extended_mining_job(self_: Arc>) { + let cancellation_token_new_extended_mining_job = + self_.safe_lock(|b| b.cancellation_token.clone()).unwrap(); let (tx_sv1_notify, rx_sv2_new_ext_mining_job, tx_status) = self_ .safe_lock(|s| { ( @@ -470,7 +495,7 @@ impl Bridge { }) .unwrap(); debug!("Starting handle_new_extended_mining_job task"); - task::spawn(async move { + let task = tokio::task::spawn(async move { loop { // Receive `NewExtendedMiningJob` from `Upstream` let sv2_new_extended_mining_job: NewExtendedMiningJob = handle_result!( @@ -494,6 +519,14 @@ impl Bridge { .store(true, std::sync::atomic::Ordering::SeqCst); } }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_new_extended_mining_job.cancelled() => { + task.abort(); + warn!("Task handle_new_extended_mining_job cancelled"); + } + } + }); } } pub struct OpenSv1Downstream { @@ -543,6 +576,7 @@ mod test { rx_sv1_notify, }; + let cancellation_token = CancellationToken::new(); let b = Bridge::new( rx_sv1_submit, tx_sv2_submit_shares_ext, @@ -553,6 +587,7 @@ mod test { extranonces, Arc::new(Mutex::new(upstream_target)), 1, + cancellation_token, ); (b, interface) } diff --git a/roles/translator/src/lib/upstream_sv2/upstream.rs b/roles/translator/src/lib/upstream_sv2/upstream.rs index f6d192f75e..f19842ceac 100644 --- a/roles/translator/src/lib/upstream_sv2/upstream.rs +++ b/roles/translator/src/lib/upstream_sv2/upstream.rs @@ -9,7 +9,7 @@ use crate::{ upstream_sv2::{EitherFrame, Message, StdFrame, UpstreamConnection}, }; use async_channel::{Receiver, Sender}; -use async_std::{net::TcpStream, task}; +use async_std::net::TcpStream; use binary_sv2::u256_from_int; use codec_sv2::{Frame, HandshakeRole, Initiator}; use error_handling::handle_result; @@ -36,9 +36,9 @@ use roles_logic_sv2::{ use std::{ net::SocketAddr, sync::{atomic::AtomicBool, Arc}, - thread::sleep, - time::Duration, }; +use tokio::time::{sleep, Duration}; +use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; use stratum_common::bitcoin::BlockHash; @@ -98,6 +98,7 @@ pub struct Upstream { // and the upstream just needs to occasionally check if it has changed more than // than the configured percentage pub(super) difficulty_config: Arc>, + cancellation_token: CancellationToken, } impl PartialEq for Upstream { @@ -124,6 +125,7 @@ impl Upstream { tx_status: status::Sender, target: Arc>>, difficulty_config: Arc>, + cancellation_token: CancellationToken, ) -> ProxyResult<'static, Arc>> { // Connect to the SV2 Upstream role retry connection every 5 seconds. let socket = loop { @@ -135,7 +137,7 @@ impl Upstream { address, e ); - sleep(Duration::from_secs(5)); + sleep(Duration::from_secs(5)).await; } } }; @@ -171,6 +173,7 @@ impl Upstream { tx_status, target, difficulty_config, + cancellation_token, }))) } @@ -259,6 +262,9 @@ impl Upstream { #[allow(clippy::result_large_err)] pub fn parse_incoming(self_: Arc>) -> ProxyResult<'static, ()> { let clone = self_.clone(); + let cancellation_token = self_.safe_lock(|s| s.cancellation_token.clone()).unwrap(); + let token1 = cancellation_token.clone(); + let token2 = cancellation_token.clone(); let ( tx_frame, tx_sv2_extranonce, @@ -281,16 +287,24 @@ impl Upstream { { let self_ = self_.clone(); let tx_status = tx_status.clone(); - task::spawn(async move { + let task = tokio::task::spawn(async move { // No need to start diff management immediatly - async_std::task::sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(10)).await; loop { handle_result!(tx_status, Self::try_update_hashrate(self_.clone()).await); } }); + tokio::task::spawn(async move { + tokio::select! { + _ = token1.cancelled() => { + task.abort(); + warn!("Shutting down handle result task"); + }, + } + }); } - task::spawn(async move { + let task = tokio::task::spawn(async move { loop { // Waiting to receive a message from the SV2 Upstream role let incoming = handle_result!(tx_status, recv.recv().await); @@ -433,6 +447,14 @@ impl Upstream { } } }); + tokio::task::spawn(async move { + tokio::select! { + _ = token2.cancelled() => { + task.abort(); + warn!("Shutting down parse incoming task"); + }, + } + }); Ok(()) } @@ -459,6 +481,7 @@ impl Upstream { #[allow(clippy::result_large_err)] pub fn handle_submit(self_: Arc>) -> ProxyResult<'static, ()> { + let cancellation_token = self_.safe_lock(|s| s.cancellation_token.clone()).unwrap(); let clone = self_.clone(); let (tx_frame, receiver, tx_status) = clone .safe_lock(|s| { @@ -470,7 +493,7 @@ impl Upstream { }) .map_err(|_| PoisonLock)?; - task::spawn(async move { + let task = tokio::task::spawn(async move { loop { let mut sv2_submit: SubmitSharesExtended = handle_result!(tx_status, receiver.recv().await); @@ -506,6 +529,15 @@ impl Upstream { ); } }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token.cancelled() => { + task.abort(); + info!("Shutting down handle submit task"); + }, + } + }); + Ok(()) } diff --git a/roles/translator/src/main.rs b/roles/translator/src/main.rs index c1307a5a2f..3f6f91c0fe 100644 --- a/roles/translator/src/main.rs +++ b/roles/translator/src/main.rs @@ -3,24 +3,29 @@ mod args; mod lib; use args::Args; +use async_channel::{bounded, unbounded, Receiver, Sender}; +use downstream_sv1::DownstreamMessages; use error::{Error, ProxyResult}; +use futures::{select, FutureExt}; use lib::{downstream_sv1, error, proxy, proxy_config, status, upstream_sv2}; use proxy_config::ProxyConfig; -use roles_logic_sv2::utils::Mutex; - -use async_channel::{bounded, unbounded}; -use futures::{select, FutureExt}; +use rand::Rng; +use roles_logic_sv2::{ + mining_sv2::{ExtendedExtranonce, NewExtendedMiningJob, SetNewPrevHash, SubmitSharesExtended}, + utils::Mutex, +}; use std::{ net::{IpAddr, SocketAddr}, str::FromStr, sync::Arc, }; -use tokio::{sync::broadcast, task}; +use tokio::{sync::broadcast, task, time::Duration}; +use tokio_util::sync::CancellationToken; use v1::server_to_client; use crate::status::{State, Status}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; /// Process CLI args, if any. #[allow(clippy::result_large_err)] fn process_cli_args<'a>() -> ProxyResult<'a, ProxyConfig> { @@ -77,6 +82,120 @@ async fn main() { broadcast::Receiver, ) = broadcast::channel(10); + let cancellation_token = CancellationToken::new(); + start( + rx_sv2_submit_shares_ext.clone(), + tx_sv2_submit_shares_ext.clone(), + tx_sv2_new_ext_mining_job.clone(), + tx_sv2_set_new_prev_hash.clone(), + tx_sv2_extranonce.clone(), + rx_sv2_extranonce.clone(), + rx_sv2_set_new_prev_hash.clone(), + rx_sv2_new_ext_mining_job.clone(), + rx_sv1_downstream.clone(), + tx_sv1_bridge.clone(), + tx_sv1_notify.clone(), + target.clone(), + tx_status.clone(), + cancellation_token.clone(), + ) + .await; + + debug!("Starting up signal listener"); + + let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); + debug!("Starting up status listener"); + // Check all tasks if is_finished() is true, if so exit + loop { + let task_status = select! { + task_status = rx_status.recv().fuse() => task_status, + interrupt_signal = interrupt_signal_future => { + match interrupt_signal { + Ok(()) => { + info!("Interrupt received"); + }, + Err(err) => { + error!("Unable to listen for interrupt signal: {}", err); + // we also shut down in case of error + }, + } + break; + } + }; + let task_status: Status = task_status.unwrap(); + + match task_status.state { + // Should only be sent by the downstream listener + State::DownstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + break; + } + State::BridgeShutdown(err) => { + error!("SHUTDOWN from: {}", err); + break; + } + State::UpstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + cancellation_token.clone().cancel(); + + // wait a random amount of time between 0 and 3000ms + // if all the downstreams try to reconnect at the same time, the upstream may fail + tokio::time::sleep(Duration::from_millis(1000)).await; + let mut rng = rand::thread_rng(); + let wait_time = rng.gen_range(0..=3000); + tokio::time::sleep(Duration::from_millis(wait_time)).await; + + // create a new token + let cancellation_token = CancellationToken::new(); + + error!("Trying recconnecting to upstream"); + start( + rx_sv2_submit_shares_ext.clone(), + tx_sv2_submit_shares_ext.clone(), + tx_sv2_new_ext_mining_job.clone(), + tx_sv2_set_new_prev_hash.clone(), + tx_sv2_extranonce.clone(), + rx_sv2_extranonce.clone(), + rx_sv2_set_new_prev_hash.clone(), + rx_sv2_new_ext_mining_job.clone(), + rx_sv1_downstream.clone(), + tx_sv1_bridge.clone(), + tx_sv1_notify.clone(), + target.clone(), + tx_status.clone(), + cancellation_token.clone(), + ) + .await; + } + State::Healthy(msg) => { + info!("HEALTHY message: {}", msg); + } + } + } +} + +#[allow(clippy::too_many_arguments)] +async fn start<'a>( + rx_sv2_submit_shares_ext: Receiver>, + tx_sv2_submit_shares_ext: Sender>, + tx_sv2_new_ext_mining_job: Sender>, + tx_sv2_set_new_prev_hash: Sender>, + tx_sv2_extranonce: Sender<(ExtendedExtranonce, u32)>, + rx_sv2_extranonce: Receiver<(ExtendedExtranonce, u32)>, + rx_sv2_set_new_prev_hash: Receiver>, + rx_sv2_new_ext_mining_job: Receiver>, + rx_sv1_downstream: Receiver, + tx_sv1_bridge: Sender, + tx_sv1_notify: broadcast::Sender>, + target: Arc>>, + tx_status: async_channel::Sender>, + cancellation_token: CancellationToken, +) { + let proxy_config = match process_cli_args() { + Ok(p) => p, + Err(e) => panic!("failed to load config: {}", e), + }; + info!("Proxy Config: {:?}", &proxy_config); // Format `Upstream` connection address let upstream_addr = SocketAddr::new( IpAddr::from_str(&proxy_config.upstream_address) @@ -85,7 +204,7 @@ async fn main() { ); let diff_config = Arc::new(Mutex::new(proxy_config.upstream_difficulty_config.clone())); - + let cancellation_token_upstream = cancellation_token.clone(); // Instantiate a new `Upstream` (SV2 Pool) let upstream = match upstream_sv2::Upstream::new( upstream_addr, @@ -98,6 +217,7 @@ async fn main() { status::Sender::Upstream(tx_status.clone()), target.clone(), diff_config.clone(), + cancellation_token_upstream, ) .await { @@ -107,12 +227,12 @@ async fn main() { return; } }; - + let cancellation_token_init_task = cancellation_token.clone(); // Spawn a task to do all of this init work so that the main thread // can listen for signals and failures on the status channel. This // allows for the tproxy to fail gracefully if any of these init tasks //fail - task::spawn(async move { + let task = task::spawn(async move { // Connect to the SV2 Upstream role match upstream_sv2::Upstream::connect( upstream.clone(), @@ -152,6 +272,7 @@ async fn main() { async_std::task::sleep(std::time::Duration::from_millis(100)).await; } + let cancellation_token_bridge = cancellation_token_init_task.clone(); // Instantiate a new `Bridge` and begins handling incoming messages let b = proxy::Bridge::new( rx_sv1_downstream, @@ -163,6 +284,7 @@ async fn main() { extended_extranonce, target, up_id, + cancellation_token_bridge, ); proxy::Bridge::start(b.clone()); @@ -172,6 +294,7 @@ async fn main() { proxy_config.downstream_port, ); + let cancellation_token_downstream = cancellation_token_init_task.clone(); // Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) downstream_sv1::Downstream::accept_connections( downstream_addr, @@ -181,49 +304,13 @@ async fn main() { b, proxy_config.downstream_difficulty_config, diff_config, + cancellation_token_downstream, ); }); // End of init task - - debug!("Starting up signal listener"); - let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); - debug!("Starting up status listener"); - - // Check all tasks if is_finished() is true, if so exit - loop { - let task_status = select! { - task_status = rx_status.recv().fuse() => task_status, - interrupt_signal = interrupt_signal_future => { - match interrupt_signal { - Ok(()) => { - info!("Interrupt received"); - }, - Err(err) => { - error!("Unable to listen for interrupt signal: {}", err); - // we also shut down in case of error - }, - } - break; - } - }; - let task_status: Status = task_status.unwrap(); - - match task_status.state { - // Should only be sent by the downstream listener - State::DownstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::BridgeShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::UpstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::Healthy(msg) => { - info!("HEALTHY message: {}", msg); - } - } + tokio::select! { + _ = task => {}, + _ = cancellation_token.cancelled() => { + warn!("Shutting init task"); + }, } }