From a0f15c09b15d859476b652a9ff46930a92e7a286 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Tue, 12 Nov 2024 12:01:33 +0530 Subject: [PATCH 1/7] add mempool updation logic --- roles/jd-server/src/lib/job_declarator/mod.rs | 2 +- roles/jd-server/src/lib/mempool/mod.rs | 46 +++++++++++-------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/roles/jd-server/src/lib/job_declarator/mod.rs b/roles/jd-server/src/lib/job_declarator/mod.rs index 9cf0d5ab9..9e775bc92 100644 --- a/roles/jd-server/src/lib/job_declarator/mod.rs +++ b/roles/jd-server/src/lib/job_declarator/mod.rs @@ -145,7 +145,7 @@ impl JobDeclaratorDownstream { .ok_or(Box::new(JdsError::ImpossibleToReconstructBlock( "Txid found in jds mempool but transactions not present".to_string(), )))?; - transactions_list.push(tx); + transactions_list.push(tx.0); } else { return Err(Box::new(JdsError::ImpossibleToReconstructBlock( "Unknown transaction".to_string(), diff --git a/roles/jd-server/src/lib/mempool/mod.rs b/roles/jd-server/src/lib/mempool/mod.rs index a9aa8ecc9..87f6d72ed 100644 --- a/roles/jd-server/src/lib/mempool/mod.rs +++ b/roles/jd-server/src/lib/mempool/mod.rs @@ -12,12 +12,12 @@ use stratum_common::{bitcoin, bitcoin::hash_types::Txid}; #[derive(Clone, Debug)] pub struct TransactionWithHash { pub id: Txid, - pub tx: Option, + pub tx: Option<(Transaction, u32)>, } #[derive(Clone, Debug)] pub struct JDsMempool { - pub mempool: HashMap>, + pub mempool: HashMap>, auth: mini_rpc_client::Auth, url: String, new_block_receiver: Receiver, @@ -50,7 +50,7 @@ impl JDsMempool { new_block_receiver: Receiver, ) -> Self { let auth = mini_rpc_client::Auth::new(username, password); - let empty_mempool: HashMap> = HashMap::new(); + let empty_mempool: HashMap> = HashMap::new(); JDsMempool { mempool: empty_mempool, auth, @@ -82,42 +82,48 @@ impl JDsMempool { .get_raw_transaction(&txid.to_string(), None) .await .map_err(JdsMempoolError::Rpc)?; - let _ = - self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction))); + let _ = self_ + .safe_lock(|a| a.mempool.insert(transaction.txid(), Some((transaction, 1)))); } } // fill in the mempool the transactions given in input for transaction in transactions { - let _ = self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction))); + let _ = + self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some((transaction, 1)))); } Ok(()) } pub async fn update_mempool(self_: Arc>) -> Result<(), JdsMempoolError> { - let mut mempool_ordered: HashMap> = HashMap::new(); - let client = self_ .safe_lock(|x| x.get_client())? .ok_or(JdsMempoolError::NoClient)?; - let mempool: Vec = client.get_raw_mempool().await?; - for id in &mempool { - let key_id = Txid::from_str(id) - .map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string())))?; + let mempool = client.get_raw_mempool().await?; - let tx = self_.safe_lock(|x| match x.mempool.get(&key_id) { - Some(entry) => entry.clone(), - None => None, - })?; + let raw_mempool_txids: Result, _> = mempool + .into_iter() + .map(|id| { + Txid::from_str(&id) + .map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string()))) + }) + .collect(); - mempool_ordered.insert(key_id, tx); - } + let raw_mempool_txids = raw_mempool_txids?; + + // Holding the lock till the light mempool updation is complete. + let is_mempool_empty = self_.safe_lock(|x| { + x.mempool.retain(|_, v| v.is_some()); + raw_mempool_txids.iter().for_each(|txid| { + x.mempool.entry(*txid).or_insert(None); + }); + x.mempool.is_empty() + })?; - if mempool_ordered.is_empty() { + if is_mempool_empty { Err(JdsMempoolError::EmptyMempool) } else { - let _ = self_.safe_lock(|x| x.mempool = mempool_ordered); Ok(()) } } From 1b226691cdc77dfec3baefa61c3ff211714cdaf2 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Tue, 12 Nov 2024 12:01:56 +0530 Subject: [PATCH 2/7] handle new Declare mining job mempool updation --- .../src/lib/job_declarator/message_handler.rs | 76 ++++++++++++++++++- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index 29bbd7bf3..ce5be5c6e 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -7,14 +7,17 @@ use roles_logic_sv2::{ ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd, }, parsers::JobDeclaration, + utils::Mutex, }; -use std::{convert::TryInto, io::Cursor}; +use std::{convert::TryInto, io::Cursor, sync::Arc}; use stratum_common::bitcoin::{Transaction, Txid}; pub type SendTo = SendTo_, ()>; +use crate::mempool::JDsMempool; + use super::{signed_token, TransactionState}; use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages}; use stratum_common::bitcoin::consensus::Decodable; -use tracing::info; +use tracing::{debug, info}; use super::JobDeclaratorDownstream; @@ -61,6 +64,10 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { } fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result { + if let Some(old_mining_job) = self.declared_mining_job.0.take() { + clear_declared_mining_job(old_mining_job, &message, self.mempool.clone())?; + } + // the transactions that are present in the mempool are stored here, that is sent to the // mempool which use the rpc client to retrieve the whole data for each transaction. // The unknown transactions is a vector that contains the transactions that are not in the @@ -110,6 +117,8 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { .known_transactions .append(&mut known_transactions); + dbg!(missing_txs.len()); + if missing_txs.is_empty() { let message_success = DeclareMiningJobSuccess { request_id: message.request_id, @@ -220,3 +229,66 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { Ok(SendTo::None(Some(m))) } } + +fn clear_declared_mining_job( + old_mining_job: DeclareMiningJob, + new_mining_job: &DeclareMiningJob, + mempool: Arc>, +) -> Result<(), Error> { + let old_transactions = old_mining_job.tx_short_hash_list.inner_as_ref(); + let new_transactions = new_mining_job.tx_short_hash_list.inner_as_ref(); + + if old_transactions.is_empty() { + info!("No transactions to remove from mempool"); + return Ok(()); + } + + let nonce = old_mining_job.tx_short_hash_nonce; + + let result = mempool + .safe_lock(|mempool_| -> Result<(), Error> { + let short_ids_map = mempool_ + .to_short_ids(nonce) + .ok_or(Error::JDSMissingTransactions)?; + + for short_id in old_transactions + .iter() + .filter(|&id| !new_transactions.contains(id)) + { + if let Some(transaction_with_hash) = short_ids_map.get(*short_id) { + let txid = transaction_with_hash.id; + match mempool_.mempool.get_mut(&txid) { + Some(Some((_transaction, counter))) => { + if *counter > 1 { + *counter -= 1; + debug!( + "Fat transaction {:?} counter decremented; job id {:?} dropped", + txid, old_mining_job.request_id + ); + } else { + mempool_.mempool.insert(txid, None); + debug!( + "Fat transaction {:?} with job id {:?} removed from mempool", + txid, old_mining_job.request_id + ); + } + } + Some(None) => debug!( + "Thin transaction {:?} with job id {:?} removed from mempool", + txid, old_mining_job.request_id + ), + None => {} + } + } else { + debug!( + "Transaction with short id {:?} not found in mempool for old jobs", + short_id + ); + } + } + Ok(()) + }) + .map_err(|e| Error::PoisonLock(e.to_string()))?; + + result.map_err(|err| Error::PoisonLock(err.to_string())) +} From 50e4c0d31d05876012cf5c1a6cf23b257559ae84 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Tue, 12 Nov 2024 13:47:34 +0530 Subject: [PATCH 3/7] remove tx from jds mempool instead of persisting in flat form --- roles/jd-server/src/lib/job_declarator/message_handler.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index ce5be5c6e..b12ca47a4 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -117,7 +117,6 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { .known_transactions .append(&mut known_transactions); - dbg!(missing_txs.len()); if missing_txs.is_empty() { let message_success = DeclareMiningJobSuccess { @@ -266,7 +265,7 @@ fn clear_declared_mining_job( txid, old_mining_job.request_id ); } else { - mempool_.mempool.insert(txid, None); + mempool_.mempool.remove(&txid); debug!( "Fat transaction {:?} with job id {:?} removed from mempool", txid, old_mining_job.request_id From b569b2a8f755a9d1e4c593fa27a8b38a828f21bc Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Tue, 12 Nov 2024 15:14:11 +0530 Subject: [PATCH 4/7] add counter incremental --- .../src/lib/job_declarator/message_handler.rs | 1 - roles/jd-server/src/lib/mempool/mod.rs | 24 +++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index b12ca47a4..cd75e86af 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -117,7 +117,6 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { .known_transactions .append(&mut known_transactions); - if missing_txs.is_empty() { let message_success = DeclareMiningJobSuccess { request_id: message.request_id, diff --git a/roles/jd-server/src/lib/mempool/mod.rs b/roles/jd-server/src/lib/mempool/mod.rs index 87f6d72ed..cb8c71565 100644 --- a/roles/jd-server/src/lib/mempool/mod.rs +++ b/roles/jd-server/src/lib/mempool/mod.rs @@ -82,15 +82,31 @@ impl JDsMempool { .get_raw_transaction(&txid.to_string(), None) .await .map_err(JdsMempoolError::Rpc)?; - let _ = self_ - .safe_lock(|a| a.mempool.insert(transaction.txid(), Some((transaction, 1)))); + let _ = self_.safe_lock(|a| { + a.mempool + .entry(transaction.txid()) + .and_modify(|entry| { + if let Some((_, count)) = entry { + *count += 1; + } + }) + .or_insert(Some((transaction, 1))); + }); } } // fill in the mempool the transactions given in input for transaction in transactions { - let _ = - self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some((transaction, 1)))); + let _ = self_.safe_lock(|a| { + a.mempool + .entry(transaction.txid()) + .and_modify(|entry| { + if let Some((_, count)) = entry { + *count += 1; + } + }) + .or_insert(Some((transaction, 1))); + }); } Ok(()) } From 367bbc15f011a04f816cba3857a511ff03d19ab4 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Tue, 12 Nov 2024 17:19:19 +0530 Subject: [PATCH 5/7] add debug statements --- .../src/lib/job_declarator/message_handler.rs | 7 ++++--- roles/jd-server/src/lib/mempool/mod.rs | 12 ++++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index cd75e86af..9a607f2a2 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -117,6 +117,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { .known_transactions .append(&mut known_transactions); + dbg!(missing_txs.len()); if missing_txs.is_empty() { let message_success = DeclareMiningJobSuccess { request_id: message.request_id, @@ -259,19 +260,19 @@ fn clear_declared_mining_job( Some(Some((_transaction, counter))) => { if *counter > 1 { *counter -= 1; - debug!( + info!( "Fat transaction {:?} counter decremented; job id {:?} dropped", txid, old_mining_job.request_id ); } else { mempool_.mempool.remove(&txid); - debug!( + info!( "Fat transaction {:?} with job id {:?} removed from mempool", txid, old_mining_job.request_id ); } } - Some(None) => debug!( + Some(None) => info!( "Thin transaction {:?} with job id {:?} removed from mempool", txid, old_mining_job.request_id ), diff --git a/roles/jd-server/src/lib/mempool/mod.rs b/roles/jd-server/src/lib/mempool/mod.rs index cb8c71565..353180a14 100644 --- a/roles/jd-server/src/lib/mempool/mod.rs +++ b/roles/jd-server/src/lib/mempool/mod.rs @@ -88,6 +88,8 @@ impl JDsMempool { .and_modify(|entry| { if let Some((_, count)) = entry { *count += 1; + } else { + *entry = Some((transaction.clone(), 1)); } }) .or_insert(Some((transaction, 1))); @@ -97,12 +99,15 @@ impl JDsMempool { // fill in the mempool the transactions given in input for transaction in transactions { + dbg!(transaction.txid()); let _ = self_.safe_lock(|a| { a.mempool .entry(transaction.txid()) .and_modify(|entry| { if let Some((_, count)) = entry { *count += 1; + } else { + *entry = Some((transaction.clone(), 1)); } }) .or_insert(Some((transaction, 1))); @@ -118,6 +123,9 @@ impl JDsMempool { let mempool = client.get_raw_mempool().await?; + dbg!("raw Mempool size"); + dbg!(mempool.len()); + let raw_mempool_txids: Result, _> = mempool .into_iter() .map(|id| { @@ -130,10 +138,14 @@ impl JDsMempool { // Holding the lock till the light mempool updation is complete. let is_mempool_empty = self_.safe_lock(|x| { + dbg!("Before Update"); + dbg!(x.mempool.len()); x.mempool.retain(|_, v| v.is_some()); raw_mempool_txids.iter().for_each(|txid| { x.mempool.entry(*txid).or_insert(None); }); + dbg!("After Update"); + dbg!(x.mempool.len()); x.mempool.is_empty() })?; From bf06bce564cb9aa8ebb4d264f4c27aa647906154 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Tue, 12 Nov 2024 19:18:06 +0530 Subject: [PATCH 6/7] removed debug logs and statements --- .../jd-server/src/lib/job_declarator/message_handler.rs | 7 +++---- roles/jd-server/src/lib/mempool/mod.rs | 9 --------- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index 9a607f2a2..cd75e86af 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -117,7 +117,6 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { .known_transactions .append(&mut known_transactions); - dbg!(missing_txs.len()); if missing_txs.is_empty() { let message_success = DeclareMiningJobSuccess { request_id: message.request_id, @@ -260,19 +259,19 @@ fn clear_declared_mining_job( Some(Some((_transaction, counter))) => { if *counter > 1 { *counter -= 1; - info!( + debug!( "Fat transaction {:?} counter decremented; job id {:?} dropped", txid, old_mining_job.request_id ); } else { mempool_.mempool.remove(&txid); - info!( + debug!( "Fat transaction {:?} with job id {:?} removed from mempool", txid, old_mining_job.request_id ); } } - Some(None) => info!( + Some(None) => debug!( "Thin transaction {:?} with job id {:?} removed from mempool", txid, old_mining_job.request_id ), diff --git a/roles/jd-server/src/lib/mempool/mod.rs b/roles/jd-server/src/lib/mempool/mod.rs index 353180a14..0541593b4 100644 --- a/roles/jd-server/src/lib/mempool/mod.rs +++ b/roles/jd-server/src/lib/mempool/mod.rs @@ -99,7 +99,6 @@ impl JDsMempool { // fill in the mempool the transactions given in input for transaction in transactions { - dbg!(transaction.txid()); let _ = self_.safe_lock(|a| { a.mempool .entry(transaction.txid()) @@ -123,9 +122,6 @@ impl JDsMempool { let mempool = client.get_raw_mempool().await?; - dbg!("raw Mempool size"); - dbg!(mempool.len()); - let raw_mempool_txids: Result, _> = mempool .into_iter() .map(|id| { @@ -138,14 +134,9 @@ impl JDsMempool { // Holding the lock till the light mempool updation is complete. let is_mempool_empty = self_.safe_lock(|x| { - dbg!("Before Update"); - dbg!(x.mempool.len()); - x.mempool.retain(|_, v| v.is_some()); raw_mempool_txids.iter().for_each(|txid| { x.mempool.entry(*txid).or_insert(None); }); - dbg!("After Update"); - dbg!(x.mempool.len()); x.mempool.is_empty() })?; From 7cc0424e3250f08e42e94ea76679cd43b22aaa44 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Wed, 13 Nov 2024 07:59:20 +0530 Subject: [PATCH 7/7] update mempool updation time period to 0.1 seconds --- roles/jd-server/config-examples/jds-config-local-example.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/roles/jd-server/config-examples/jds-config-local-example.toml b/roles/jd-server/config-examples/jds-config-local-example.toml index dc8ce0555..084089274 100644 --- a/roles/jd-server/config-examples/jds-config-local-example.toml +++ b/roles/jd-server/config-examples/jds-config-local-example.toml @@ -28,4 +28,4 @@ core_rpc_pass = "password" # Time interval used for JDS mempool update [mempool_update_interval] unit = "secs" -value = 1 +value = 0.1