Skip to content

Commit

Permalink
Process commitments in a separate thread (#585)
Browse files Browse the repository at this point in the history
* Retry DA commitment until success

* Pick no more l1 blocks just to fit min_soft_confirmations_per_commitment

* Process commitments in a separate thread

* Rename func and struct

* Fix indices in get_commitment_info

* Hide structs behind cfg native

* fix tests + some renaming of vars

---------

Co-authored-by: eyusufatik <[email protected]>
  • Loading branch information
kpp and eyusufatik authored May 17, 2024
1 parent 0b9e560 commit bf131c8
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 192 deletions.
80 changes: 33 additions & 47 deletions crates/bitcoin-da/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@ use bitcoin::{Address, BlockHash, Txid};
use hex::ToHex;
use serde::{Deserialize, Serialize};
use sov_rollup_interface::da::DaSpec;
use sov_rollup_interface::services::da::DaService;
use sov_rollup_interface::services::da::{BlobWithNotifier, DaService};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::oneshot::{
channel as oneshot_channel, Receiver as OneshotReceiver, Sender as OneshotSender,
};
use tracing::{error, info, instrument, trace};

use crate::helpers::builders::{
Expand All @@ -44,7 +41,7 @@ pub struct BitcoinService {
network: bitcoin::Network,
da_private_key: Option<SecretKey>,
reveal_tx_id_prefix: Vec<u8>,
inscribes_queue: UnboundedSender<InscriptionRawTx>,
inscribes_queue: UnboundedSender<BlobWithNotifier<TxidWrapper>>,
}

/// Runtime configuration for the DA service
Expand All @@ -68,11 +65,6 @@ pub struct DaServiceConfig {
const FINALITY_DEPTH: u64 = 4; // blocks
const POLLING_INTERVAL: u64 = 10; // seconds

struct InscriptionRawTx {
blob: Vec<u8>,
notify: OneshotSender<Result<TxidWrapper, anyhow::Error>>,
}

impl BitcoinService {
// Create a new instance of the DA service from the given configuration.
pub async fn new(config: DaServiceConfig, chain_params: RollupParams) -> Self {
Expand All @@ -85,7 +77,7 @@ impl BitcoinService {
.da_private_key
.map(|pk| SecretKey::from_str(&pk).expect("Invalid private key"));

let (tx, mut rx) = unbounded_channel::<InscriptionRawTx>();
let (tx, mut rx) = unbounded_channel::<BlobWithNotifier<TxidWrapper>>();

let this = Self::with_client(
client,
Expand All @@ -112,31 +104,34 @@ impl BitcoinService {
// We execute commit and reveal txs one by one to chain them
while let Some(request) = rx.recv().await {
trace!("A new request is received");
let fee_sat_per_vbyte = match this.get_fee_rate().await {
Ok(rate) => rate,
Err(e) => {
let _ = request.notify.send(Err(e));
continue;
}
};
match this
.send_transaction_with_fee_rate(
prev_tx.take(),
request.blob,
fee_sat_per_vbyte,
)
.await
{
Ok(tx) => {
let tx_id = TxidWrapper(tx.id);
info!(%tx.id, "Send tx to BitcoinDA");
prev_tx = Some(tx);
let _ = request.notify.send(Ok(tx_id));
}
Err(e) => {
error!(?e, "Failed to send transaction to DA layer");
let _ = request.notify.send(Err(e));
let prev = prev_tx.take();
loop {
// Build and send tx with retries:
let blob = request.blob.clone();
let fee_sat_per_vbyte = match this.get_fee_rate().await {
Ok(rate) => rate,
Err(e) => {
error!(?e, "Failed to call get_fee_rate. Retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
match this
.send_transaction_with_fee_rate(prev, blob, fee_sat_per_vbyte)
.await
{
Ok(tx) => {
let tx_id = TxidWrapper(tx.id);
info!(%tx.id, "Sent tx to BitcoinDA");
prev_tx = Some(tx);
let _ = request.notify.send(Ok(tx_id));
}
Err(e) => {
error!(?e, "Failed to send transaction to DA layer");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
break;
}
}

Expand Down Expand Up @@ -176,7 +171,7 @@ impl BitcoinService {
network: bitcoin::Network,
da_private_key: Option<SecretKey>,
reveal_tx_id_prefix: Vec<u8>,
inscribes_queue: UnboundedSender<InscriptionRawTx>,
inscribes_queue: UnboundedSender<BlobWithNotifier<TxidWrapper>>,
) -> Self {
let wallets = client
.list_wallets()
Expand Down Expand Up @@ -509,17 +504,8 @@ impl DaService for BitcoinService {
unimplemented!("Use send_tx_no_wait instead")
}

#[instrument(level = "trace", skip_all)]
async fn send_tx_no_wait(
&self,
blob: Vec<u8>,
) -> OneshotReceiver<Result<Self::TransactionId, Self::Error>> {
let (notify, rx) = oneshot_channel();
let request = InscriptionRawTx { blob, notify };
self.inscribes_queue
.send(request)
.expect("Bitcoint service already stopped");
rx
fn get_send_transaction_queue(&self) -> UnboundedSender<BlobWithNotifier<Self::TransactionId>> {
self.inscribes_queue.clone()
}

async fn send_aggregated_zk_proof(
Expand Down
35 changes: 25 additions & 10 deletions crates/sequencer/src/commitment_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,37 @@ pub fn get_commitment_info(
// if there is a height then start from height + 1 and go to prev_l1_height
let (l2_range_to_submit, l1_height_range) = match last_commitment_l1_height {
Some(last_commitment_l1_height) => {
let l1_height_range = (last_commitment_l1_height.0 + 1, prev_l1_height);
let l1_start = last_commitment_l1_height.0 + 1;
let mut l1_end = l1_start;

let Some((l2_start_height, _)) =
ledger_db.get_l2_range_by_l1_height(SlotNumber(l1_height_range.0))?
else {
bail!("Sequencer: Failed to get L1 L2 connection");
};
let Some((_, l2_end_height)) =
ledger_db.get_l2_range_by_l1_height(SlotNumber(prev_l1_height))?
let Some((l2_start, mut l2_end)) =
ledger_db.get_l2_range_by_l1_height(SlotNumber(l1_start))?
else {
bail!("Sequencer: Failed to get L1 L2 connection");
};

let l2_range_to_submit = (l2_start_height, l2_end_height);
// Take while sum of l2 ranges <= min_soft_confirmations_per_commitment
for l1_i in l1_start..=prev_l1_height {
l1_end = l1_i;

(l2_range_to_submit, l1_height_range)
let Some((_, l2_end_new)) =
ledger_db.get_l2_range_by_l1_height(SlotNumber(l1_end))?
else {
bail!("Sequencer: Failed to get L1 L2 connection");
};

l2_end = l2_end_new;

let l2_range_length = 1 + l2_end.0 - l2_start.0;
if l2_range_length >= min_soft_confirmations_per_commitment {
break;
}
}
let l1_height_range = (l1_start, l1_end);

let l2_height_range = (l2_start, l2_end);

(l2_height_range, l1_height_range)
}
None => {
let first_soft_confirmation = match ledger_db.get_soft_batch_by_number::<()>(1)? {
Expand Down
Loading

0 comments on commit bf131c8

Please sign in to comment.