diff --git a/bin/citrea/src/rollup/bitcoin.rs b/bin/citrea/src/rollup/bitcoin.rs index 080eb7e52..bd4d2fb5e 100644 --- a/bin/citrea/src/rollup/bitcoin.rs +++ b/bin/citrea/src/rollup/bitcoin.rs @@ -17,7 +17,7 @@ use sov_modules_rollup_blueprint::RollupBlueprint; use sov_modules_stf_blueprint::StfBlueprint; use sov_prover_storage_manager::ProverStorageManager; use sov_rollup_interface::da::DaVerifier; -use sov_rollup_interface::services::da::BlobWithNotifier; +use sov_rollup_interface::services::da::SenderWithNotifier; use sov_rollup_interface::zk::{Zkvm, ZkvmHost}; use sov_state::{DefaultStorageSpec, Storage, ZkStorage}; use sov_stf_runner::{FullNodeConfig, ProverConfig}; @@ -110,7 +110,7 @@ impl RollupBlueprint for BitcoinRollup { &self, rollup_config: &FullNodeConfig, ) -> Result, anyhow::Error> { - let (tx, rx) = unbounded_channel::>(); + let (tx, rx) = unbounded_channel::>(); let service = Arc::new( BitcoinService::new( diff --git a/crates/bitcoin-da/src/service.rs b/crates/bitcoin-da/src/service.rs index 3c8c5bbbb..e8fe489d2 100644 --- a/crates/bitcoin-da/src/service.rs +++ b/crates/bitcoin-da/src/service.rs @@ -19,8 +19,8 @@ use bitcoin::{merkle_tree, Amount, BlockHash, CompactTarget, Transaction, Txid, use bitcoincore_rpc::jsonrpc_async::Error as RpcError; use bitcoincore_rpc::{Auth, Client, Error, RpcApi}; use serde::{Deserialize, Serialize}; -use sov_rollup_interface::da::DaSpec; -use sov_rollup_interface::services::da::{BlobWithNotifier, DaService}; +use sov_rollup_interface::da::{DaData, DaSpec}; +use sov_rollup_interface::services::da::{DaService, SenderWithNotifier}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::channel as oneshot_channel; use tracing::{debug, error, info, instrument, trace}; @@ -49,7 +49,7 @@ pub struct BitcoinService { network: bitcoin::Network, da_private_key: Option, reveal_tx_id_prefix: Vec, - inscribes_queue: UnboundedSender>, + inscribes_queue: UnboundedSender>, } /// Runtime configuration for the DA service @@ -78,7 +78,7 @@ impl BitcoinService { pub async fn new( config: DaServiceConfig, chain_params: RollupParams, - tx: UnboundedSender>, + tx: UnboundedSender>, ) -> Result { let client = Client::new( &config.node_url, @@ -105,7 +105,7 @@ impl BitcoinService { pub fn spawn_da_queue( self: Arc, - mut rx: UnboundedReceiver>, + mut rx: UnboundedReceiver>, ) { // This is a queue of inscribe requests tokio::task::spawn_blocking(|| { @@ -136,7 +136,7 @@ impl BitcoinService { let prev = prev_tx.take(); loop { // Build and send tx with retries: - let blob = request.blob.clone(); + let blob = borsh::to_vec(&request.da_data).expect("Should serialize"); let fee_sat_per_vbyte = match self.get_fee_rate().await { Ok(rate) => rate, Err(e) => { @@ -207,7 +207,7 @@ impl BitcoinService { network: bitcoin::Network, da_private_key: Option, reveal_tx_id_prefix: Vec, - inscribes_queue: UnboundedSender>, + inscribes_queue: UnboundedSender>, ) -> Self { let wallets = client .list_wallets() @@ -565,18 +565,20 @@ impl DaService for BitcoinService { #[instrument(level = "trace", skip_all)] async fn send_transaction( &self, - blob: &[u8], + da_data: DaData, ) -> Result<::TransactionId, Self::Error> { let queue = self.get_send_transaction_queue(); let (tx, rx) = oneshot_channel(); - queue.send(BlobWithNotifier { - blob: blob.to_vec(), + queue.send(SenderWithNotifier { + da_data, notify: tx, })?; rx.await? } - fn get_send_transaction_queue(&self) -> UnboundedSender> { + fn get_send_transaction_queue( + &self, + ) -> UnboundedSender> { self.inscribes_queue.clone() } diff --git a/crates/prover/src/prover_service/parallel/mod.rs b/crates/prover/src/prover_service/parallel/mod.rs index b82bea9d7..0c8cd0e6b 100644 --- a/crates/prover/src/prover_service/parallel/mod.rs +++ b/crates/prover/src/prover_service/parallel/mod.rs @@ -189,11 +189,7 @@ where let da_data = DaData::ZKProof(proof.clone()); let tx_id = da_service - .send_transaction( - borsh::to_vec(&da_data) - .expect("Should serialize") - .as_slice(), - ) + .send_transaction(da_data) .await .map_err(|e| anyhow::anyhow!(e))?; break Ok((tx_id, proof)); diff --git a/crates/sequencer/src/sequencer.rs b/crates/sequencer/src/sequencer.rs index 8984b8468..97fa58fce 100644 --- a/crates/sequencer/src/sequencer.rs +++ b/crates/sequencer/src/sequencer.rs @@ -37,7 +37,7 @@ use sov_modules_api::{ }; use sov_modules_stf_blueprint::StfBlueprintTrait; use sov_rollup_interface::da::{BlockHeaderTrait, DaData, DaSpec, SequencerCommitment}; -use sov_rollup_interface::services::da::{BlobWithNotifier, DaService}; +use sov_rollup_interface::services::da::{DaService, SenderWithNotifier}; use sov_rollup_interface::stf::{SoftConfirmationReceipt, StateTransitionFunction}; use sov_rollup_interface::storage::HierarchicalStorageManager; use sov_rollup_interface::zk::ZkvmHost; @@ -717,10 +717,9 @@ where debug!("Sequencer: submitting commitment: {:?}", commitment); - let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone())) - .map_err(|e| anyhow!(e))?; + let da_data = DaData::SequencerCommitment(commitment.clone()); let (notify, rx) = oneshot_channel(); - let request = BlobWithNotifier { blob, notify }; + let request = SenderWithNotifier { da_data, notify }; self.da_service .get_send_transaction_queue() .send(request) diff --git a/crates/sovereign-sdk/adapters/mock-da/src/service.rs b/crates/sovereign-sdk/adapters/mock-da/src/service.rs index 468d86ebc..a5d1c58aa 100644 --- a/crates/sovereign-sdk/adapters/mock-da/src/service.rs +++ b/crates/sovereign-sdk/adapters/mock-da/src/service.rs @@ -7,8 +7,8 @@ use std::time::Duration; use async_trait::async_trait; use pin_project::pin_project; use sha2::Digest; -use sov_rollup_interface::da::{BlockHeaderTrait, DaSpec, Time}; -use sov_rollup_interface::services::da::{BlobWithNotifier, DaService, SlotData}; +use sov_rollup_interface::da::{BlockHeaderTrait, DaData, DaSpec, Time}; +use sov_rollup_interface::services::da::{DaService, SenderWithNotifier, SlotData}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::{broadcast, Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; use tokio::time; @@ -149,7 +149,9 @@ impl MockDaService { blocks.prune_above(height); for blob in blobs { - let _ = self.add_blob(&blocks, &blob, Default::default())?; + use sov_rollup_interface::zk::Proof; + self.send_transaction(DaData::ZKProof(Proof::Full(blob))) + .await?; } Ok(()) @@ -190,14 +192,14 @@ impl MockDaService { pub async fn publish_test_block(&self) -> anyhow::Result<()> { let blocks = self.blocks.lock().await; let blob = vec![]; - let _ = self.add_blob(&blocks, &blob, Default::default())?; + let _ = self.add_blob(&blocks, blob, Default::default())?; Ok(()) } fn add_blob( &self, blocks: &AsyncMutexGuard<'_, DbConnector>, - blob: &[u8], + blob: Vec, zkp_proof: Vec, ) -> anyhow::Result { let (previous_block_hash, height) = match blocks.last().map(|b| b.header().clone()) { @@ -205,7 +207,7 @@ impl MockDaService { Some(block_header) => (block_header.hash(), block_header.height + 1), }; - let data_hash = hash_to_array(blob); + let data_hash = hash_to_array(&blob); let proof_hash = hash_to_array(&zkp_proof); // Hash only from single blob let block_hash = block_hash(height, data_hash, proof_hash, previous_block_hash.into()); @@ -324,7 +326,7 @@ impl DaService for MockDaService { let len = blocks.len() as u64; if len == 0 && height == 1 { - let _ = self.add_blob(&blocks, &[] as &[u8], Default::default())?; + let _ = self.add_blob(&blocks, Default::default(), Default::default())?; } // if wait for height doesn't lock its own blocks, can't make it async @@ -393,18 +395,21 @@ impl DaService for MockDaService { ([0u8; 32], ()) } - async fn send_transaction(&self, blob: &[u8]) -> Result { + async fn send_transaction(&self, da_data: DaData) -> Result { + let blob = borsh::to_vec(&da_data).unwrap(); let blocks = self.blocks.lock().await; let _ = self.add_blob(&blocks, blob, Default::default())?; Ok(MockHash([0; 32])) } - fn get_send_transaction_queue(&self) -> UnboundedSender> { - let (tx, mut rx) = unbounded_channel::>(); + fn get_send_transaction_queue( + &self, + ) -> UnboundedSender> { + let (tx, mut rx) = unbounded_channel::>(); let this = self.clone(); tokio::spawn(async move { while let Some(req) = rx.recv().await { - let res = this.send_transaction(&req.blob).await; + let res = this.send_transaction(req.da_data).await; let _ = req.notify.send(res); } }); @@ -472,6 +477,7 @@ fn block_hash( #[cfg(test)] mod tests { use sov_rollup_interface::da::{BlobReaderTrait, BlockHeaderTrait}; + use sov_rollup_interface::zk::Proof; use tokio::task::JoinHandle; use tokio_stream::StreamExt; @@ -551,10 +557,10 @@ mod tests { get_finalized_headers_collector(&mut da, number_of_finalized_blocks).await; for i in 0..num_blocks { - let published_blob: Vec = vec![i as u8; i + 1]; + let published_blob = DaData::ZKProof(Proof::Full(vec![i as u8; i + 1])); let height = (i + 1) as u64; - da.send_transaction(&published_blob).await.unwrap(); + da.send_transaction(published_blob.clone()).await.unwrap(); let mut block = da.get_block_at(height).await.unwrap(); @@ -562,6 +568,7 @@ mod tests { assert_eq!(1, block.blobs.len()); let blob = &mut block.blobs[0]; let retrieved_data = blob.full_data().to_vec(); + let retrieved_data = borsh::from_slice(&retrieved_data).unwrap(); assert_eq!(published_blob, retrieved_data); let last_finalized_block_response = da.get_last_finalized_block_header().await; @@ -598,7 +605,9 @@ mod tests { for (i, blob) in blobs.iter().enumerate() { let height = (i + 1) as u64; // Send transaction should pass - da.send_transaction(blob).await.unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(blob.to_owned()))) + .await + .unwrap(); let last_finalized_block_response = da.get_last_finalized_block_header().await; validate_get_finalized_header_response( height, @@ -624,7 +633,10 @@ mod tests { let last_finalized_header = da.get_last_finalized_block_header().await.unwrap(); assert_eq!(expected_finalized_height, last_finalized_header.height()); - assert_eq!(&blob, fetched_block.blobs[0].full_data()); + let da_data = DaData::ZKProof(Proof::Full(blob)); + let retrieved_data = fetched_block.blobs[0].full_data(); + let retrieved_data = borsh::from_slice(retrieved_data).unwrap(); + assert_eq!(da_data, retrieved_data); let head_block_header = da.get_head_block_header().await.unwrap(); assert_eq!(expected_head_height, head_block_header.height()); @@ -676,9 +688,15 @@ mod tests { // 1 -> 2 -> 3 - da.send_transaction(&[1, 2, 3, 4]).await.unwrap(); - da.send_transaction(&[4, 5, 6, 7]).await.unwrap(); - da.send_transaction(&[8, 9, 0, 1]).await.unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![1, 2, 3, 4]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![4, 5, 6, 7]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![8, 9, 0, 1]))) + .await + .unwrap(); let block_1_before = da.get_block_at(1).await.unwrap(); let block_2_before = da.get_block_at(2).await.unwrap(); @@ -727,13 +745,21 @@ mod tests { // \ -> 3.2 -> 4.2 // 1 - da.send_transaction(&[1, 2, 3, 4]).await.unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![1, 2, 3, 4]))) + .await + .unwrap(); // 2 - da.send_transaction(&[4, 5, 6, 7]).await.unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![4, 5, 6, 7]))) + .await + .unwrap(); // 3.1 - da.send_transaction(&[8, 9, 0, 1]).await.unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![8, 9, 0, 1]))) + .await + .unwrap(); // 4.1 - da.send_transaction(&[2, 3, 4, 5]).await.unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![2, 3, 4, 5]))) + .await + .unwrap(); let _block_1 = da.get_block_at(1).await.unwrap(); let block_2 = da.get_block_at(2).await.unwrap(); @@ -761,10 +787,18 @@ mod tests { // 1 -> 2 -> 3 -> 4 - da.send_transaction(&[1, 2, 3, 4]).await.unwrap(); - da.send_transaction(&[4, 5, 6, 7]).await.unwrap(); - da.send_transaction(&[8, 9, 0, 1]).await.unwrap(); - da.send_transaction(&[2, 3, 4, 5]).await.unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![1, 2, 3, 4]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![4, 5, 6, 7]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![8, 9, 0, 1]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![2, 3, 4, 5]))) + .await + .unwrap(); let block_1_before = da.get_block_at(1).await.unwrap(); let block_2_before = da.get_block_at(2).await.unwrap(); @@ -822,9 +856,15 @@ mod tests { assert!(has_planned_fork.is_some()); } - da.send_transaction(&[1, 2, 3, 4]).await.unwrap(); - da.send_transaction(&[4, 5, 6, 7]).await.unwrap(); - da.send_transaction(&[8, 9, 0, 1]).await.unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![1, 2, 3, 4]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![4, 5, 6, 7]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![8, 9, 0, 1]))) + .await + .unwrap(); let block_1_before = da.get_block_at(1).await.unwrap(); let block_2_before = da.get_block_at(2).await.unwrap(); @@ -854,11 +894,21 @@ mod tests { PlannedFork::new(4, 2, vec![vec![13, 13, 13, 13], vec![14, 14, 14, 14]]); da.set_planned_fork(planned_fork).await.unwrap(); - da.send_transaction(&[1, 1, 1, 1]).await.unwrap(); - da.send_transaction(&[2, 2, 2, 2]).await.unwrap(); - da.send_transaction(&[3, 3, 3, 3]).await.unwrap(); - da.send_transaction(&[4, 4, 4, 4]).await.unwrap(); - da.send_transaction(&[5, 5, 5, 5]).await.unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![1, 1, 1, 1]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![2, 2, 2, 2]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![3, 3, 3, 3]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![4, 4, 4, 4]))) + .await + .unwrap(); + da.send_transaction(DaData::ZKProof(Proof::Full(vec![5, 5, 5, 5]))) + .await + .unwrap(); let block_1_before = da.get_block_at(1).await.unwrap(); let block_2_before = da.get_block_at(2).await.unwrap(); diff --git a/crates/sovereign-sdk/rollup-interface/src/node/services/da.rs b/crates/sovereign-sdk/rollup-interface/src/node/services/da.rs index 32a4584b2..330ec2109 100644 --- a/crates/sovereign-sdk/rollup-interface/src/node/services/da.rs +++ b/crates/sovereign-sdk/rollup-interface/src/node/services/da.rs @@ -9,14 +9,14 @@ use tokio::sync::oneshot::Sender as OneshotSender; use crate::da::BlockHeaderTrait; #[cfg(feature = "native")] -use crate::da::{DaSpec, DaVerifier}; +use crate::da::{DaData, DaSpec, DaVerifier}; use crate::zk::ValidityCondition; /// This type represents a queued request to send_transaction #[cfg(feature = "native")] -pub struct BlobWithNotifier { - /// Blob to send. - pub blob: Vec, +pub struct SenderWithNotifier { + /// Data to send. + pub da_data: DaData, /// Channel to receive result of the operation. pub notify: OneshotSender>, } @@ -128,10 +128,12 @@ pub trait DaService: Send + Sync + 'static { /// Send a transaction directly to the DA layer. /// blob is the serialized and signed transaction. /// Returns nothing if the transaction was successfully sent. - async fn send_transaction(&self, blob: &[u8]) -> Result; + async fn send_transaction(&self, da_data: DaData) -> Result; /// A tx part of the queue to send transactions in order - fn get_send_transaction_queue(&self) -> UnboundedSender> { + fn get_send_transaction_queue( + &self, + ) -> UnboundedSender> { unimplemented!() } diff --git a/crates/sovereign-sdk/utils/rng-da-service/src/lib.rs b/crates/sovereign-sdk/utils/rng-da-service/src/lib.rs index 780f27796..ce88374d4 100644 --- a/crates/sovereign-sdk/utils/rng-da-service/src/lib.rs +++ b/crates/sovereign-sdk/utils/rng-da-service/src/lib.rs @@ -13,7 +13,7 @@ use sov_modules_api::default_context::DefaultContext; use sov_modules_api::default_signature::private_key::DefaultPrivateKey; use sov_modules_api::transaction::Transaction; use sov_modules_api::{Address, AddressBech32, EncodeCall, PrivateKey, PublicKey, Spec}; -use sov_rollup_interface::da::{BlockHeaderTrait, DaSpec, DaVerifier, Time}; +use sov_rollup_interface::da::{BlockHeaderTrait, DaData, DaSpec, DaVerifier, Time}; use sov_rollup_interface::services::da::{DaService, SlotData}; const DEFAULT_CHAIN_ID: u64 = 0; @@ -179,7 +179,7 @@ impl DaService for RngDaService { unimplemented!() } - async fn send_transaction(&self, _blob: &[u8]) -> Result { + async fn send_transaction(&self, _blob: DaData) -> Result { unimplemented!() }