From f3a13e7c3c626c86c9709952651a2c90a9df0a70 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Tue, 6 Aug 2024 05:16:10 +0300 Subject: [PATCH] Avoid recomputing txids when possible Following https://github.com/Blockstream/electrs/pull/89#discussion_r1632848608 and https://github.com/Blockstream/electrs/pull/89#discussion_r1632919266 --- src/daemon.rs | 11 ++++++----- src/new_index/mempool.rs | 14 ++++++-------- src/new_index/query.rs | 2 +- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 6ebf2dcaa..4fb24af69 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -11,7 +11,7 @@ use std::time::Duration; use base64::prelude::{Engine, BASE64_STANDARD}; use error_chain::ChainedError; use hex::FromHex; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use serde_json::{from_str, from_value, Value}; #[cfg(not(feature = "liquid"))] @@ -461,7 +461,7 @@ impl Daemon { &'a self, method: &'a str, params_list: Vec, - ) -> impl ParallelIterator> + 'a { + ) -> impl ParallelIterator> + IndexedParallelIterator + 'a { self.rpc_threads.install(move || { params_list.into_par_iter().map(move |params| { // Store a local per-thread Daemon, each with its own TCP connection. These will @@ -537,7 +537,7 @@ impl Daemon { /// Fetch the given transactions in parallel over multiple threads and RPC connections, /// ignoring any missing ones and returning whatever is available. - pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result> { + pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result> { const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5; let params_list: Vec = txids @@ -546,8 +546,9 @@ impl Daemon { .collect(); self.requests_iter("getrawtransaction", params_list) - .filter_map(|res| match res { - Ok(val) => Some(tx_from_value(val)), + .zip(txids) + .filter_map(|(res, txid)| match res { + Ok(val) => Some(tx_from_value(val).map(|tx| (**txid, tx))), // Ignore 'tx not found' errors Err(Error(ErrorKind::RpcError(code, _, _), _)) if code == RPC_INVALID_ADDRESS_OR_KEY => diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 8dae5bc62..374dcafeb 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -288,14 +288,12 @@ impl Mempool { self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now()); } - pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> { - if self.txstore.get(txid).is_none() { + pub fn add_by_txid(&mut self, daemon: &Daemon, txid: Txid) -> Result<()> { + if self.txstore.get(&txid).is_none() { if let Ok(tx) = daemon.getmempooltx(&txid) { - self.add({ - let mut txs_map = HashMap::new(); - txs_map.insert(tx.txid(), tx); - txs_map - }) + let mut txs_map = HashMap::new(); + txs_map.insert(txid, tx); + self.add(txs_map) } else { bail!("add_by_txid cannot find {}", txid); } @@ -528,7 +526,7 @@ impl Mempool { } let fetched_count = new_txs.len(); - fetched_txs.extend(&mut new_txs.into_iter().map(|tx| (tx.txid(), tx))); + fetched_txs.extend(new_txs); // Retry if any transactions were evicted form the mempool before we managed to get them if fetched_count != new_txids.len() { diff --git a/src/new_index/query.rs b/src/new_index/query.rs index e178bec49..26d983734 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -75,7 +75,7 @@ impl Query { .mempool .write() .unwrap() - .add_by_txid(&self.daemon, &txid); + .add_by_txid(&self.daemon, txid); Ok(txid) }