diff --git a/protocols/Cargo.lock b/protocols/Cargo.lock index fbe5ddceac..31450eea42 100644 --- a/protocols/Cargo.lock +++ b/protocols/Cargo.lock @@ -744,7 +744,7 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "sv1_api" -version = "1.0.0" +version = "1.0.1" dependencies = [ "binary_sv2", "bitcoin_hashes 0.3.2", diff --git a/roles/Cargo.lock b/roles/Cargo.lock index fbaeadc6da..50dee08b42 100644 --- a/roles/Cargo.lock +++ b/roles/Cargo.lock @@ -2053,7 +2053,7 @@ dependencies = [ [[package]] name = "sv1_api" -version = "1.0.0" +version = "1.0.1" dependencies = [ "binary_sv2", "bitcoin_hashes 0.3.2", diff --git a/roles/translator/Cargo.toml b/roles/translator/Cargo.toml index 182370cbd4..6194c74d59 100644 --- a/roles/translator/Cargo.toml +++ b/roles/translator/Cargo.toml @@ -34,11 +34,11 @@ error_handling = { version = "1.0.0", path = "../../utils/error-handling" } key-utils = { version = "^1.0.0", path = "../../utils/key-utils" } tokio-util = { version = "0.7.10", features = ["codec"] } async-compat = "0.2.1" +rand = "0.8.4" [dev-dependencies] -rand = "0.8.4" sha2 = "0.10.6" [features] diff --git a/roles/translator/config-examples/tproxy-config-local-jdc-example.toml b/roles/translator/config-examples/tproxy-config-local-jdc-example.toml index ce11ca19ef..5fe4a8eebd 100644 --- a/roles/translator/config-examples/tproxy-config-local-jdc-example.toml +++ b/roles/translator/config-examples/tproxy-config-local-jdc-example.toml @@ -25,7 +25,7 @@ min_extranonce2_size = 8 # Difficulty params [downstream_difficulty_config] # hashes/s of the weakest miner that will be connecting (e.g.: 10 Th/s = 10_000_000_000_000.0) -min_individual_miner_hashrate=10_000_000.0 +min_individual_miner_hashrate=10_000_000_000_000.0 # target number of shares per minute the miner should be sending shares_per_minute = 6.0 @@ -33,4 +33,4 @@ shares_per_minute = 6.0 # interval in seconds to elapse before updating channel hashrate with the pool channel_diff_update_interval = 60 # estimated accumulated hashrate of all downstream miners (e.g.: 10 Th/s = 10_000_000_000_000.0) -channel_nominal_hashrate = 10_000_000.0 +channel_nominal_hashrate = 10_000_000_000_000.0 diff --git a/roles/translator/src/lib/downstream_sv1/downstream.rs b/roles/translator/src/lib/downstream_sv1/downstream.rs index 9e44a96fb9..f71e9b51ad 100644 --- a/roles/translator/src/lib/downstream_sv1/downstream.rs +++ b/roles/translator/src/lib/downstream_sv1/downstream.rs @@ -27,6 +27,7 @@ use futures::select; use tokio_util::codec::{FramedRead, LinesCodec}; use std::{net::SocketAddr, sync::Arc}; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; use v1::{ client_to_server::{self, Submit}, @@ -110,6 +111,7 @@ impl Downstream { host: String, difficulty_config: DownstreamDifficultyConfig, upstream_difficulty_config: Arc>, + cancellation_token: CancellationToken, ) { let stream = std::sync::Arc::new(stream); @@ -150,11 +152,12 @@ impl Downstream { let rx_shutdown_clone = rx_shutdown.clone(); let tx_shutdown_clone = tx_shutdown.clone(); let tx_status_reader = tx_status.clone(); + let cancellation_token_mining_device = cancellation_token.clone(); // Task to read from SV1 Mining Device Client socket via `socket_reader`. Depending on the // SV1 message received, a message response is sent directly back to the SV1 Downstream // role, or the message is sent upwards to the Bridge for translation into a SV2 message // and then sent to the SV2 Upstream role. - let _socket_reader_task = task::spawn(async move { + let socket_reader_task = tokio::task::spawn(async move { let reader = BufReader::new(&*socket_reader); let mut messages = FramedRead::new( async_compat::Compat::new(reader), @@ -205,15 +208,24 @@ impl Downstream { kill(&tx_shutdown_clone).await; warn!("Downstream: Shutting down sv1 downstream reader"); }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_mining_device.cancelled() => { + socket_reader_task.abort(); + warn!("Shutting down sv1 downstream reader"); + }, + } + }); let rx_shutdown_clone = rx_shutdown.clone(); let tx_shutdown_clone = tx_shutdown.clone(); let tx_status_writer = tx_status.clone(); let host_ = host.clone(); + let cancellation_token_new_sv1_message_no_transl = cancellation_token.clone(); // Task to receive SV1 message responses to SV1 messages that do NOT need translation. // These response messages are sent directly to the SV1 Downstream role. - let _socket_writer_task = task::spawn(async move { + let socket_writer_task = tokio::task::spawn(async move { loop { select! { res = receiver_outgoing.recv().fuse() => { @@ -242,11 +254,20 @@ impl Downstream { &host_ ); }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_new_sv1_message_no_transl.cancelled() => { + socket_writer_task.abort(); + warn!("Shutting down sv1 downstream writer"); + }, + } + }); let tx_status_notify = tx_status; let self_ = downstream.clone(); - let _notify_task = task::spawn(async move { + let cancellation_token_notify_task = cancellation_token.clone(); + let notify_task = tokio::task::spawn(async move { let timeout_timer = std::time::Instant::now(); let mut first_sent = false; loop { @@ -329,10 +350,19 @@ impl Downstream { &host ); }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_notify_task.cancelled() => { + notify_task.abort(); + warn!("Shutting down sv1 downstream job notifier"); + }, + } + }); } /// Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) and create a /// new `Downstream` for each connection. + #[allow(clippy::too_many_arguments)] pub fn accept_connections( downstream_addr: SocketAddr, tx_sv1_submit: Sender, @@ -341,8 +371,11 @@ impl Downstream { bridge: Arc>, downstream_difficulty_config: DownstreamDifficultyConfig, upstream_difficulty_config: Arc>, + cancellation_token: CancellationToken, ) { - task::spawn(async move { + let cancellation_token_downstream = cancellation_token.clone(); + + let task = tokio::task::spawn(async move { let downstream_listener = TcpListener::bind(downstream_addr).await.unwrap(); let mut downstream_incoming = downstream_listener.incoming(); @@ -369,6 +402,7 @@ impl Downstream { host, downstream_difficulty_config.clone(), upstream_difficulty_config.clone(), + cancellation_token_downstream.clone(), ) .await; } @@ -378,6 +412,14 @@ impl Downstream { } } }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token.cancelled() => { + task.abort(); + warn!("Shutting down accept connections task"); + }, + }; + }); } /// As SV1 messages come in, determines if the message response needs to be translated to SV2 diff --git a/roles/translator/src/lib/proxy/bridge.rs b/roles/translator/src/lib/proxy/bridge.rs index ee9ad43370..91a2d4b960 100644 --- a/roles/translator/src/lib/proxy/bridge.rs +++ b/roles/translator/src/lib/proxy/bridge.rs @@ -1,5 +1,4 @@ use async_channel::{Receiver, Sender}; -use async_std::task; use roles_logic_sv2::{ channel_logic::channel_factory::{ExtendedChannelKind, ProxyExtendedChannelFactory, Share}, mining_sv2::{ @@ -22,6 +21,7 @@ use super::super::{ }; use error_handling::handle_result; use roles_logic_sv2::{channel_logic::channel_factory::OnNewShare, Error as RolesLogicError}; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; /// Bridge between the SV2 `Upstream` and SV1 `Downstream` responsible for the following messaging @@ -64,6 +64,7 @@ pub struct Bridge { last_p_hash: Option>, target: Arc>>, last_job_id: u32, + cancellation_token: CancellationToken, } impl Bridge { @@ -79,6 +80,7 @@ impl Bridge { extranonces: ExtendedExtranonce, target: Arc>>, up_id: u32, + cancellation_token: CancellationToken, ) -> Arc> { let ids = Arc::new(Mutex::new(GroupId::new())); let share_per_min = 1.0; @@ -107,6 +109,7 @@ impl Bridge { last_p_hash: None, target, last_job_id: 0, + cancellation_token, })) } @@ -162,10 +165,12 @@ impl Bridge { /// Receives a `DownstreamMessages` message from the `Downstream`, handles based on the /// variant received. fn handle_downstream_messages(self_: Arc>) { + let cancellation_token_handle_downstream = + self_.safe_lock(|b| b.cancellation_token.clone()).unwrap(); let (rx_sv1_downstream, tx_status) = self_ .safe_lock(|s| (s.rx_sv1_downstream.clone(), s.tx_status.clone())) .unwrap(); - task::spawn(async move { + let task = tokio::task::spawn(async move { loop { let msg = handle_result!(tx_status, rx_sv1_downstream.clone().recv().await); @@ -185,6 +190,14 @@ impl Bridge { }; } }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_handle_downstream.cancelled() => { + task.abort(); + warn!("Shutting down handle_result task"); + }, + } + }); } /// receives a `SetDownstreamTarget` and updates the downstream target for the channel #[allow(clippy::result_large_err)] @@ -367,6 +380,8 @@ impl Bridge { /// corresponding `job_id` has already been received. If this is not the case, an error has /// occurred on the Upstream pool role and the connection will close. fn handle_new_prev_hash(self_: Arc>) { + let cancellation_token_handle_prev_hash = + self_.safe_lock(|b| b.cancellation_token.clone()).unwrap(); let (tx_sv1_notify, rx_sv2_set_new_prev_hash, tx_status) = self_ .safe_lock(|s| { ( @@ -377,7 +392,7 @@ impl Bridge { }) .unwrap(); debug!("Starting handle_new_prev_hash task"); - task::spawn(async move { + let task = tokio::task::spawn(async move { loop { // Receive `SetNewPrevHash` from `Upstream` let sv2_set_new_prev_hash: SetNewPrevHash = @@ -397,6 +412,14 @@ impl Bridge { ) } }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_handle_prev_hash.cancelled() => { + task.abort(); + warn!("Shutting down handle_new_prev_hash"); + } + } + }); } async fn handle_new_extended_mining_job_( @@ -460,6 +483,8 @@ impl Bridge { /// `SetNewPrevHash` `job_id`, an error has occurred on the Upstream pool role and the /// connection will close. fn handle_new_extended_mining_job(self_: Arc>) { + let cancellation_token_new_extended_mining_job = + self_.safe_lock(|b| b.cancellation_token.clone()).unwrap(); let (tx_sv1_notify, rx_sv2_new_ext_mining_job, tx_status) = self_ .safe_lock(|s| { ( @@ -470,7 +495,7 @@ impl Bridge { }) .unwrap(); debug!("Starting handle_new_extended_mining_job task"); - task::spawn(async move { + let task = tokio::task::spawn(async move { loop { // Receive `NewExtendedMiningJob` from `Upstream` let sv2_new_extended_mining_job: NewExtendedMiningJob = handle_result!( @@ -494,6 +519,14 @@ impl Bridge { .store(true, std::sync::atomic::Ordering::SeqCst); } }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token_new_extended_mining_job.cancelled() => { + task.abort(); + warn!("Task handle_new_extended_mining_job cancelled"); + } + } + }); } } pub struct OpenSv1Downstream { @@ -543,6 +576,7 @@ mod test { rx_sv1_notify, }; + let cancellation_token = CancellationToken::new(); let b = Bridge::new( rx_sv1_submit, tx_sv2_submit_shares_ext, @@ -553,6 +587,7 @@ mod test { extranonces, Arc::new(Mutex::new(upstream_target)), 1, + cancellation_token, ); (b, interface) } diff --git a/roles/translator/src/lib/upstream_sv2/upstream.rs b/roles/translator/src/lib/upstream_sv2/upstream.rs index f6d192f75e..f19842ceac 100644 --- a/roles/translator/src/lib/upstream_sv2/upstream.rs +++ b/roles/translator/src/lib/upstream_sv2/upstream.rs @@ -9,7 +9,7 @@ use crate::{ upstream_sv2::{EitherFrame, Message, StdFrame, UpstreamConnection}, }; use async_channel::{Receiver, Sender}; -use async_std::{net::TcpStream, task}; +use async_std::net::TcpStream; use binary_sv2::u256_from_int; use codec_sv2::{Frame, HandshakeRole, Initiator}; use error_handling::handle_result; @@ -36,9 +36,9 @@ use roles_logic_sv2::{ use std::{ net::SocketAddr, sync::{atomic::AtomicBool, Arc}, - thread::sleep, - time::Duration, }; +use tokio::time::{sleep, Duration}; +use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; use stratum_common::bitcoin::BlockHash; @@ -98,6 +98,7 @@ pub struct Upstream { // and the upstream just needs to occasionally check if it has changed more than // than the configured percentage pub(super) difficulty_config: Arc>, + cancellation_token: CancellationToken, } impl PartialEq for Upstream { @@ -124,6 +125,7 @@ impl Upstream { tx_status: status::Sender, target: Arc>>, difficulty_config: Arc>, + cancellation_token: CancellationToken, ) -> ProxyResult<'static, Arc>> { // Connect to the SV2 Upstream role retry connection every 5 seconds. let socket = loop { @@ -135,7 +137,7 @@ impl Upstream { address, e ); - sleep(Duration::from_secs(5)); + sleep(Duration::from_secs(5)).await; } } }; @@ -171,6 +173,7 @@ impl Upstream { tx_status, target, difficulty_config, + cancellation_token, }))) } @@ -259,6 +262,9 @@ impl Upstream { #[allow(clippy::result_large_err)] pub fn parse_incoming(self_: Arc>) -> ProxyResult<'static, ()> { let clone = self_.clone(); + let cancellation_token = self_.safe_lock(|s| s.cancellation_token.clone()).unwrap(); + let token1 = cancellation_token.clone(); + let token2 = cancellation_token.clone(); let ( tx_frame, tx_sv2_extranonce, @@ -281,16 +287,24 @@ impl Upstream { { let self_ = self_.clone(); let tx_status = tx_status.clone(); - task::spawn(async move { + let task = tokio::task::spawn(async move { // No need to start diff management immediatly - async_std::task::sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(10)).await; loop { handle_result!(tx_status, Self::try_update_hashrate(self_.clone()).await); } }); + tokio::task::spawn(async move { + tokio::select! { + _ = token1.cancelled() => { + task.abort(); + warn!("Shutting down handle result task"); + }, + } + }); } - task::spawn(async move { + let task = tokio::task::spawn(async move { loop { // Waiting to receive a message from the SV2 Upstream role let incoming = handle_result!(tx_status, recv.recv().await); @@ -433,6 +447,14 @@ impl Upstream { } } }); + tokio::task::spawn(async move { + tokio::select! { + _ = token2.cancelled() => { + task.abort(); + warn!("Shutting down parse incoming task"); + }, + } + }); Ok(()) } @@ -459,6 +481,7 @@ impl Upstream { #[allow(clippy::result_large_err)] pub fn handle_submit(self_: Arc>) -> ProxyResult<'static, ()> { + let cancellation_token = self_.safe_lock(|s| s.cancellation_token.clone()).unwrap(); let clone = self_.clone(); let (tx_frame, receiver, tx_status) = clone .safe_lock(|s| { @@ -470,7 +493,7 @@ impl Upstream { }) .map_err(|_| PoisonLock)?; - task::spawn(async move { + let task = tokio::task::spawn(async move { loop { let mut sv2_submit: SubmitSharesExtended = handle_result!(tx_status, receiver.recv().await); @@ -506,6 +529,15 @@ impl Upstream { ); } }); + tokio::task::spawn(async move { + tokio::select! { + _ = cancellation_token.cancelled() => { + task.abort(); + info!("Shutting down handle submit task"); + }, + } + }); + Ok(()) } diff --git a/roles/translator/src/main.rs b/roles/translator/src/main.rs index c1307a5a2f..3f6f91c0fe 100644 --- a/roles/translator/src/main.rs +++ b/roles/translator/src/main.rs @@ -3,24 +3,29 @@ mod args; mod lib; use args::Args; +use async_channel::{bounded, unbounded, Receiver, Sender}; +use downstream_sv1::DownstreamMessages; use error::{Error, ProxyResult}; +use futures::{select, FutureExt}; use lib::{downstream_sv1, error, proxy, proxy_config, status, upstream_sv2}; use proxy_config::ProxyConfig; -use roles_logic_sv2::utils::Mutex; - -use async_channel::{bounded, unbounded}; -use futures::{select, FutureExt}; +use rand::Rng; +use roles_logic_sv2::{ + mining_sv2::{ExtendedExtranonce, NewExtendedMiningJob, SetNewPrevHash, SubmitSharesExtended}, + utils::Mutex, +}; use std::{ net::{IpAddr, SocketAddr}, str::FromStr, sync::Arc, }; -use tokio::{sync::broadcast, task}; +use tokio::{sync::broadcast, task, time::Duration}; +use tokio_util::sync::CancellationToken; use v1::server_to_client; use crate::status::{State, Status}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; /// Process CLI args, if any. #[allow(clippy::result_large_err)] fn process_cli_args<'a>() -> ProxyResult<'a, ProxyConfig> { @@ -77,6 +82,120 @@ async fn main() { broadcast::Receiver, ) = broadcast::channel(10); + let cancellation_token = CancellationToken::new(); + start( + rx_sv2_submit_shares_ext.clone(), + tx_sv2_submit_shares_ext.clone(), + tx_sv2_new_ext_mining_job.clone(), + tx_sv2_set_new_prev_hash.clone(), + tx_sv2_extranonce.clone(), + rx_sv2_extranonce.clone(), + rx_sv2_set_new_prev_hash.clone(), + rx_sv2_new_ext_mining_job.clone(), + rx_sv1_downstream.clone(), + tx_sv1_bridge.clone(), + tx_sv1_notify.clone(), + target.clone(), + tx_status.clone(), + cancellation_token.clone(), + ) + .await; + + debug!("Starting up signal listener"); + + let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); + debug!("Starting up status listener"); + // Check all tasks if is_finished() is true, if so exit + loop { + let task_status = select! { + task_status = rx_status.recv().fuse() => task_status, + interrupt_signal = interrupt_signal_future => { + match interrupt_signal { + Ok(()) => { + info!("Interrupt received"); + }, + Err(err) => { + error!("Unable to listen for interrupt signal: {}", err); + // we also shut down in case of error + }, + } + break; + } + }; + let task_status: Status = task_status.unwrap(); + + match task_status.state { + // Should only be sent by the downstream listener + State::DownstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + break; + } + State::BridgeShutdown(err) => { + error!("SHUTDOWN from: {}", err); + break; + } + State::UpstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + cancellation_token.clone().cancel(); + + // wait a random amount of time between 0 and 3000ms + // if all the downstreams try to reconnect at the same time, the upstream may fail + tokio::time::sleep(Duration::from_millis(1000)).await; + let mut rng = rand::thread_rng(); + let wait_time = rng.gen_range(0..=3000); + tokio::time::sleep(Duration::from_millis(wait_time)).await; + + // create a new token + let cancellation_token = CancellationToken::new(); + + error!("Trying recconnecting to upstream"); + start( + rx_sv2_submit_shares_ext.clone(), + tx_sv2_submit_shares_ext.clone(), + tx_sv2_new_ext_mining_job.clone(), + tx_sv2_set_new_prev_hash.clone(), + tx_sv2_extranonce.clone(), + rx_sv2_extranonce.clone(), + rx_sv2_set_new_prev_hash.clone(), + rx_sv2_new_ext_mining_job.clone(), + rx_sv1_downstream.clone(), + tx_sv1_bridge.clone(), + tx_sv1_notify.clone(), + target.clone(), + tx_status.clone(), + cancellation_token.clone(), + ) + .await; + } + State::Healthy(msg) => { + info!("HEALTHY message: {}", msg); + } + } + } +} + +#[allow(clippy::too_many_arguments)] +async fn start<'a>( + rx_sv2_submit_shares_ext: Receiver>, + tx_sv2_submit_shares_ext: Sender>, + tx_sv2_new_ext_mining_job: Sender>, + tx_sv2_set_new_prev_hash: Sender>, + tx_sv2_extranonce: Sender<(ExtendedExtranonce, u32)>, + rx_sv2_extranonce: Receiver<(ExtendedExtranonce, u32)>, + rx_sv2_set_new_prev_hash: Receiver>, + rx_sv2_new_ext_mining_job: Receiver>, + rx_sv1_downstream: Receiver, + tx_sv1_bridge: Sender, + tx_sv1_notify: broadcast::Sender>, + target: Arc>>, + tx_status: async_channel::Sender>, + cancellation_token: CancellationToken, +) { + let proxy_config = match process_cli_args() { + Ok(p) => p, + Err(e) => panic!("failed to load config: {}", e), + }; + info!("Proxy Config: {:?}", &proxy_config); // Format `Upstream` connection address let upstream_addr = SocketAddr::new( IpAddr::from_str(&proxy_config.upstream_address) @@ -85,7 +204,7 @@ async fn main() { ); let diff_config = Arc::new(Mutex::new(proxy_config.upstream_difficulty_config.clone())); - + let cancellation_token_upstream = cancellation_token.clone(); // Instantiate a new `Upstream` (SV2 Pool) let upstream = match upstream_sv2::Upstream::new( upstream_addr, @@ -98,6 +217,7 @@ async fn main() { status::Sender::Upstream(tx_status.clone()), target.clone(), diff_config.clone(), + cancellation_token_upstream, ) .await { @@ -107,12 +227,12 @@ async fn main() { return; } }; - + let cancellation_token_init_task = cancellation_token.clone(); // Spawn a task to do all of this init work so that the main thread // can listen for signals and failures on the status channel. This // allows for the tproxy to fail gracefully if any of these init tasks //fail - task::spawn(async move { + let task = task::spawn(async move { // Connect to the SV2 Upstream role match upstream_sv2::Upstream::connect( upstream.clone(), @@ -152,6 +272,7 @@ async fn main() { async_std::task::sleep(std::time::Duration::from_millis(100)).await; } + let cancellation_token_bridge = cancellation_token_init_task.clone(); // Instantiate a new `Bridge` and begins handling incoming messages let b = proxy::Bridge::new( rx_sv1_downstream, @@ -163,6 +284,7 @@ async fn main() { extended_extranonce, target, up_id, + cancellation_token_bridge, ); proxy::Bridge::start(b.clone()); @@ -172,6 +294,7 @@ async fn main() { proxy_config.downstream_port, ); + let cancellation_token_downstream = cancellation_token_init_task.clone(); // Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) downstream_sv1::Downstream::accept_connections( downstream_addr, @@ -181,49 +304,13 @@ async fn main() { b, proxy_config.downstream_difficulty_config, diff_config, + cancellation_token_downstream, ); }); // End of init task - - debug!("Starting up signal listener"); - let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); - debug!("Starting up status listener"); - - // Check all tasks if is_finished() is true, if so exit - loop { - let task_status = select! { - task_status = rx_status.recv().fuse() => task_status, - interrupt_signal = interrupt_signal_future => { - match interrupt_signal { - Ok(()) => { - info!("Interrupt received"); - }, - Err(err) => { - error!("Unable to listen for interrupt signal: {}", err); - // we also shut down in case of error - }, - } - break; - } - }; - let task_status: Status = task_status.unwrap(); - - match task_status.state { - // Should only be sent by the downstream listener - State::DownstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::BridgeShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::UpstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::Healthy(msg) => { - info!("HEALTHY message: {}", msg); - } - } + tokio::select! { + _ = task => {}, + _ = cancellation_token.cancelled() => { + warn!("Shutting init task"); + }, } }