Skip to content

Commit

Permalink
DaService::send_transaction accepts DaData instead of plain Vec<u8>
Browse files Browse the repository at this point in the history
  • Loading branch information
kpp committed Aug 13, 2024
1 parent b40e674 commit 633c376
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 64 deletions.
4 changes: 2 additions & 2 deletions bin/citrea/src/rollup/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use sov_modules_rollup_blueprint::RollupBlueprint;
use sov_modules_stf_blueprint::StfBlueprint;
use sov_prover_storage_manager::ProverStorageManager;
use sov_rollup_interface::da::DaVerifier;
use sov_rollup_interface::services::da::BlobWithNotifier;
use sov_rollup_interface::services::da::SenderWithNotifier;
use sov_rollup_interface::zk::{Zkvm, ZkvmHost};
use sov_state::{DefaultStorageSpec, Storage, ZkStorage};
use sov_stf_runner::{FullNodeConfig, ProverConfig};
Expand Down Expand Up @@ -110,7 +110,7 @@ impl RollupBlueprint for BitcoinRollup {
&self,
rollup_config: &FullNodeConfig<Self::DaConfig>,
) -> Result<Arc<Self::DaService>, anyhow::Error> {
let (tx, rx) = unbounded_channel::<BlobWithNotifier<TxidWrapper>>();
let (tx, rx) = unbounded_channel::<SenderWithNotifier<TxidWrapper>>();

let service = Arc::new(
BitcoinService::new(
Expand Down
24 changes: 13 additions & 11 deletions crates/bitcoin-da/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use bitcoin::{merkle_tree, Amount, BlockHash, CompactTarget, Transaction, Txid,
use bitcoincore_rpc::jsonrpc_async::Error as RpcError;
use bitcoincore_rpc::{Auth, Client, Error, RpcApi};
use serde::{Deserialize, Serialize};
use sov_rollup_interface::da::DaSpec;
use sov_rollup_interface::services::da::{BlobWithNotifier, DaService};
use sov_rollup_interface::da::{DaData, DaSpec};
use sov_rollup_interface::services::da::{DaService, SenderWithNotifier};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::channel as oneshot_channel;
use tracing::{debug, error, info, instrument, trace};
Expand Down Expand Up @@ -49,7 +49,7 @@ pub struct BitcoinService {
network: bitcoin::Network,
da_private_key: Option<SecretKey>,
reveal_tx_id_prefix: Vec<u8>,
inscribes_queue: UnboundedSender<BlobWithNotifier<TxidWrapper>>,
inscribes_queue: UnboundedSender<SenderWithNotifier<TxidWrapper>>,
}

/// Runtime configuration for the DA service
Expand Down Expand Up @@ -78,7 +78,7 @@ impl BitcoinService {
pub async fn new(
config: DaServiceConfig,
chain_params: RollupParams,
tx: UnboundedSender<BlobWithNotifier<TxidWrapper>>,
tx: UnboundedSender<SenderWithNotifier<TxidWrapper>>,
) -> Result<Self> {
let client = Client::new(
&config.node_url,
Expand All @@ -105,7 +105,7 @@ impl BitcoinService {

pub fn spawn_da_queue(
self: Arc<Self>,
mut rx: UnboundedReceiver<BlobWithNotifier<TxidWrapper>>,
mut rx: UnboundedReceiver<SenderWithNotifier<TxidWrapper>>,
) {
// This is a queue of inscribe requests
tokio::task::spawn_blocking(|| {
Expand Down Expand Up @@ -136,7 +136,7 @@ impl BitcoinService {
let prev = prev_tx.take();
loop {
// Build and send tx with retries:
let blob = request.blob.clone();
let blob = borsh::to_vec(&request.da_data).expect("Should serialize");
let fee_sat_per_vbyte = match self.get_fee_rate().await {
Ok(rate) => rate,
Err(e) => {
Expand Down Expand Up @@ -207,7 +207,7 @@ impl BitcoinService {
network: bitcoin::Network,
da_private_key: Option<SecretKey>,
reveal_tx_id_prefix: Vec<u8>,
inscribes_queue: UnboundedSender<BlobWithNotifier<TxidWrapper>>,
inscribes_queue: UnboundedSender<SenderWithNotifier<TxidWrapper>>,
) -> Self {
let wallets = client
.list_wallets()
Expand Down Expand Up @@ -565,18 +565,20 @@ impl DaService for BitcoinService {
#[instrument(level = "trace", skip_all)]
async fn send_transaction(
&self,
blob: &[u8],
da_data: DaData,
) -> Result<<Self as DaService>::TransactionId, Self::Error> {
let queue = self.get_send_transaction_queue();
let (tx, rx) = oneshot_channel();
queue.send(BlobWithNotifier {
blob: blob.to_vec(),
queue.send(SenderWithNotifier {
da_data,
notify: tx,
})?;
rx.await?
}

fn get_send_transaction_queue(&self) -> UnboundedSender<BlobWithNotifier<Self::TransactionId>> {
fn get_send_transaction_queue(
&self,
) -> UnboundedSender<SenderWithNotifier<Self::TransactionId>> {
self.inscribes_queue.clone()
}

Expand Down
6 changes: 1 addition & 5 deletions crates/prover/src/prover_service/parallel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,7 @@ where
let da_data = DaData::ZKProof(proof.clone());

let tx_id = da_service
.send_transaction(
borsh::to_vec(&da_data)
.expect("Should serialize")
.as_slice(),
)
.send_transaction(da_data)
.await
.map_err(|e| anyhow::anyhow!(e))?;
break Ok((tx_id, proof));
Expand Down
7 changes: 3 additions & 4 deletions crates/sequencer/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use sov_modules_api::{
};
use sov_modules_stf_blueprint::StfBlueprintTrait;
use sov_rollup_interface::da::{BlockHeaderTrait, DaData, DaSpec, SequencerCommitment};
use sov_rollup_interface::services::da::{BlobWithNotifier, DaService};
use sov_rollup_interface::services::da::{DaService, SenderWithNotifier};
use sov_rollup_interface::stf::{SoftConfirmationReceipt, StateTransitionFunction};
use sov_rollup_interface::storage::HierarchicalStorageManager;
use sov_rollup_interface::zk::ZkvmHost;
Expand Down Expand Up @@ -717,10 +717,9 @@ where

debug!("Sequencer: submitting commitment: {:?}", commitment);

let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone()))
.map_err(|e| anyhow!(e))?;
let da_data = DaData::SequencerCommitment(commitment.clone());
let (notify, rx) = oneshot_channel();
let request = BlobWithNotifier { blob, notify };
let request = SenderWithNotifier { da_data, notify };
self.da_service
.get_send_transaction_queue()
.send(request)
Expand Down
118 changes: 84 additions & 34 deletions crates/sovereign-sdk/adapters/mock-da/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::time::Duration;
use async_trait::async_trait;
use pin_project::pin_project;
use sha2::Digest;
use sov_rollup_interface::da::{BlockHeaderTrait, DaSpec, Time};
use sov_rollup_interface::services::da::{BlobWithNotifier, DaService, SlotData};
use sov_rollup_interface::da::{BlockHeaderTrait, DaData, DaSpec, Time};
use sov_rollup_interface::services::da::{DaService, SenderWithNotifier, SlotData};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::{broadcast, Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
use tokio::time;
Expand Down Expand Up @@ -149,7 +149,9 @@ impl MockDaService {
blocks.prune_above(height);

for blob in blobs {
let _ = self.add_blob(&blocks, &blob, Default::default())?;
use sov_rollup_interface::zk::Proof;
self.send_transaction(DaData::ZKProof(Proof::Full(blob)))
.await?;
}

Ok(())
Expand Down Expand Up @@ -190,22 +192,22 @@ impl MockDaService {
pub async fn publish_test_block(&self) -> anyhow::Result<()> {
let blocks = self.blocks.lock().await;
let blob = vec![];
let _ = self.add_blob(&blocks, &blob, Default::default())?;
let _ = self.add_blob(&blocks, blob, Default::default())?;
Ok(())
}

fn add_blob(
&self,
blocks: &AsyncMutexGuard<'_, DbConnector>,
blob: &[u8],
blob: Vec<u8>,
zkp_proof: Vec<u8>,
) -> anyhow::Result<u64> {
let (previous_block_hash, height) = match blocks.last().map(|b| b.header().clone()) {
None => (GENESIS_HEADER.hash(), GENESIS_HEADER.height() + 1),
Some(block_header) => (block_header.hash(), block_header.height + 1),
};

let data_hash = hash_to_array(blob);
let data_hash = hash_to_array(&blob);
let proof_hash = hash_to_array(&zkp_proof);
// Hash only from single blob
let block_hash = block_hash(height, data_hash, proof_hash, previous_block_hash.into());
Expand Down Expand Up @@ -324,7 +326,7 @@ impl DaService for MockDaService {

let len = blocks.len() as u64;
if len == 0 && height == 1 {
let _ = self.add_blob(&blocks, &[] as &[u8], Default::default())?;
let _ = self.add_blob(&blocks, Default::default(), Default::default())?;
}

// if wait for height doesn't lock its own blocks, can't make it async
Expand Down Expand Up @@ -393,18 +395,21 @@ impl DaService for MockDaService {
([0u8; 32], ())
}

async fn send_transaction(&self, blob: &[u8]) -> Result<Self::TransactionId, Self::Error> {
async fn send_transaction(&self, da_data: DaData) -> Result<Self::TransactionId, Self::Error> {
let blob = borsh::to_vec(&da_data).unwrap();
let blocks = self.blocks.lock().await;
let _ = self.add_blob(&blocks, blob, Default::default())?;
Ok(MockHash([0; 32]))
}

fn get_send_transaction_queue(&self) -> UnboundedSender<BlobWithNotifier<Self::TransactionId>> {
let (tx, mut rx) = unbounded_channel::<BlobWithNotifier<Self::TransactionId>>();
fn get_send_transaction_queue(
&self,
) -> UnboundedSender<SenderWithNotifier<Self::TransactionId>> {
let (tx, mut rx) = unbounded_channel::<SenderWithNotifier<Self::TransactionId>>();
let this = self.clone();
tokio::spawn(async move {
while let Some(req) = rx.recv().await {
let res = this.send_transaction(&req.blob).await;
let res = this.send_transaction(req.da_data).await;
let _ = req.notify.send(res);
}
});
Expand Down Expand Up @@ -472,6 +477,7 @@ fn block_hash(
#[cfg(test)]
mod tests {
use sov_rollup_interface::da::{BlobReaderTrait, BlockHeaderTrait};
use sov_rollup_interface::zk::Proof;
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;

Expand Down Expand Up @@ -551,17 +557,18 @@ mod tests {
get_finalized_headers_collector(&mut da, number_of_finalized_blocks).await;

for i in 0..num_blocks {
let published_blob: Vec<u8> = vec![i as u8; i + 1];
let published_blob = DaData::ZKProof(Proof::Full(vec![i as u8; i + 1]));
let height = (i + 1) as u64;

da.send_transaction(&published_blob).await.unwrap();
da.send_transaction(published_blob.clone()).await.unwrap();

let mut block = da.get_block_at(height).await.unwrap();

assert_eq!(height, block.header.height());
assert_eq!(1, block.blobs.len());
let blob = &mut block.blobs[0];
let retrieved_data = blob.full_data().to_vec();
let retrieved_data = borsh::from_slice(&retrieved_data).unwrap();
assert_eq!(published_blob, retrieved_data);

let last_finalized_block_response = da.get_last_finalized_block_header().await;
Expand Down Expand Up @@ -598,7 +605,9 @@ mod tests {
for (i, blob) in blobs.iter().enumerate() {
let height = (i + 1) as u64;
// Send transaction should pass
da.send_transaction(blob).await.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(blob.to_owned())))
.await
.unwrap();
let last_finalized_block_response = da.get_last_finalized_block_header().await;
validate_get_finalized_header_response(
height,
Expand All @@ -624,7 +633,10 @@ mod tests {
let last_finalized_header = da.get_last_finalized_block_header().await.unwrap();
assert_eq!(expected_finalized_height, last_finalized_header.height());

assert_eq!(&blob, fetched_block.blobs[0].full_data());
let da_data = DaData::ZKProof(Proof::Full(blob));
let retrieved_data = fetched_block.blobs[0].full_data();
let retrieved_data = borsh::from_slice(retrieved_data).unwrap();
assert_eq!(da_data, retrieved_data);

let head_block_header = da.get_head_block_header().await.unwrap();
assert_eq!(expected_head_height, head_block_header.height());
Expand Down Expand Up @@ -676,9 +688,15 @@ mod tests {

// 1 -> 2 -> 3

da.send_transaction(&[1, 2, 3, 4]).await.unwrap();
da.send_transaction(&[4, 5, 6, 7]).await.unwrap();
da.send_transaction(&[8, 9, 0, 1]).await.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![1, 2, 3, 4])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![4, 5, 6, 7])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![8, 9, 0, 1])))
.await
.unwrap();

let block_1_before = da.get_block_at(1).await.unwrap();
let block_2_before = da.get_block_at(2).await.unwrap();
Expand Down Expand Up @@ -727,13 +745,21 @@ mod tests {
// \ -> 3.2 -> 4.2

// 1
da.send_transaction(&[1, 2, 3, 4]).await.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![1, 2, 3, 4])))
.await
.unwrap();
// 2
da.send_transaction(&[4, 5, 6, 7]).await.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![4, 5, 6, 7])))
.await
.unwrap();
// 3.1
da.send_transaction(&[8, 9, 0, 1]).await.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![8, 9, 0, 1])))
.await
.unwrap();
// 4.1
da.send_transaction(&[2, 3, 4, 5]).await.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![2, 3, 4, 5])))
.await
.unwrap();

let _block_1 = da.get_block_at(1).await.unwrap();
let block_2 = da.get_block_at(2).await.unwrap();
Expand Down Expand Up @@ -761,10 +787,18 @@ mod tests {

// 1 -> 2 -> 3 -> 4

da.send_transaction(&[1, 2, 3, 4]).await.unwrap();
da.send_transaction(&[4, 5, 6, 7]).await.unwrap();
da.send_transaction(&[8, 9, 0, 1]).await.unwrap();
da.send_transaction(&[2, 3, 4, 5]).await.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![1, 2, 3, 4])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![4, 5, 6, 7])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![8, 9, 0, 1])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![2, 3, 4, 5])))
.await
.unwrap();

let block_1_before = da.get_block_at(1).await.unwrap();
let block_2_before = da.get_block_at(2).await.unwrap();
Expand Down Expand Up @@ -822,9 +856,15 @@ mod tests {
assert!(has_planned_fork.is_some());
}

da.send_transaction(&[1, 2, 3, 4]).await.unwrap();
da.send_transaction(&[4, 5, 6, 7]).await.unwrap();
da.send_transaction(&[8, 9, 0, 1]).await.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![1, 2, 3, 4])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![4, 5, 6, 7])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![8, 9, 0, 1])))
.await
.unwrap();

let block_1_before = da.get_block_at(1).await.unwrap();
let block_2_before = da.get_block_at(2).await.unwrap();
Expand Down Expand Up @@ -854,11 +894,21 @@ mod tests {
PlannedFork::new(4, 2, vec![vec![13, 13, 13, 13], vec![14, 14, 14, 14]]);
da.set_planned_fork(planned_fork).await.unwrap();

da.send_transaction(&[1, 1, 1, 1]).await.unwrap();
da.send_transaction(&[2, 2, 2, 2]).await.unwrap();
da.send_transaction(&[3, 3, 3, 3]).await.unwrap();
da.send_transaction(&[4, 4, 4, 4]).await.unwrap();
da.send_transaction(&[5, 5, 5, 5]).await.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![1, 1, 1, 1])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![2, 2, 2, 2])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![3, 3, 3, 3])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![4, 4, 4, 4])))
.await
.unwrap();
da.send_transaction(DaData::ZKProof(Proof::Full(vec![5, 5, 5, 5])))
.await
.unwrap();

let block_1_before = da.get_block_at(1).await.unwrap();
let block_2_before = da.get_block_at(2).await.unwrap();
Expand Down
Loading

0 comments on commit 633c376

Please sign in to comment.