Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing transaction in light mempool bug #1239

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ core_rpc_pass = "password"
# Time interval used for JDS mempool update
[mempool_update_interval]
unit = "secs"
value = 1
value = 0.1
74 changes: 72 additions & 2 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ use roles_logic_sv2::{
ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd,
},
parsers::JobDeclaration,
utils::Mutex,
};
use std::{convert::TryInto, io::Cursor};
use std::{convert::TryInto, io::Cursor, sync::Arc};
use stratum_common::bitcoin::{Transaction, Txid};
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use crate::mempool::JDsMempool;

use super::{signed_token, TransactionState};
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
use stratum_common::bitcoin::consensus::Decodable;
use tracing::info;
use tracing::{debug, info};

use super::JobDeclaratorDownstream;

Expand Down Expand Up @@ -61,6 +64,10 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
}

fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result<SendTo, Error> {
if let Some(old_mining_job) = self.declared_mining_job.0.take() {
clear_declared_mining_job(old_mining_job, &message, self.mempool.clone())?;
}

// the transactions that are present in the mempool are stored here, that is sent to the
// mempool which use the rpc client to retrieve the whole data for each transaction.
// The unknown transactions is a vector that contains the transactions that are not in the
Expand Down Expand Up @@ -220,3 +227,66 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
Ok(SendTo::None(Some(m)))
}
}

fn clear_declared_mining_job(
old_mining_job: DeclareMiningJob,
new_mining_job: &DeclareMiningJob,
mempool: Arc<Mutex<JDsMempool>>,
) -> Result<(), Error> {
let old_transactions = old_mining_job.tx_short_hash_list.inner_as_ref();
let new_transactions = new_mining_job.tx_short_hash_list.inner_as_ref();

if old_transactions.is_empty() {
info!("No transactions to remove from mempool");
return Ok(());
}

let nonce = old_mining_job.tx_short_hash_nonce;

let result = mempool
.safe_lock(|mempool_| -> Result<(), Error> {
let short_ids_map = mempool_
.to_short_ids(nonce)
.ok_or(Error::JDSMissingTransactions)?;

for short_id in old_transactions
.iter()
.filter(|&id| !new_transactions.contains(id))
{
if let Some(transaction_with_hash) = short_ids_map.get(*short_id) {
let txid = transaction_with_hash.id;
match mempool_.mempool.get_mut(&txid) {
Some(Some((_transaction, counter))) => {
if *counter > 1 {
*counter -= 1;
debug!(
"Fat transaction {:?} counter decremented; job id {:?} dropped",
txid, old_mining_job.request_id
);
} else {
mempool_.mempool.remove(&txid);
debug!(
"Fat transaction {:?} with job id {:?} removed from mempool",
txid, old_mining_job.request_id
);
}
}
Some(None) => debug!(
"Thin transaction {:?} with job id {:?} removed from mempool",
txid, old_mining_job.request_id
),
None => {}
}
} else {
debug!(
"Transaction with short id {:?} not found in mempool for old jobs",
short_id
);
}
}
Ok(())
})
.map_err(|e| Error::PoisonLock(e.to_string()))?;

result.map_err(|err| Error::PoisonLock(err.to_string()))
}
2 changes: 1 addition & 1 deletion roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl JobDeclaratorDownstream {
.ok_or(Box::new(JdsError::ImpossibleToReconstructBlock(
"Txid found in jds mempool but transactions not present".to_string(),
)))?;
transactions_list.push(tx);
transactions_list.push(tx.0);
GitGab19 marked this conversation as resolved.
Show resolved Hide resolved
} else {
return Err(Box::new(JdsError::ImpossibleToReconstructBlock(
"Unknown transaction".to_string(),
Expand Down
65 changes: 45 additions & 20 deletions roles/jd-server/src/lib/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use stratum_common::{bitcoin, bitcoin::hash_types::Txid};
#[derive(Clone, Debug)]
pub struct TransactionWithHash {
pub id: Txid,
pub tx: Option<Transaction>,
pub tx: Option<(Transaction, u32)>,
}

#[derive(Clone, Debug)]
pub struct JDsMempool {
pub mempool: HashMap<Txid, Option<Transaction>>,
pub mempool: HashMap<Txid, Option<(Transaction, u32)>>,
auth: mini_rpc_client::Auth,
url: String,
new_block_receiver: Receiver<String>,
Expand Down Expand Up @@ -50,7 +50,7 @@ impl JDsMempool {
new_block_receiver: Receiver<String>,
) -> Self {
let auth = mini_rpc_client::Auth::new(username, password);
let empty_mempool: HashMap<Txid, Option<Transaction>> = HashMap::new();
let empty_mempool: HashMap<Txid, Option<(Transaction, u32)>> = HashMap::new();
JDsMempool {
mempool: empty_mempool,
auth,
Expand Down Expand Up @@ -82,42 +82,67 @@ impl JDsMempool {
.get_raw_transaction(&txid.to_string(), None)
.await
.map_err(JdsMempoolError::Rpc)?;
let _ =
self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction)));
let _ = self_.safe_lock(|a| {
a.mempool
.entry(transaction.txid())
.and_modify(|entry| {
if let Some((_, count)) = entry {
*count += 1;
} else {
*entry = Some((transaction.clone(), 1));
}
})
.or_insert(Some((transaction, 1)));
});
}
}

// fill in the mempool the transactions given in input
for transaction in transactions {
let _ = self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction)));
let _ = self_.safe_lock(|a| {
a.mempool
.entry(transaction.txid())
.and_modify(|entry| {
if let Some((_, count)) = entry {
*count += 1;
} else {
*entry = Some((transaction.clone(), 1));
}
})
.or_insert(Some((transaction, 1)));
});
}
Ok(())
}

pub async fn update_mempool(self_: Arc<Mutex<Self>>) -> Result<(), JdsMempoolError> {
let mut mempool_ordered: HashMap<Txid, Option<Transaction>> = HashMap::new();

let client = self_
.safe_lock(|x| x.get_client())?
.ok_or(JdsMempoolError::NoClient)?;

let mempool: Vec<String> = client.get_raw_mempool().await?;
for id in &mempool {
let key_id = Txid::from_str(id)
.map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string())))?;
let mempool = client.get_raw_mempool().await?;

let tx = self_.safe_lock(|x| match x.mempool.get(&key_id) {
Some(entry) => entry.clone(),
None => None,
})?;
let raw_mempool_txids: Result<Vec<Txid>, _> = mempool
.into_iter()
.map(|id| {
Txid::from_str(&id)
.map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string())))
})
.collect();

mempool_ordered.insert(key_id, tx);
}
let raw_mempool_txids = raw_mempool_txids?;

// Holding the lock till the light mempool updation is complete.
let is_mempool_empty = self_.safe_lock(|x| {
raw_mempool_txids.iter().for_each(|txid| {
x.mempool.entry(*txid).or_insert(None);
});
x.mempool.is_empty()
})?;

if mempool_ordered.is_empty() {
if is_mempool_empty {
Err(JdsMempoolError::EmptyMempool)
} else {
let _ = self_.safe_lock(|x| x.mempool = mempool_ordered);
Ok(())
}
}
Expand Down
Loading