From a74262aed1fbc2b96e6149a1a1f453d14dfbd637 Mon Sep 17 00:00:00 2001 From: lorban Date: Mon, 10 Jun 2024 22:56:16 +0200 Subject: [PATCH] TProxy restart if gests disconnected by upstream - Add `start` function and put starting logic there. - Every AbortHandle of each task is stored in a collector, which is a mutex. - Added `kill_tasks` function that takes in input this mutex, access it, pop each JoinHandle and kill the corresponding task. When receive an UpstreamShutdown does the following: 1. waits a random amount of time s 0>, + task_collector: Arc>>, ) { let stream = std::sync::Arc::new(stream); @@ -150,11 +151,12 @@ impl Downstream { let rx_shutdown_clone = rx_shutdown.clone(); let tx_shutdown_clone = tx_shutdown.clone(); let tx_status_reader = tx_status.clone(); + let task_collector_mining_device = task_collector.clone(); // Task to read from SV1 Mining Device Client socket via `socket_reader`. Depending on the // SV1 message received, a message response is sent directly back to the SV1 Downstream // role, or the message is sent upwards to the Bridge for translation into a SV2 message // and then sent to the SV2 Upstream role. - let _socket_reader_task = task::spawn(async move { + let socket_reader_task = tokio::task::spawn(async move { let reader = BufReader::new(&*socket_reader); let mut messages = FramedRead::new( async_compat::Compat::new(reader), @@ -205,15 +207,22 @@ impl Downstream { kill(&tx_shutdown_clone).await; warn!("Downstream: Shutting down sv1 downstream reader"); }); + let _ = task_collector_mining_device.safe_lock(|a| { + a.push(( + socket_reader_task.abort_handle(), + "socket_reader_task".to_string(), + )) + }); let rx_shutdown_clone = rx_shutdown.clone(); let tx_shutdown_clone = tx_shutdown.clone(); let tx_status_writer = tx_status.clone(); let host_ = host.clone(); + let task_collector_new_sv1_message_no_transl = task_collector.clone(); // Task to receive SV1 message responses to SV1 messages that do NOT need translation. // These response messages are sent directly to the SV1 Downstream role. - let _socket_writer_task = task::spawn(async move { + let socket_writer_task = tokio::task::spawn(async move { loop { select! { res = receiver_outgoing.recv().fuse() => { @@ -242,11 +251,18 @@ impl Downstream { &host_ ); }); + let _ = task_collector_new_sv1_message_no_transl.safe_lock(|a| { + a.push(( + socket_writer_task.abort_handle(), + "socket_writer_task".to_string(), + )) + }); let tx_status_notify = tx_status; let self_ = downstream.clone(); - let _notify_task = task::spawn(async move { + let task_collector_notify_task = task_collector.clone(); + let notify_task = tokio::task::spawn(async move { let timeout_timer = std::time::Instant::now(); let mut first_sent = false; loop { @@ -329,10 +345,14 @@ impl Downstream { &host ); }); + + let _ = task_collector_notify_task + .safe_lock(|a| a.push((notify_task.abort_handle(), "notify_task".to_string()))); } /// Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) and create a /// new `Downstream` for each connection. + #[allow(clippy::too_many_arguments)] pub fn accept_connections( downstream_addr: SocketAddr, tx_sv1_submit: Sender, @@ -341,8 +361,11 @@ impl Downstream { bridge: Arc>, downstream_difficulty_config: DownstreamDifficultyConfig, upstream_difficulty_config: Arc>, + task_collector: Arc>>, ) { - task::spawn(async move { + let task_collector_downstream = task_collector.clone(); + + let accept_connections = tokio::task::spawn(async move { let downstream_listener = TcpListener::bind(downstream_addr).await.unwrap(); let mut downstream_incoming = downstream_listener.incoming(); @@ -369,6 +392,7 @@ impl Downstream { host, downstream_difficulty_config.clone(), upstream_difficulty_config.clone(), + task_collector_downstream.clone(), ) .await; } @@ -378,6 +402,12 @@ impl Downstream { } } }); + let _ = task_collector.safe_lock(|a| { + a.push(( + accept_connections.abort_handle(), + "accept_connections".to_string(), + )) + }); } /// As SV1 messages come in, determines if the message response needs to be translated to SV2 diff --git a/roles/translator/src/lib/proxy/bridge.rs b/roles/translator/src/lib/proxy/bridge.rs index ee9ad43370..74db21111e 100644 --- a/roles/translator/src/lib/proxy/bridge.rs +++ b/roles/translator/src/lib/proxy/bridge.rs @@ -1,5 +1,4 @@ use async_channel::{Receiver, Sender}; -use async_std::task; use roles_logic_sv2::{ channel_logic::channel_factory::{ExtendedChannelKind, ProxyExtendedChannelFactory, Share}, mining_sv2::{ @@ -9,7 +8,7 @@ use roles_logic_sv2::{ utils::{GroupId, Mutex}, }; use std::sync::Arc; -use tokio::sync::broadcast; +use tokio::{sync::broadcast, task::AbortHandle}; use v1::{client_to_server::Submit, server_to_client, utils::HexU32Be}; use super::super::{ @@ -64,6 +63,7 @@ pub struct Bridge { last_p_hash: Option>, target: Arc>>, last_job_id: u32, + task_collector: Arc>>, } impl Bridge { @@ -79,6 +79,7 @@ impl Bridge { extranonces: ExtendedExtranonce, target: Arc>>, up_id: u32, + task_collector: Arc>>, ) -> Arc> { let ids = Arc::new(Mutex::new(GroupId::new())); let share_per_min = 1.0; @@ -107,6 +108,7 @@ impl Bridge { last_p_hash: None, target, last_job_id: 0, + task_collector, })) } @@ -162,10 +164,12 @@ impl Bridge { /// Receives a `DownstreamMessages` message from the `Downstream`, handles based on the /// variant received. fn handle_downstream_messages(self_: Arc>) { + let task_collector_handle_downstream = + self_.safe_lock(|b| b.task_collector.clone()).unwrap(); let (rx_sv1_downstream, tx_status) = self_ .safe_lock(|s| (s.rx_sv1_downstream.clone(), s.tx_status.clone())) .unwrap(); - task::spawn(async move { + let handle_downstream = tokio::task::spawn(async move { loop { let msg = handle_result!(tx_status, rx_sv1_downstream.clone().recv().await); @@ -185,6 +189,12 @@ impl Bridge { }; } }); + let _ = task_collector_handle_downstream.safe_lock(|a| { + a.push(( + handle_downstream.abort_handle(), + "handle_downstream_message".to_string(), + )) + }); } /// receives a `SetDownstreamTarget` and updates the downstream target for the channel #[allow(clippy::result_large_err)] @@ -367,6 +377,8 @@ impl Bridge { /// corresponding `job_id` has already been received. If this is not the case, an error has /// occurred on the Upstream pool role and the connection will close. fn handle_new_prev_hash(self_: Arc>) { + let task_collector_handle_new_prev_hash = + self_.safe_lock(|b| b.task_collector.clone()).unwrap(); let (tx_sv1_notify, rx_sv2_set_new_prev_hash, tx_status) = self_ .safe_lock(|s| { ( @@ -377,7 +389,7 @@ impl Bridge { }) .unwrap(); debug!("Starting handle_new_prev_hash task"); - task::spawn(async move { + let handle_new_prev_hash = tokio::task::spawn(async move { loop { // Receive `SetNewPrevHash` from `Upstream` let sv2_set_new_prev_hash: SetNewPrevHash = @@ -397,6 +409,12 @@ impl Bridge { ) } }); + let _ = task_collector_handle_new_prev_hash.safe_lock(|a| { + a.push(( + handle_new_prev_hash.abort_handle(), + "handle_new_prev_hash".to_string(), + )) + }); } async fn handle_new_extended_mining_job_( @@ -460,6 +478,8 @@ impl Bridge { /// `SetNewPrevHash` `job_id`, an error has occurred on the Upstream pool role and the /// connection will close. fn handle_new_extended_mining_job(self_: Arc>) { + let task_collector_new_extended_mining_job = + self_.safe_lock(|b| b.task_collector.clone()).unwrap(); let (tx_sv1_notify, rx_sv2_new_ext_mining_job, tx_status) = self_ .safe_lock(|s| { ( @@ -470,7 +490,7 @@ impl Bridge { }) .unwrap(); debug!("Starting handle_new_extended_mining_job task"); - task::spawn(async move { + let handle_new_extended_mining_job = tokio::task::spawn(async move { loop { // Receive `NewExtendedMiningJob` from `Upstream` let sv2_new_extended_mining_job: NewExtendedMiningJob = handle_result!( @@ -494,6 +514,12 @@ impl Bridge { .store(true, std::sync::atomic::Ordering::SeqCst); } }); + let _ = task_collector_new_extended_mining_job.safe_lock(|a| { + a.push(( + handle_new_extended_mining_job.abort_handle(), + "handle_new_extended_mining_job".to_string(), + )) + }); } } pub struct OpenSv1Downstream { @@ -543,6 +569,7 @@ mod test { rx_sv1_notify, }; + let task_collector = Arc::new(Mutex::new(vec![])); let b = Bridge::new( rx_sv1_submit, tx_sv2_submit_shares_ext, @@ -553,6 +580,7 @@ mod test { extranonces, Arc::new(Mutex::new(upstream_target)), 1, + task_collector, ); (b, interface) } diff --git a/roles/translator/src/lib/status.rs b/roles/translator/src/lib/status.rs index 4cdd770e22..e8af6883e1 100644 --- a/roles/translator/src/lib/status.rs +++ b/roles/translator/src/lib/status.rs @@ -48,6 +48,7 @@ pub enum State<'a> { DownstreamShutdown(Error<'a>), BridgeShutdown(Error<'a>), UpstreamShutdown(Error<'a>), + UpstreamTryReconnect(Error<'a>), Healthy(String), } @@ -83,13 +84,22 @@ async fn send_status( .await .unwrap_or(()); } - Sender::Upstream(tx) => { - tx.send(Status { - state: State::UpstreamShutdown(e), - }) - .await - .unwrap_or(()); - } + Sender::Upstream(tx) => match e { + Error::ChannelErrorReceiver(_) => { + tx.send(Status { + state: State::UpstreamTryReconnect(e), + }) + .await + .unwrap_or(()); + } + _ => { + tx.send(Status { + state: State::UpstreamShutdown(e), + }) + .await + .unwrap_or(()); + } + }, Sender::TemplateReceiver(tx) => { tx.send(Status { state: State::UpstreamShutdown(e), diff --git a/roles/translator/src/lib/upstream_sv2/upstream.rs b/roles/translator/src/lib/upstream_sv2/upstream.rs index 6aab5978e4..6135174328 100644 --- a/roles/translator/src/lib/upstream_sv2/upstream.rs +++ b/roles/translator/src/lib/upstream_sv2/upstream.rs @@ -9,7 +9,7 @@ use crate::{ upstream_sv2::{EitherFrame, Message, StdFrame, UpstreamConnection}, }; use async_channel::{Receiver, Sender}; -use async_std::{net::TcpStream, task}; +use async_std::net::TcpStream; use binary_sv2::u256_from_int; use codec_sv2::{HandshakeRole, Initiator}; use error_handling::handle_result; @@ -36,8 +36,10 @@ use roles_logic_sv2::{ use std::{ net::SocketAddr, sync::{atomic::AtomicBool, Arc}, - thread::sleep, - time::Duration, +}; +use tokio::{ + task::AbortHandle, + time::{sleep, Duration}, }; use tracing::{error, info, warn}; @@ -98,6 +100,7 @@ pub struct Upstream { // and the upstream just needs to occasionally check if it has changed more than // than the configured percentage pub(super) difficulty_config: Arc>, + task_collector: Arc>>, } impl PartialEq for Upstream { @@ -124,6 +127,7 @@ impl Upstream { tx_status: status::Sender, target: Arc>>, difficulty_config: Arc>, + task_collector: Arc>>, ) -> ProxyResult<'static, Arc>> { // Connect to the SV2 Upstream role retry connection every 5 seconds. let socket = loop { @@ -135,7 +139,7 @@ impl Upstream { address, e ); - sleep(Duration::from_secs(5)); + sleep(Duration::from_secs(5)).await; } } }; @@ -171,6 +175,7 @@ impl Upstream { tx_status, target, difficulty_config, + task_collector, }))) } @@ -259,6 +264,9 @@ impl Upstream { #[allow(clippy::result_large_err)] pub fn parse_incoming(self_: Arc>) -> ProxyResult<'static, ()> { let clone = self_.clone(); + let task_collector = self_.safe_lock(|s| s.task_collector.clone()).unwrap(); + let collector1 = task_collector.clone(); + let collector2 = task_collector.clone(); let ( tx_frame, tx_sv2_extranonce, @@ -281,16 +289,22 @@ impl Upstream { { let self_ = self_.clone(); let tx_status = tx_status.clone(); - task::spawn(async move { + let start_diff_management = tokio::task::spawn(async move { // No need to start diff management immediatly - async_std::task::sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(10)).await; loop { handle_result!(tx_status, Self::try_update_hashrate(self_.clone()).await); } }); + let _ = collector1.safe_lock(|a| { + a.push(( + start_diff_management.abort_handle(), + "start_diff_management".to_string(), + )) + }); } - task::spawn(async move { + let parse_incoming = tokio::task::spawn(async move { loop { // Waiting to receive a message from the SV2 Upstream role let incoming = handle_result!(tx_status, recv.recv().await); @@ -433,6 +447,8 @@ impl Upstream { } } }); + let _ = collector2 + .safe_lock(|a| a.push((parse_incoming.abort_handle(), "parse_incoming".to_string()))); Ok(()) } @@ -459,6 +475,7 @@ impl Upstream { #[allow(clippy::result_large_err)] pub fn handle_submit(self_: Arc>) -> ProxyResult<'static, ()> { + let task_collector = self_.safe_lock(|s| s.task_collector.clone()).unwrap(); let clone = self_.clone(); let (tx_frame, receiver, tx_status) = clone .safe_lock(|s| { @@ -470,7 +487,7 @@ impl Upstream { }) .map_err(|_| PoisonLock)?; - task::spawn(async move { + let handle_submit = tokio::task::spawn(async move { loop { let mut sv2_submit: SubmitSharesExtended = handle_result!(tx_status, receiver.recv().await); @@ -506,6 +523,9 @@ impl Upstream { ); } }); + let _ = task_collector + .safe_lock(|a| a.push((handle_submit.abort_handle(), "handle_submit".to_string()))); + Ok(()) } diff --git a/roles/translator/src/main.rs b/roles/translator/src/main.rs index f958c3fc83..fc8e91de94 100644 --- a/roles/translator/src/main.rs +++ b/roles/translator/src/main.rs @@ -3,13 +3,13 @@ mod args; mod lib; use args::Args; +use async_channel::{bounded, unbounded}; use error::{Error, ProxyResult}; +use futures::{select, FutureExt}; use lib::{downstream_sv1, error, proxy, proxy_config, status, upstream_sv2}; use proxy_config::ProxyConfig; +use rand::Rng; use roles_logic_sv2::utils::Mutex; - -use async_channel::{bounded, unbounded}; -use futures::{select, FutureExt}; use std::{ net::{IpAddr, SocketAddr}, str::FromStr, @@ -17,11 +17,11 @@ use std::{ }; use ext_config::{Config, File, FileFormat}; -use tokio::{sync::broadcast, task}; +use tokio::{sync::broadcast, task, task::AbortHandle, time::Duration}; use v1::server_to_client; use crate::status::{State, Status}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; /// Process CLI args, if any. #[allow(clippy::result_large_err)] fn process_cli_args<'a>() -> ProxyResult<'a, ProxyConfig> { @@ -54,22 +54,121 @@ async fn main() { Ok(p) => p, Err(e) => panic!("failed to load config: {}", e), }; - info!("PC: {:?}", &proxy_config); + info!("Proxy Config: {:?}", &proxy_config); let (tx_status, rx_status) = unbounded(); - // `tx_sv1_bridge` sender is used by `Downstream` to send a `DownstreamMessages` message to - // `Bridge` via the `rx_sv1_downstream` receiver - // (Sender, Receiver) - let (tx_sv1_bridge, rx_sv1_downstream) = unbounded(); + let target = Arc::new(Mutex::new(vec![0; 32])); + + // Sender/Receiver to send SV1 `mining.notify` message from the `Bridge` to the `Downstream` + let (tx_sv1_notify, _rx_sv1_notify): ( + broadcast::Sender, + broadcast::Receiver, + ) = broadcast::channel(10); + + let task_collector: Arc>> = Arc::new(Mutex::new(Vec::new())); + start( + tx_sv1_notify.clone(), + target.clone(), + tx_status.clone(), + task_collector.clone(), + proxy_config.clone(), + ) + .await; + + debug!("Starting up signal listener"); + let task_collector_ = task_collector.clone(); + + let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); + debug!("Starting up status listener"); + // Check all tasks if is_finished() is true, if so exit + loop { + let task_status = select! { + task_status = rx_status.recv().fuse() => task_status, + interrupt_signal = interrupt_signal_future => { + match interrupt_signal { + Ok(()) => { + info!("Interrupt received"); + }, + Err(err) => { + error!("Unable to listen for interrupt signal: {}", err); + // we also shut down in case of error + }, + } + break; + } + }; + let task_status: Status = task_status.unwrap(); + + match task_status.state { + // Should only be sent by the downstream listener + State::DownstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + break; + } + State::BridgeShutdown(err) => { + error!("SHUTDOWN from: {}", err); + break; + } + State::UpstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + break; + } + State::UpstreamTryReconnect(err) => { + error!("SHUTDOWN from: {}", err); + + // wait a random amount of time between 0 and 3000ms + // if all the downstreams try to reconnect at the same time, the upstream may fail + let mut rng = rand::thread_rng(); + let wait_time = rng.gen_range(0..=3000); + tokio::time::sleep(Duration::from_millis(wait_time)).await; + + // kill al the tasks + let task_collector_aborting = task_collector_.clone(); + kill_tasks(task_collector_aborting.clone()); + + warn!("Trying reconnecting to upstream"); + start( + tx_sv1_notify.clone(), + target.clone(), + tx_status.clone(), + task_collector_.clone(), + proxy_config.clone(), + ) + .await; + } + State::Healthy(msg) => { + info!("HEALTHY message: {}", msg); + } + } + } +} + +fn kill_tasks(task_collector: Arc>>) { + let _ = task_collector.safe_lock(|t| { + while let Some(handle) = t.pop() { + handle.0.abort(); + warn!("Killed task: {:?}", handle.1); + } + }); +} + +async fn start<'a>( + tx_sv1_notify: broadcast::Sender>, + target: Arc>>, + tx_status: async_channel::Sender>, + task_collector: Arc>>, + proxy_config: ProxyConfig, +) { // Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream` // (Sender>, Receiver>) let (tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded(10); - // Sender/Receiver to send a SV2 `SetNewPrevHash` message from the `Upstream` to the `Bridge` - // (Sender>, Receiver>) - let (tx_sv2_set_new_prev_hash, rx_sv2_set_new_prev_hash) = bounded(10); + // `tx_sv1_bridge` sender is used by `Downstream` to send a `DownstreamMessages` message to + // `Bridge` via the `rx_sv1_downstream` receiver + // (Sender, Receiver) + let (tx_sv1_bridge, rx_sv1_downstream) = unbounded(); // Sender/Receiver to send a SV2 `NewExtendedMiningJob` message from the `Upstream` to the // `Bridge` @@ -80,13 +179,10 @@ async fn main() { // passed to the `Downstream` upon a Downstream role connection // (Sender, Receiver) let (tx_sv2_extranonce, rx_sv2_extranonce) = bounded(1); - let target = Arc::new(Mutex::new(vec![0; 32])); - // Sender/Receiver to send SV1 `mining.notify` message from the `Bridge` to the `Downstream` - let (tx_sv1_notify, _rx_sv1_notify): ( - broadcast::Sender, - broadcast::Receiver, - ) = broadcast::channel(10); + // Sender/Receiver to send a SV2 `SetNewPrevHash` message from the `Upstream` to the `Bridge` + // (Sender>, Receiver>) + let (tx_sv2_set_new_prev_hash, rx_sv2_set_new_prev_hash) = bounded(10); // Format `Upstream` connection address let upstream_addr = SocketAddr::new( @@ -96,7 +192,7 @@ async fn main() { ); let diff_config = Arc::new(Mutex::new(proxy_config.upstream_difficulty_config.clone())); - + let task_collector_upstream = task_collector.clone(); // Instantiate a new `Upstream` (SV2 Pool) let upstream = match upstream_sv2::Upstream::new( upstream_addr, @@ -109,6 +205,7 @@ async fn main() { status::Sender::Upstream(tx_status.clone()), target.clone(), diff_config.clone(), + task_collector_upstream, ) .await { @@ -118,12 +215,12 @@ async fn main() { return; } }; - + let task_collector_init_task = task_collector.clone(); // Spawn a task to do all of this init work so that the main thread // can listen for signals and failures on the status channel. This // allows for the tproxy to fail gracefully if any of these init tasks //fail - task::spawn(async move { + let task = task::spawn(async move { // Connect to the SV2 Upstream role match upstream_sv2::Upstream::connect( upstream.clone(), @@ -163,6 +260,7 @@ async fn main() { async_std::task::sleep(std::time::Duration::from_millis(100)).await; } + let task_collector_bridge = task_collector_init_task.clone(); // Instantiate a new `Bridge` and begins handling incoming messages let b = proxy::Bridge::new( rx_sv1_downstream, @@ -174,6 +272,7 @@ async fn main() { extended_extranonce, target, up_id, + task_collector_bridge, ); proxy::Bridge::start(b.clone()); @@ -183,6 +282,7 @@ async fn main() { proxy_config.downstream_port, ); + let task_collector_downstream = task_collector_init_task.clone(); // Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) downstream_sv1::Downstream::accept_connections( downstream_addr, @@ -192,49 +292,8 @@ async fn main() { b, proxy_config.downstream_difficulty_config, diff_config, + task_collector_downstream, ); }); // End of init task - - debug!("Starting up signal listener"); - let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); - debug!("Starting up status listener"); - - // Check all tasks if is_finished() is true, if so exit - loop { - let task_status = select! { - task_status = rx_status.recv().fuse() => task_status, - interrupt_signal = interrupt_signal_future => { - match interrupt_signal { - Ok(()) => { - info!("Interrupt received"); - }, - Err(err) => { - error!("Unable to listen for interrupt signal: {}", err); - // we also shut down in case of error - }, - } - break; - } - }; - let task_status: Status = task_status.unwrap(); - - match task_status.state { - // Should only be sent by the downstream listener - State::DownstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::BridgeShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::UpstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::Healthy(msg) => { - info!("HEALTHY message: {}", msg); - } - } - } + let _ = task_collector.safe_lock(|t| t.push((task.abort_handle(), "init task".to_string()))); }