Skip to content

Commit

Permalink
Use spawn_blocking for da queue (#1311)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaziciahmet authored Oct 9, 2024
1 parent 1f5782f commit 07058a5
Showing 1 changed file with 74 additions and 69 deletions.
143 changes: 74 additions & 69 deletions crates/bitcoin-da/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,85 +183,90 @@ impl BitcoinService {
self: Arc<Self>,
mut rx: UnboundedReceiver<Option<SenderWithNotifier<TxidWrapper>>>,
) {
tokio::spawn(async move {
let mut prev_utxo = match self.get_prev_utxo().await {
Ok(Some(prev_utxo)) => Some(prev_utxo),
Ok(None) => {
info!("No pending transactions found");
None
}
Err(e) => {
error!(?e, "Failed to get pending transactions");
None
}
};
// This should be spawn_blocking, since it is a CPU-bound worker.
// When spawned with tokio::spawn, it blocks other futures and
// disrupts tokio runtime.
tokio::task::spawn_blocking(|| {
tokio::runtime::Handle::current().block_on(async move {
let mut prev_utxo = match self.get_prev_utxo().await {
Ok(Some(prev_utxo)) => Some(prev_utxo),
Ok(None) => {
info!("No pending transactions found");
None
}
Err(e) => {
error!(?e, "Failed to get pending transactions");
None
}
};

trace!("BitcoinDA queue is initialized. Waiting for the first request...");

loop {
select! {
request_opt = rx.recv() => {
if let Some(request_opt) = request_opt {
match request_opt {
Some(request) => {
trace!("A new request is received");
let prev = prev_utxo.take();
loop {
// Build and send tx with retries:
let fee_sat_per_vbyte = match self.get_fee_rate().await {
Ok(rate) => rate,
Err(e) => {
error!(?e, "Failed to call get_fee_rate. Retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
match self
.send_transaction_with_fee_rate(
prev.clone(),
request.da_data.clone(),
fee_sat_per_vbyte,
)
.await
{
Ok(tx) => {
let tx_id = TxidWrapper(tx.id);
info!(%tx.id, "Sent tx to BitcoinDA");
prev_utxo = Some(UTXO {
tx_id: tx.id,
vout: 0,
script_pubkey: tx.tx.output[0].script_pubkey.to_hex_string(),
address: None,
amount: tx.tx.output[0].value.to_sat(),
confirmations: 0,
spendable: true,
solvable: true,
});

let _ = request.notify.send(Ok(tx_id));
}
Err(e) => {
error!(?e, "Failed to send transaction to DA layer");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
trace!("BitcoinDA queue is initialized. Waiting for the first request...");

loop {
select! {
request_opt = rx.recv() => {
if let Some(request_opt) = request_opt {
match request_opt {
Some(request) => {
trace!("A new request is received");
let prev = prev_utxo.take();
loop {
// Build and send tx with retries:
let fee_sat_per_vbyte = match self.get_fee_rate().await {
Ok(rate) => rate,
Err(e) => {
error!(?e, "Failed to call get_fee_rate. Retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
match self
.send_transaction_with_fee_rate(
prev.clone(),
request.da_data.clone(),
fee_sat_per_vbyte,
)
.await
{
Ok(tx) => {
let tx_id = TxidWrapper(tx.id);
info!(%tx.id, "Sent tx to BitcoinDA");
prev_utxo = Some(UTXO {
tx_id: tx.id,
vout: 0,
script_pubkey: tx.tx.output[0].script_pubkey.to_hex_string(),
address: None,
amount: tx.tx.output[0].value.to_sat(),
confirmations: 0,
spendable: true,
solvable: true,
});

let _ = request.notify.send(Ok(tx_id));
}
Err(e) => {
error!(?e, "Failed to send transaction to DA layer");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
break;
}
break;
}
}

None => {
info!("Shutdown signal received. Stopping BitcoinDA queue.");
break;
None => {
info!("Shutdown signal received. Stopping BitcoinDA queue.");
break;
}
}
}
},
_ = signal::ctrl_c() => {
return;
}
},
_ = signal::ctrl_c() => {
return;
}
}
}
});

error!("BitcoinDA queue stopped");
});
Expand Down

0 comments on commit 07058a5

Please sign in to comment.