From 07058a5fd0b99500f0ca43bdee4f5184e7fe339b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Yaz=C4=B1c=C4=B1?= <75089142+yaziciahmet@users.noreply.github.com> Date: Wed, 9 Oct 2024 23:44:43 +0200 Subject: [PATCH] Use spawn_blocking for da queue (#1311) --- crates/bitcoin-da/src/service.rs | 143 ++++++++++++++++--------------- 1 file changed, 74 insertions(+), 69 deletions(-) diff --git a/crates/bitcoin-da/src/service.rs b/crates/bitcoin-da/src/service.rs index 09224a043..8a6a99b4d 100644 --- a/crates/bitcoin-da/src/service.rs +++ b/crates/bitcoin-da/src/service.rs @@ -183,85 +183,90 @@ impl BitcoinService { self: Arc, mut rx: UnboundedReceiver>>, ) { - tokio::spawn(async move { - let mut prev_utxo = match self.get_prev_utxo().await { - Ok(Some(prev_utxo)) => Some(prev_utxo), - Ok(None) => { - info!("No pending transactions found"); - None - } - Err(e) => { - error!(?e, "Failed to get pending transactions"); - None - } - }; + // This should be spawn_blocking, since it is a CPU-bound worker. + // When spawned with tokio::spawn, it blocks other futures and + // disrupts tokio runtime. + tokio::task::spawn_blocking(|| { + tokio::runtime::Handle::current().block_on(async move { + let mut prev_utxo = match self.get_prev_utxo().await { + Ok(Some(prev_utxo)) => Some(prev_utxo), + Ok(None) => { + info!("No pending transactions found"); + None + } + Err(e) => { + error!(?e, "Failed to get pending transactions"); + None + } + }; - trace!("BitcoinDA queue is initialized. Waiting for the first request..."); - - loop { - select! { - request_opt = rx.recv() => { - if let Some(request_opt) = request_opt { - match request_opt { - Some(request) => { - trace!("A new request is received"); - let prev = prev_utxo.take(); - loop { - // Build and send tx with retries: - let fee_sat_per_vbyte = match self.get_fee_rate().await { - Ok(rate) => rate, - Err(e) => { - error!(?e, "Failed to call get_fee_rate. Retrying..."); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - } - }; - match self - .send_transaction_with_fee_rate( - prev.clone(), - request.da_data.clone(), - fee_sat_per_vbyte, - ) - .await - { - Ok(tx) => { - let tx_id = TxidWrapper(tx.id); - info!(%tx.id, "Sent tx to BitcoinDA"); - prev_utxo = Some(UTXO { - tx_id: tx.id, - vout: 0, - script_pubkey: tx.tx.output[0].script_pubkey.to_hex_string(), - address: None, - amount: tx.tx.output[0].value.to_sat(), - confirmations: 0, - spendable: true, - solvable: true, - }); - - let _ = request.notify.send(Ok(tx_id)); - } - Err(e) => { - error!(?e, "Failed to send transaction to DA layer"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; + trace!("BitcoinDA queue is initialized. Waiting for the first request..."); + + loop { + select! { + request_opt = rx.recv() => { + if let Some(request_opt) = request_opt { + match request_opt { + Some(request) => { + trace!("A new request is received"); + let prev = prev_utxo.take(); + loop { + // Build and send tx with retries: + let fee_sat_per_vbyte = match self.get_fee_rate().await { + Ok(rate) => rate, + Err(e) => { + error!(?e, "Failed to call get_fee_rate. Retrying..."); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + }; + match self + .send_transaction_with_fee_rate( + prev.clone(), + request.da_data.clone(), + fee_sat_per_vbyte, + ) + .await + { + Ok(tx) => { + let tx_id = TxidWrapper(tx.id); + info!(%tx.id, "Sent tx to BitcoinDA"); + prev_utxo = Some(UTXO { + tx_id: tx.id, + vout: 0, + script_pubkey: tx.tx.output[0].script_pubkey.to_hex_string(), + address: None, + amount: tx.tx.output[0].value.to_sat(), + confirmations: 0, + spendable: true, + solvable: true, + }); + + let _ = request.notify.send(Ok(tx_id)); + } + Err(e) => { + error!(?e, "Failed to send transaction to DA layer"); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } } + break; } - break; } - } - None => { - info!("Shutdown signal received. Stopping BitcoinDA queue."); - break; + None => { + info!("Shutdown signal received. Stopping BitcoinDA queue."); + break; + } } } + }, + _ = signal::ctrl_c() => { + return; } - }, - _ = signal::ctrl_c() => { - return; } } - } + }); error!("BitcoinDA queue stopped"); });