From 5f5c3190267abafdacceedf521403403299bcfbf Mon Sep 17 00:00:00 2001 From: jbesraa Date: Tue, 13 Aug 2024 13:03:16 +0300 Subject: [PATCH] Refactor JDServer --- roles/jd-server/src/lib/core.rs | 183 ++++++++++++++++++++++++++++++++ roles/jd-server/src/lib/mod.rs | 1 + roles/jd-server/src/main.rs | 171 +---------------------------- 3 files changed, 188 insertions(+), 167 deletions(-) create mode 100644 roles/jd-server/src/lib/core.rs diff --git a/roles/jd-server/src/lib/core.rs b/roles/jd-server/src/lib/core.rs new file mode 100644 index 0000000000..d1b707090e --- /dev/null +++ b/roles/jd-server/src/lib/core.rs @@ -0,0 +1,183 @@ +use crate::job_declarator::JobDeclarator; +use async_channel::{bounded, unbounded, Receiver, Sender}; +use error_handling::handle_result; +use roles_logic_sv2::utils::Mutex; +use std::{ops::Sub, sync::Arc}; +use tokio::{select, task}; +use tracing::{error, info, warn}; + +use crate::{ + mempool::{self, error::JdsMempoolError}, + status, Configuration, +}; + +pub struct JDServer { + config: Configuration, +} + +impl JDServer { + pub fn new(config: Configuration) -> Self { + Self { config } + } + pub async fn start(&self) { + let config = self.config.clone(); + let url = config.core_rpc_url.clone() + ":" + &config.core_rpc_port.clone().to_string(); + let username = config.core_rpc_user.clone(); + let password = config.core_rpc_pass.clone(); + // TODO should we manage what to do when the limit is reaced? + let (new_block_sender, new_block_receiver): (Sender, Receiver) = + bounded(10); + let mempool = Arc::new(Mutex::new(mempool::JDsMempool::new( + url.clone(), + username, + password, + new_block_receiver, + ))); + let mempool_update_interval = config.mempool_update_interval; + let mempool_cloned_ = mempool.clone(); + let (status_tx, status_rx) = unbounded(); + let sender = status::Sender::Downstream(status_tx.clone()); + let mut last_empty_mempool_warning = + std::time::Instant::now().sub(std::time::Duration::from_secs(60)); + + // TODO if the jd-server is launched with core_rpc_url empty, the following flow is never + // taken. Consequentally new_block_receiver in JDsMempool::on_submit is never read, possibly + // reaching the channel bound. The new_block_sender is given as input to JobDeclarator::start() + if url.contains("http") { + let sender_update_mempool = sender.clone(); + task::spawn(async move { + loop { + let update_mempool_result: Result<(), mempool::error::JdsMempoolError> = + mempool::JDsMempool::update_mempool(mempool_cloned_.clone()).await; + if let Err(err) = update_mempool_result { + match err { + JdsMempoolError::EmptyMempool => { + if last_empty_mempool_warning.elapsed().as_secs() >= 60 { + warn!("{:?}", err); + warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)"); + last_empty_mempool_warning = std::time::Instant::now(); + } + } + JdsMempoolError::NoClient => { + mempool::error::handle_error(&err); + handle_result!(sender_update_mempool, Err(err)); + } + JdsMempoolError::Rpc(_) => { + mempool::error::handle_error(&err); + handle_result!(sender_update_mempool, Err(err)); + } + JdsMempoolError::PoisonLock(_) => { + mempool::error::handle_error(&err); + handle_result!(sender_update_mempool, Err(err)); + } + } + } + tokio::time::sleep(mempool_update_interval).await; + // DO NOT REMOVE THIS LINE + //let _transactions = mempool::JDsMempool::_get_transaction_list(mempool_cloned_.clone()); + } + }); + + let mempool_cloned = mempool.clone(); + let sender_submit_solution = sender.clone(); + task::spawn(async move { + loop { + let result = mempool::JDsMempool::on_submit(mempool_cloned.clone()).await; + if let Err(err) = result { + match err { + JdsMempoolError::EmptyMempool => { + if last_empty_mempool_warning.elapsed().as_secs() >= 60 { + warn!("{:?}", err); + warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)"); + last_empty_mempool_warning = std::time::Instant::now(); + } + } + _ => { + // TODO here there should be a better error managmenet + mempool::error::handle_error(&err); + handle_result!(sender_submit_solution, Err(err)); + } + } + } + } + }); + }; + + let cloned = config.clone(); + let mempool_cloned = mempool.clone(); + let (sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded(); + task::spawn(async move { + JobDeclarator::start( + cloned, + sender, + mempool_cloned, + new_block_sender, + sender_add_txs_to_mempool, + ) + .await + }); + task::spawn(async move { + loop { + if let Ok(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await { + let mempool_cloned = mempool.clone(); + task::spawn(async move { + match mempool::JDsMempool::add_tx_data_to_mempool( + mempool_cloned, + add_transactions_to_mempool, + ) + .await + { + Ok(_) => (), + Err(err) => { + // TODO + // here there should be a better error management + mempool::error::handle_error(&err); + } + } + }); + } + } + }); + + // Start the error handling loop + // See `./status.rs` and `utils/error_handling` for information on how this operates + loop { + let task_status = select! { + task_status = status_rx.recv() => task_status, + interrupt_signal = tokio::signal::ctrl_c() => { + match interrupt_signal { + Ok(()) => { + info!("Interrupt received"); + }, + Err(err) => { + error!("Unable to listen for interrupt signal: {}", err); + // we also shut down in case of error + }, + } + break; + } + }; + let task_status: status::Status = task_status.unwrap(); + + match task_status.state { + // Should only be sent by the downstream listener + status::State::DownstreamShutdown(err) => { + error!( + "SHUTDOWN from Downstream: {}\nTry to restart the downstream listener", + err + ); + } + status::State::TemplateProviderShutdown(err) => { + error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err); + break; + } + status::State::Healthy(msg) => { + info!("HEALTHY message: {}", msg); + } + status::State::DownstreamInstanceDropped(downstream_id) => { + warn!("Dropping downstream instance {} from jds", downstream_id); + } + } + } + } +} diff --git a/roles/jd-server/src/lib/mod.rs b/roles/jd-server/src/lib/mod.rs index 07f7c6603a..e5387cb577 100644 --- a/roles/jd-server/src/lib/mod.rs +++ b/roles/jd-server/src/lib/mod.rs @@ -1,3 +1,4 @@ +pub mod core; pub mod error; pub mod job_declarator; pub mod mempool; diff --git a/roles/jd-server/src/main.rs b/roles/jd-server/src/main.rs index ac030445a9..519ef7033a 100644 --- a/roles/jd-server/src/main.rs +++ b/roles/jd-server/src/main.rs @@ -1,18 +1,13 @@ #![allow(special_module_name)] use crate::lib::{ - mempool::{self, error::JdsMempoolError}, + job_declarator, + mempool::{self}, status, Configuration, }; -use async_channel::{bounded, unbounded, Receiver, Sender}; -use error_handling::handle_result; -use roles_logic_sv2::utils::Mutex; -use std::{ops::Sub, sync::Arc}; -use tokio::{select, task}; -use tracing::{error, info, warn}; +use tracing::error; mod lib; use ext_config::{Config, File, FileFormat}; -use lib::job_declarator::JobDeclarator; mod args { use std::path::PathBuf; @@ -108,163 +103,5 @@ async fn main() { } }; - let url = config.core_rpc_url.clone() + ":" + &config.core_rpc_port.clone().to_string(); - let username = config.core_rpc_user.clone(); - let password = config.core_rpc_pass.clone(); - // TODO should we manage what to do when the limit is reaced? - let (new_block_sender, new_block_receiver): (Sender, Receiver) = bounded(10); - let mempool = Arc::new(Mutex::new(mempool::JDsMempool::new( - url.clone(), - username, - password, - new_block_receiver, - ))); - let mempool_update_interval = config.mempool_update_interval; - let mempool_cloned_ = mempool.clone(); - let (status_tx, status_rx) = unbounded(); - let sender = status::Sender::Downstream(status_tx.clone()); - let mut last_empty_mempool_warning = - std::time::Instant::now().sub(std::time::Duration::from_secs(60)); - - // TODO if the jd-server is launched with core_rpc_url empty, the following flow is never - // taken. Consequentally new_block_receiver in JDsMempool::on_submit is never read, possibly - // reaching the channel bound. The new_block_sender is given as input to JobDeclarator::start() - if url.contains("http") { - let sender_update_mempool = sender.clone(); - task::spawn(async move { - loop { - let update_mempool_result: Result<(), mempool::error::JdsMempoolError> = - mempool::JDsMempool::update_mempool(mempool_cloned_.clone()).await; - if let Err(err) = update_mempool_result { - match err { - JdsMempoolError::EmptyMempool => { - if last_empty_mempool_warning.elapsed().as_secs() >= 60 { - warn!("{:?}", err); - warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)"); - last_empty_mempool_warning = std::time::Instant::now(); - } - } - JdsMempoolError::NoClient => { - mempool::error::handle_error(&err); - handle_result!(sender_update_mempool, Err(err)); - } - JdsMempoolError::Rpc(_) => { - mempool::error::handle_error(&err); - handle_result!(sender_update_mempool, Err(err)); - } - JdsMempoolError::PoisonLock(_) => { - mempool::error::handle_error(&err); - handle_result!(sender_update_mempool, Err(err)); - } - } - } - tokio::time::sleep(mempool_update_interval).await; - // DO NOT REMOVE THIS LINE - //let _transactions = mempool::JDsMempool::_get_transaction_list(mempool_cloned_.clone()); - } - }); - - let mempool_cloned = mempool.clone(); - let sender_submit_solution = sender.clone(); - task::spawn(async move { - loop { - let result = mempool::JDsMempool::on_submit(mempool_cloned.clone()).await; - if let Err(err) = result { - match err { - JdsMempoolError::EmptyMempool => { - if last_empty_mempool_warning.elapsed().as_secs() >= 60 { - warn!("{:?}", err); - warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)"); - last_empty_mempool_warning = std::time::Instant::now(); - } - } - _ => { - // TODO here there should be a better error managmenet - mempool::error::handle_error(&err); - handle_result!(sender_submit_solution, Err(err)); - } - } - } - } - }); - }; - - info!("Jds INITIALIZING with config: {:?}", &args.config_path); - - let cloned = config.clone(); - let mempool_cloned = mempool.clone(); - let (sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded(); - task::spawn(async move { - JobDeclarator::start( - cloned, - sender, - mempool_cloned, - new_block_sender, - sender_add_txs_to_mempool, - ) - .await - }); - task::spawn(async move { - loop { - if let Ok(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await { - let mempool_cloned = mempool.clone(); - task::spawn(async move { - match lib::mempool::JDsMempool::add_tx_data_to_mempool( - mempool_cloned, - add_transactions_to_mempool, - ) - .await - { - Ok(_) => (), - Err(err) => { - // TODO - // here there should be a better error management - mempool::error::handle_error(&err); - } - } - }); - } - } - }); - - // Start the error handling loop - // See `./status.rs` and `utils/error_handling` for information on how this operates - loop { - let task_status = select! { - task_status = status_rx.recv() => task_status, - interrupt_signal = tokio::signal::ctrl_c() => { - match interrupt_signal { - Ok(()) => { - info!("Interrupt received"); - }, - Err(err) => { - error!("Unable to listen for interrupt signal: {}", err); - // we also shut down in case of error - }, - } - break; - } - }; - let task_status: status::Status = task_status.unwrap(); - - match task_status.state { - // Should only be sent by the downstream listener - status::State::DownstreamShutdown(err) => { - error!( - "SHUTDOWN from Downstream: {}\nTry to restart the downstream listener", - err - ); - } - status::State::TemplateProviderShutdown(err) => { - error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err); - break; - } - status::State::Healthy(msg) => { - info!("HEALTHY message: {}", msg); - } - status::State::DownstreamInstanceDropped(downstream_id) => { - warn!("Dropping downstream instance {} from jds", downstream_id); - } - } - } + lib::core::JDServer::new(config).start().await; }