diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9035ec6d..79d25283 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -35,6 +35,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v1
+
+      - name: Install toolchain
+        uses: dtolnay/rust-toolchain@nightly
+
       - name: Install Protoc
         uses: arduino/setup-protoc@v2
         with:
@@ -43,8 +47,8 @@
       - name: Run rustdoc check
         env:
-          RUSTDOCFLAGS: -D warnings
-        run: cargo doc
+          RUSTDOCFLAGS: --cfg docs_rs -D warnings
+        run: cargo +nightly doc
 
   fmt:
diff --git a/Cargo.lock b/Cargo.lock
index 1b8ebf7e..d32ded83 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -542,8 +542,9 @@ dependencies = [
 
 [[package]]
 name = "beetswap"
 version = "0.1.0"
-source = "git+https://github.com/eigerco/beetswap?rev=9316bd6#9316bd600fe745e685ef158ee54c98b5e48a3309"
+source = "git+https://github.com/eigerco/beetswap?rev=f69ddd8#f69ddd8e429a4913aa3fb29b41b4a0913c221a63"
 dependencies = [
+ "async-trait",
  "asynchronous-codec 0.7.0",
  "blockstore",
  "bytes",
@@ -558,6 +559,7 @@ dependencies = [
  "quick-protobuf",
  "smallvec",
  "thiserror",
+ "tracing",
  "unsigned-varint 0.8.0",
  "void",
 ]
diff --git a/blockstore/src/lib.rs b/blockstore/src/lib.rs
index 2e7021e8..da42e787 100644
--- a/blockstore/src/lib.rs
+++ b/blockstore/src/lib.rs
@@ -1,3 +1,4 @@
+#![cfg_attr(docs_rs, feature(doc_cfg))]
 #![doc = include_str!("../README.md")]
 
 use cid::CidGeneric;
diff --git a/node/Cargo.toml b/node/Cargo.toml
index 22d133df..893836f4 100644
--- a/node/Cargo.toml
+++ b/node/Cargo.toml
@@ -33,7 +33,7 @@ libp2p = { workspace = true, features = [
 ] }
 
 async-trait = "0.1.73"
-beetswap = { git = "https://github.com/eigerco/beetswap", rev = "9316bd6" }
+beetswap = { git = "https://github.com/eigerco/beetswap", rev = "f69ddd8" }
 cid = { version = "0.11.0", features = ["serde-codec"] }
 dashmap = "5.5.3"
 futures = "0.3.28"
diff --git a/node/src/daser.rs b/node/src/daser.rs
new file mode 100644
index 00000000..23a1065d
--- /dev/null
+++ b/node/src/daser.rs
@@ -0,0 +1,308 @@
+//! Component responsible for data availability sampling of the already synchronized block
+//! headers announced on the Celestia network.
+//!
+//! When a new header is inserted into the [`Store`], [`Daser`] gets notified. It then fetches
+//! random [`Sample`]s of the block via the Shwap protocol and verifies them. If all the samples
+//! are verified successfully, the block is marked as accepted. Otherwise, if [`Daser`] doesn't
+//! receive valid samples, the block is marked as not accepted and data sampling continues.
+//!
+//! [`Sample`]: celestia_types::sample::Sample
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use celestia_types::ExtendedHeader;
+use cid::Cid;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use instant::Instant;
+use rand::Rng;
+use tokio::select;
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error};
+
+use crate::executor::spawn;
+use crate::p2p::shwap::convert_cid;
+use crate::p2p::{P2p, P2pError};
+use crate::store::{Store, StoreError};
+
+const MAX_SAMPLES_NEEDED: usize = 16;
+
+type Result<T, E = DaserError> = std::result::Result<T, E>;
+
+/// Representation of all the errors that can occur when interacting with the [`Daser`].
+#[derive(Debug, thiserror::Error)]
+pub enum DaserError {
+    /// An error propagated from the [`P2p`] module.
+    #[error(transparent)]
+    P2p(#[from] P2pError),
+
+    /// An error propagated from the [`Store`] module.
+    #[error(transparent)]
+    Store(#[from] StoreError),
+}
+
+/// Component responsible for data availability sampling of blocks from the network.
+pub struct Daser {
+    cancellation_token: CancellationToken,
+}
+
+/// Arguments used to configure the [`Daser`].
+pub struct DaserArgs<S>
+where
+    S: Store,
+{
+    /// Handler for the peer to peer messaging.
+    pub p2p: Arc<P2p>,
+    /// Headers storage.
+    pub store: Arc<S>,
+}
+
+impl Daser {
+    /// Create and start the [`Daser`].
+    pub fn start<S>(args: DaserArgs<S>) -> Result<Daser>
+    where
+        S: Store + 'static,
+    {
+        let cancellation_token = CancellationToken::new();
+        let mut worker = Worker::new(args, cancellation_token.child_token())?;
+
+        spawn(async move {
+            if let Err(e) = worker.run().await {
+                error!("Fatal DASer error: {e}");
+            }
+        });
+
+        Ok(Daser { cancellation_token })
+    }
+
+    /// Stop the [`Daser`].
+    pub fn stop(&self) {
+        // Signal the Worker to stop.
+        // TODO: Should we wait for the Worker to stop?
+        self.cancellation_token.cancel();
+    }
+}
+
+impl Drop for Daser {
+    fn drop(&mut self) {
+        self.cancellation_token.cancel();
+    }
+}
+
+struct Worker<S>
+where
+    S: Store + 'static,
+{
+    cancellation_token: CancellationToken,
+    p2p: Arc<P2p>,
+    store: Arc<S>,
+    max_samples_needed: usize,
+}
+
+impl<S> Worker<S>
+where
+    S: Store,
+{
+    fn new(args: DaserArgs<S>, cancellation_token: CancellationToken) -> Result<Worker<S>> {
+        Ok(Worker {
+            cancellation_token,
+            p2p: args.p2p,
+            store: args.store,
+            max_samples_needed: MAX_SAMPLES_NEEDED,
+        })
+    }
+
+    async fn run(&mut self) -> Result<()> {
+        let cancellation_token = self.cancellation_token.clone();
+
+        loop {
+            select! {
+                _ = cancellation_token.cancelled() => break,
+                res = self.sample_next_block() => res?,
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn sample_next_block(&mut self) -> Result<()> {
+        let height = self.store.next_unsampled_height().await?;
+        self.store.wait_height(height).await?;
+
+        let header = self.store.get_by_height(height).await?;
+        let (sampled_cids, accepted) = self.sample_block(&header).await?;
+
+        self.store
+            .update_sampling_metadata(height, accepted, sampled_cids)
+            .await?;
+
+        Ok(())
+    }
+
+    async fn sample_block(&mut self, header: &ExtendedHeader) -> Result<(Vec<Cid>, bool)> {
+        let now = Instant::now();
+        let block_len = header.dah.square_len() * header.dah.square_len();
+        let indexes = random_indexes(block_len, self.max_samples_needed);
+        let mut futs = FuturesUnordered::new();
+
+        for index in indexes {
+            let fut = self
+                .p2p
+                .get_sample(index, header.dah.square_len(), header.height().value());
+            futs.push(fut);
+        }
+
+        let mut cids: Vec<Cid> = Vec::new();
+        let mut accepted = true;
+
+        while let Some(res) = futs.next().await {
+            match res {
+                Ok(sample) => cids.push(convert_cid(&sample.sample_id.into())?),
+                // Validation is done at the Bitswap level, through `ShwapMultihasher`.
+                // If the sample is not valid, it will never be delivered to us
+                // as the data of the CID. Because of that, the only signal
+                // that data sampling verification failed is the query timing out.
+                Err(P2pError::BitswapQueryTimeout) => accepted = false,
+                Err(e) => return Err(e.into()),
+            }
+        }
+
+        debug!(
+            "Data sampling of {} is {}. Took {:?}",
+            header.height(),
+            if accepted { "accepted" } else { "rejected" },
+            now.elapsed()
+        );
+
+        Ok((cids, accepted))
+    }
+}
+
+fn random_indexes(block_len: usize, max_samples_needed: usize) -> HashSet<usize> {
+    // If the block length is smaller than `max_samples_needed`, we are going
+    // to sample the whole block. Randomness is not needed for this.
+    if block_len <= max_samples_needed {
+        return (0..block_len).collect();
+    }
+
+    let mut indexes = HashSet::with_capacity(max_samples_needed);
+    let mut rng = rand::thread_rng();
+
+    while indexes.len() < max_samples_needed {
+        indexes.insert(rng.gen::<usize>() % block_len);
+    }
+
+    indexes
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::store::InMemoryStore;
+    use crate::test_utils::{async_test, dah_of_eds, generate_fake_eds, MockP2pHandle};
+    use celestia_tendermint_proto::Protobuf;
+    use celestia_types::sample::{Sample, SampleId};
+    use celestia_types::test_utils::ExtendedHeaderGenerator;
+    use celestia_types::{AxisType, ExtendedDataSquare};
+
+    #[async_test]
+    async fn received_valid_samples() {
+        let (mock, mut handle) = P2p::mocked();
+        let store = Arc::new(InMemoryStore::new());
+
+        let _daser = Daser::start(DaserArgs {
+            p2p: Arc::new(mock),
+            store: store.clone(),
+        })
+        .unwrap();
+
+        let mut gen = ExtendedHeaderGenerator::new();
+
+        handle.expect_no_cmd().await;
+
+        gen_and_sample_block(&mut handle, &mut gen, &store, 2, false).await;
+        gen_and_sample_block(&mut handle, &mut gen, &store, 4, false).await;
+        gen_and_sample_block(&mut handle, &mut gen, &store, 8, false).await;
+        gen_and_sample_block(&mut handle, &mut gen, &store, 16, false).await;
+    }
+
+    #[async_test]
+    async fn received_invalid_sample() {
+        let (mock, mut handle) = P2p::mocked();
+        let store = Arc::new(InMemoryStore::new());
+
+        let _daser = Daser::start(DaserArgs {
+            p2p: Arc::new(mock),
+            store: store.clone(),
+        })
+        .unwrap();
+
+        let mut gen = ExtendedHeaderGenerator::new();
+
+        handle.expect_no_cmd().await;
+
+        gen_and_sample_block(&mut handle, &mut gen, &store, 2, false).await;
+        gen_and_sample_block(&mut handle, &mut gen, &store, 4, true).await;
+        gen_and_sample_block(&mut handle, &mut gen, &store, 8, false).await;
+    }
+
+    async fn gen_and_sample_block(
+        handle: &mut MockP2pHandle,
+        gen: &mut ExtendedHeaderGenerator,
+        store: &InMemoryStore,
+        square_len: usize,
+        simulate_invalid_sampling: bool,
+    ) {
+        let eds = generate_fake_eds(square_len);
+        let dah = dah_of_eds(&eds);
+        let header = gen.next_with_dah(dah);
+        let height = header.height().value();
+
+        store.append_single(header).await.unwrap();
+
+        let mut cids = Vec::new();
+
+        for i in 0..(square_len * square_len).min(MAX_SAMPLES_NEEDED) {
+            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
+
+            // Simulate an invalid sample by triggering BitswapQueryTimeout
+            if simulate_invalid_sampling && i == 2 {
+                respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
+                continue;
+            }
+
+            let sample_id: SampleId = cid.try_into().unwrap();
+            assert_eq!(sample_id.row.block_height, height);
+
+            let sample = gen_sample_of_cid(sample_id, &eds, store).await;
+            let sample_bytes = sample.encode_vec().unwrap();
+
+            respond_to.send(Ok(sample_bytes)).unwrap();
+            cids.push(cid);
+        }
+
+        handle.expect_no_cmd().await;
+
+        let sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
+        assert_eq!(sampling_metadata.accepted, !simulate_invalid_sampling);
+        assert_eq!(sampling_metadata.cids_sampled, cids);
+    }
+
+    async fn gen_sample_of_cid(
+        sample_id: SampleId,
+        eds: &ExtendedDataSquare,
+        store: &InMemoryStore,
+    ) -> Sample {
+        let header = store
+            .get_by_height(sample_id.row.block_height)
+            .await
+            .unwrap();
+
+        let row = sample_id.row.index as usize;
+        let col = sample_id.index as usize;
+        let index = row * header.dah.square_len() + col;
+
+        Sample::new(AxisType::Row, index, eds, header.height().value()).unwrap()
+    }
+}
diff --git a/node/src/lib.rs b/node/src/lib.rs
index 4ca3aef5..1168e32f 100644
--- a/node/src/lib.rs
+++ b/node/src/lib.rs
@@ -2,6 +2,7 @@
 #![doc = include_str!("../README.md")]
 
 pub mod blockstore;
+pub mod daser;
 mod executor;
 pub mod network;
 pub mod node;
diff --git a/node/src/node.rs b/node/src/node.rs
index 549f0e73..37ad6c75 100644
--- a/node/src/node.rs
+++ b/node/src/node.rs
@@ -14,11 +14,11 @@ use celestia_types::nmt::Namespace;
 use celestia_types::row::Row;
 use celestia_types::sample::Sample;
 use celestia_types::ExtendedHeader;
-use cid::Cid;
 use libp2p::identity::Keypair;
 use libp2p::swarm::NetworkInfo;
 use libp2p::{Multiaddr, PeerId};
 
+use crate::daser::{Daser, DaserArgs, DaserError};
 use crate::p2p::{P2p, P2pArgs, P2pError};
 use crate::peer_tracker::PeerTrackerInfo;
 use crate::store::{Store, StoreError};
@@ -40,6 +40,10 @@ pub enum NodeError {
     /// An error propagated from the [`Store`] module.
     #[error(transparent)]
     Store(#[from] StoreError),
+
+    /// An error propagated from the [`Daser`] module.
+    #[error(transparent)]
+    Daser(#[from] DaserError),
 }
 
 /// Node configuration.
@@ -72,6 +76,7 @@ where
     p2p: Arc<P2p>,
     store: Arc<S>,
     syncer: Arc<Syncer<S>>,
+    _daser: Arc<Daser>,
 }
 
 impl<S> Node<S>
 where
@@ -100,7 +105,17 @@
             p2p: p2p.clone(),
         })?);
 
-        Ok(Node { p2p, store, syncer })
+        let daser = Arc::new(Daser::start(DaserArgs {
+            p2p: p2p.clone(),
+            store: store.clone(),
+        })?);
+
+        Ok(Node {
+            p2p,
+            store,
+            syncer,
+            _daser: daser,
+        })
     }
 
     /// Get node's local peer ID.
@@ -171,11 +186,6 @@
         Ok(self.p2p.get_verified_headers_range(from, amount).await?)
     }
 
-    /// Request data of a [`Cid`] from the network.
-    pub async fn request_cid(&self, cid: Cid) -> Result<Vec<u8>> {
-        Ok(self.p2p.get_cid(cid).await?)
-    }
-
     /// Request a [`Row`] from the network.
     ///
     /// The result was not verified and [`Row::verify`] must be called.
diff --git a/node/src/p2p.rs b/node/src/p2p.rs
index 9f149fb0..8b900aba 100644
--- a/node/src/p2p.rs
+++ b/node/src/p2p.rs
@@ -11,8 +11,10 @@
 //! - header-ex server
 
 use std::collections::HashMap;
+use std::future::poll_fn;
 use std::io;
 use std::sync::Arc;
+use std::task::Poll;
 use std::time::Duration;
 
 use blockstore::Blockstore;
@@ -39,16 +41,17 @@ use libp2p::{
     swarm::{ConnectionId, DialError, NetworkBehaviour, NetworkInfo, Swarm, SwarmEvent},
     Multiaddr, PeerId, TransportError,
 };
+use smallvec::SmallVec;
 use tokio::select;
 use tokio::sync::{mpsc, oneshot, watch};
 use tracing::{debug, info, instrument, trace, warn};
 
 mod header_ex;
 mod header_session;
-mod shwap;
+pub(crate) mod shwap;
 mod swarm;
 
-use crate::executor::{spawn, Interval};
+use crate::executor::{self, spawn, Interval};
 use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig};
 use crate::p2p::header_session::HeaderSession;
 use crate::p2p::shwap::{namespaced_data_cid, row_cid, sample_cid, ShwapMultihasher};
@@ -79,6 +82,8 @@ const KADEMLIA_BOOTSTRAP_PERIOD: Duration = Duration::from_secs(5 * 60);
 // Maximum size of a [`Multihash`].
 pub(crate) const MAX_MH_SIZE: usize = 64;
 
+const GET_SAMPLE_TIMEOUT: Duration = Duration::from_secs(10);
+
 type Result<T, E = P2pError> = std::result::Result<T, E>;
 
 /// Representation of all the errors that can occur when interacting with [`P2p`].
@@ -135,6 +140,10 @@ pub enum P2pError {
     /// An error propagated from [`celestia_types`] that is related to [`Cid`].
     #[error("CID error: {0}")]
     Cid(celestia_types::Error),
+
+    /// Bitswap query timed out.
+    #[error("Bitswap query timed out")]
+    BitswapQueryTimeout,
 }
 
 impl From<oneshot::error::RecvError> for P2pError {
@@ -194,9 +203,9 @@ pub(crate) enum P2pCmd {
         peer_id: PeerId,
         is_trusted: bool,
     },
-    GetCid {
+    GetShwapCid {
         cid: Cid,
-        respond_to: OneshotResultSender<Vec<u8>, beetswap::Error>,
+        respond_to: OneshotResultSender<Vec<u8>, P2pError>,
     },
 }
@@ -388,26 +397,38 @@ impl P2p {
     }
 
     /// Request a [`Cid`] on bitswap protocol.
-    pub async fn get_cid(&self, cid: Cid) -> Result<Vec<u8>> {
+    async fn get_shwap_cid(&self, cid: Cid, timeout: Option<Duration>) -> Result<Vec<u8>> {
         let (tx, rx) = oneshot::channel();
 
-        self.send_command(P2pCmd::GetCid {
+        self.send_command(P2pCmd::GetShwapCid {
             cid,
             respond_to: tx,
         })
         .await?;
 
-        Ok(rx.await??)
+        let data = match timeout {
+            Some(dur) => executor::timeout(dur, rx)
+                .await
+                .map_err(|_| P2pError::BitswapQueryTimeout)???,
+            None => rx.await??,
+        };
+
+        Ok(data)
     }
 
     /// Request a [`Row`] on bitswap protocol.
     pub async fn get_row(&self, row_index: u16, block_height: u64) -> Result<Row> {
         let cid = row_cid(row_index, block_height)?;
-        let data = self.get_cid(cid).await?;
+        // TODO: add timeout
+        let data = self.get_shwap_cid(cid, None).await?;
         Ok(Row::decode(&data[..])?)
     }
 
     /// Request a [`Sample`] on bitswap protocol.
+    ///
+    /// This method awaits a verified `Sample` until a timeout of 10 seconds
+    /// is reached. On timeout it is safe to assume that sampling of the block
+    /// failed.
     pub async fn get_sample(
         &self,
         index: usize,
@@ -415,7 +436,7 @@
         block_height: u64,
     ) -> Result<Sample> {
         let cid = sample_cid(index, square_len, block_height)?;
-        let data = self.get_cid(cid).await?;
+        let data = self.get_shwap_cid(cid, Some(GET_SAMPLE_TIMEOUT)).await?;
         Ok(Sample::decode(&data[..])?)
     }
@@ -427,7 +448,8 @@
         block_height: u64,
     ) -> Result<NamespacedData> {
         let cid = namespaced_data_cid(namespace, row_index, block_height)?;
-        let data = self.get_cid(cid).await?;
+        // TODO: add timeout
+        let data = self.get_shwap_cid(cid, None).await?;
         Ok(NamespacedData::decode(&data[..])?)
     }
@@ -487,7 +509,7 @@ where
     cmd_rx: mpsc::Receiver<P2pCmd>,
     peer_tracker: Arc<PeerTracker>,
     header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
-    bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, beetswap::Error>>,
+    bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
 }
 
 impl<B, S> Worker<B, S>
 where
@@ -515,7 +537,7 @@
         let gossipsub = init_gossipsub(&args, [&header_sub_topic])?;
         let kademlia = init_kademlia(&args)?;
-        let bitswap = init_bitswap(args.blockstore, &args.network_id)?;
+        let bitswap = init_bitswap(args.blockstore, args.store.clone(), &args.network_id)?;
 
         let header_ex = HeaderExBehaviour::new(HeaderExConfig {
             network_id: &args.network_id,
@@ -579,6 +601,9 @@
                     kademlia_last_bootstrap = Instant::now();
                 }
             }
+            _ = poll_closed(&mut self.bitswap_queries) => {
+                self.prune_canceled_bitswap_queries();
+            }
             ev = self.swarm.select_next_some() => {
                 if let Err(e) = self.on_swarm_event(ev).await {
                     warn!("Failure while handling swarm event: {e}");
@@ -593,6 +618,21 @@
         }
     }
 
+    fn prune_canceled_bitswap_queries(&mut self) {
+        let mut cancelled = SmallVec::<[_; 16]>::new();
+
+        for (query_id, chan) in &self.bitswap_queries {
+            if chan.is_closed() {
+                cancelled.push(*query_id);
+            }
+        }
+
+        for query_id in cancelled {
+            self.bitswap_queries.remove(&query_id);
+            self.swarm.behaviour_mut().bitswap.cancel(query_id);
+        }
+    }
+
     async fn on_swarm_event(&mut self, ev: SwarmEvent<BehaviourEvent<B, S>>) -> Result<()> {
         match ev {
             SwarmEvent::Behaviour(ev) => match ev {
@@ -669,8 +709,8 @@
                 self.peer_tracker.set_trusted(peer_id, is_trusted);
             }
-            P2pCmd::GetCid { cid, respond_to } => {
-                self.on_get_cid(cid, respond_to);
+            P2pCmd::GetShwapCid { cid, respond_to } => {
+                self.on_get_shwap_cid(cid, respond_to);
             }
         }
@@ -754,7 +794,7 @@
     }
 
     #[instrument(level = "trace", skip_all)]
-    fn on_get_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, beetswap::Error>) {
+    fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
         trace!("Requesting CID {cid} from bitswap");
         let query_id = self.swarm.behaviour_mut().bitswap.get(&cid);
         self.bitswap_queries.insert(query_id, respond_to);
@@ -770,6 +810,7 @@
             }
             beetswap::Event::GetQueryError { query_id, error } => {
                 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
+                    let error: P2pError = error.into();
                     respond_to.maybe_send_err(error);
                 }
             }
@@ -860,6 +901,23 @@
     }
 }
 
+/// Awaits until at least one channel in `bitswap_queries` is closed.
+async fn poll_closed(
+    bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
+) {
+    poll_fn(|cx| {
+        for chan in bitswap_queries.values_mut() {
+            match chan.poll_closed(cx) {
+                Poll::Pending => continue,
+                Poll::Ready(_) => return Poll::Ready(()),
+            }
+        }
+
+        Poll::Pending
+    })
+    .await
+}
+
 fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
     let mut invalid_addrs = Vec::new();
@@ -934,15 +992,20 @@ where
     Ok(kademlia)
 }
 
-fn init_bitswap<B>(blockstore: B, network_id: &str) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
+fn init_bitswap<B, S>(
+    blockstore: B,
+    store: Arc<S>,
+    network_id: &str,
+) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
 where
     B: Blockstore,
+    S: Store + 'static,
 {
     let protocol_prefix = format!("/celestia/{}", network_id);
 
     Ok(beetswap::Behaviour::builder(blockstore)
         .protocol_prefix(&protocol_prefix)?
-        .register_multihasher(ShwapMultihasher)
+        .register_multihasher(ShwapMultihasher::new(store))
         .client_set_send_dont_have(false)
         .build())
 }
diff --git a/node/src/p2p/header_ex.rs b/node/src/p2p/header_ex.rs
index 758b5396..4b1dfd9c 100644
--- a/node/src/p2p/header_ex.rs
+++ b/node/src/p2p/header_ex.rs
@@ -533,20 +533,14 @@ fn parse_header_request(buf: &[u8]) -> Option<HeaderRequest> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::test_utils::async_test;
     use bytes::BytesMut;
-    use celestia_proto::p2p::pb::{header_request::Data, HeaderRequest, HeaderResponse};
-    use futures::io::{AsyncRead, AsyncReadExt, Cursor, Error};
-    use futures::task::{Context, Poll};
-    use libp2p::{request_response::Codec, swarm::StreamProtocol};
-    use prost::{encode_length_delimiter, Message};
+    use celestia_proto::p2p::pb::header_request::Data;
+    use futures::io::{Cursor, Error};
+    use prost::encode_length_delimiter;
     use std::io::ErrorKind;
     use std::pin::Pin;
 
-    #[cfg(not(target_arch = "wasm32"))]
-    use tokio::test as async_test;
-    #[cfg(target_arch = "wasm32")]
-    use wasm_bindgen_test::wasm_bindgen_test as async_test;
-
     #[async_test]
     async fn test_decode_header_request_empty() {
         let header_request = HeaderRequest {
diff --git a/node/src/p2p/header_ex/client.rs b/node/src/p2p/header_ex/client.rs
index 10c15a93..3020187a 100644
--- a/node/src/p2p/header_ex/client.rs
+++ b/node/src/p2p/header_ex/client.rs
@@ -304,7 +304,7 @@ async fn decode_and_verify_responses(
 mod tests {
     use super::*;
     use crate::p2p::header_ex::utils::ExtendedHeaderExt;
-    use celestia_proto::p2p::pb::header_request::Data;
+    use crate::test_utils::async_test;
     use celestia_proto::p2p::pb::StatusCode;
     use celestia_types::consts::HASH_SIZE;
     use celestia_types::hash::Hash;
@@ -314,11 +314,6 @@
     use std::io;
     use std::sync::atomic::{AtomicU64, Ordering};
 
-    #[cfg(not(target_arch = "wasm32"))]
-    use tokio::test as async_test;
-    #[cfg(target_arch = "wasm32")]
-    use wasm_bindgen_test::wasm_bindgen_test as async_test;
-
     #[async_test]
     async fn request_height() {
         let peer_tracker = peer_tracker_with_n_peers(15);
diff --git a/node/src/p2p/header_ex/server.rs b/node/src/p2p/header_ex/server.rs
index c9358d02..191b9f74 100644
--- a/node/src/p2p/header_ex/server.rs
+++ b/node/src/p2p/header_ex/server.rs
@@ -205,23 +205,15 @@ fn parse_request(request: HeaderRequest) -> Option<(u64, header_request::Data)>
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::p2p::header_ex::utils::HeaderRequestExt;
-    use crate::test_utils::gen_filled_store;
+    use crate::test_utils::{async_test, gen_filled_store};
     use celestia_proto::p2p::pb::header_request::Data;
-    use celestia_proto::p2p::pb::{HeaderRequest, StatusCode};
+    use celestia_proto::p2p::pb::StatusCode;
     use celestia_tendermint_proto::Protobuf;
     use celestia_types::ExtendedHeader;
-    use libp2p::PeerId;
     use std::future::poll_fn;
-    use std::sync::Arc;
     use tokio::select;
     use tokio::sync::oneshot;
 
-    #[cfg(not(target_arch = "wasm32"))]
-    use tokio::test as async_test;
-    #[cfg(target_arch = "wasm32")]
-    use wasm_bindgen_test::wasm_bindgen_test as async_test;
-
     #[async_test]
     async fn request_head_test() {
         let (store, _) = gen_filled_store(4);
diff --git a/node/src/p2p/header_session.rs b/node/src/p2p/header_session.rs
index c476f247..4a48e436 100644
--- a/node/src/p2p/header_session.rs
+++ b/node/src/p2p/header_session.rs
@@ -140,15 +140,10 @@ impl HeaderSession {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::executor::spawn;
     use crate::p2p::P2p;
+    use crate::test_utils::async_test;
     use celestia_types::test_utils::ExtendedHeaderGenerator;
 
-    #[cfg(not(target_arch = "wasm32"))]
-    use tokio::test as async_test;
-    #[cfg(target_arch = "wasm32")]
-    use wasm_bindgen_test::wasm_bindgen_test as async_test;
-
    #[async_test]
    async fn retry_on_missing_range() {
        let (_p2p, mut p2p_mock) = P2p::mocked();
diff --git a/node/src/p2p/shwap.rs b/node/src/p2p/shwap.rs
index 00246aaa..7aab13d9 100644
--- a/node/src/p2p/shwap.rs
+++ b/node/src/p2p/shwap.rs
@@ -1,33 +1,111 @@
-use beetswap::multihasher::Multihasher;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use beetswap::multihasher::{Multihasher, MultihasherError};
 use blockstore::block::CidError;
-use celestia_proto::share::p2p::shwap::{
-    Data as RawNamespacedData, Row as RawRow, Sample as RawSample,
+use celestia_tendermint_proto::Protobuf;
+use celestia_types::namespaced_data::{
+    NamespacedData, NamespacedDataId, NAMESPACED_DATA_ID_MULTIHASH_CODE,
 };
-use celestia_types::namespaced_data::{NamespacedDataId, NAMESPACED_DATA_ID_MULTIHASH_CODE};
 use celestia_types::nmt::Namespace;
-use celestia_types::row::{RowId, ROW_ID_MULTIHASH_CODE};
-use celestia_types::sample::{SampleId, SAMPLE_ID_MULTIHASH_CODE};
+use celestia_types::row::{Row, RowId, ROW_ID_MULTIHASH_CODE};
+use celestia_types::sample::{Sample, SampleId, SAMPLE_ID_MULTIHASH_CODE};
 use cid::{Cid, CidGeneric};
 use libp2p::multihash::Multihash;
-use prost::Message;
-
-use crate::p2p::Result;
 
-use super::{P2pError, MAX_MH_SIZE};
+use crate::p2p::{P2pError, Result, MAX_MH_SIZE};
+use crate::store::Store;
 
 /// Multihasher for Shwap types.
-pub(super) struct ShwapMultihasher;
-
-impl Multihasher<MAX_MH_SIZE> for ShwapMultihasher {
-    fn digest(&self, multihash_code: u64, input: &[u8]) -> Option<Multihash<MAX_MH_SIZE>> {
-        let data = match multihash_code {
-            NAMESPACED_DATA_ID_MULTIHASH_CODE => RawNamespacedData::decode(input).ok()?.data_id,
-            ROW_ID_MULTIHASH_CODE => RawRow::decode(input).ok()?.row_id,
-            SAMPLE_ID_MULTIHASH_CODE => RawSample::decode(input).ok()?.sample_id,
-            _ => return None,
-        };
-
-        Multihash::wrap(multihash_code, &data).ok()
+pub(super) struct ShwapMultihasher<S>
+where
+    S: Store + 'static,
+{
+    header_store: Arc<S>,
+}
+
+impl<S> ShwapMultihasher<S>
+where
+    S: Store + 'static,
+{
+    pub(super) fn new(header_store: Arc<S>) -> Self {
+        ShwapMultihasher { header_store }
+    }
+}
+
+#[async_trait]
+impl<S> Multihasher<MAX_MH_SIZE> for ShwapMultihasher<S>
+where
+    S: Store + 'static,
+{
+    async fn hash(
+        &self,
+        multihash_code: u64,
+        input: &[u8],
+    ) -> Result<Multihash<MAX_MH_SIZE>, MultihasherError> {
+        match multihash_code {
+            NAMESPACED_DATA_ID_MULTIHASH_CODE => {
+                let ns_data =
+                    NamespacedData::decode(input).map_err(MultihasherError::custom_fatal)?;
+
+                let hash = convert_cid(&ns_data.namespaced_data_id.into())
+                    .map_err(MultihasherError::custom_fatal)?
+                    .hash()
+                    .to_owned();
+
+                let header = self
+                    .header_store
+                    .get_by_height(ns_data.namespaced_data_id.row.block_height)
+                    .await
+                    .map_err(MultihasherError::custom_fatal)?;
+
+                ns_data
+                    .verify(&header.dah)
+                    .map_err(MultihasherError::custom_fatal)?;
+
+                Ok(hash)
+            }
+            ROW_ID_MULTIHASH_CODE => {
+                let row = Row::decode(input).map_err(MultihasherError::custom_fatal)?;
+
+                let hash = convert_cid(&row.row_id.into())
+                    .map_err(MultihasherError::custom_fatal)?
+                    .hash()
+                    .to_owned();
+
+                let header = self
+                    .header_store
+                    .get_by_height(row.row_id.block_height)
+                    .await
+                    .map_err(MultihasherError::custom_fatal)?;
+
+                row.verify(&header.dah)
+                    .map_err(MultihasherError::custom_fatal)?;
+
+                Ok(hash)
+            }
+            SAMPLE_ID_MULTIHASH_CODE => {
+                let sample = Sample::decode(input).map_err(MultihasherError::custom_fatal)?;
+
+                let hash = convert_cid(&sample.sample_id.into())
+                    .map_err(MultihasherError::custom_fatal)?
+                    .hash()
+                    .to_owned();
+
+                let header = self
+                    .header_store
+                    .get_by_height(sample.sample_id.row.block_height)
+                    .await
+                    .map_err(MultihasherError::custom_fatal)?;
+
+                sample
+                    .verify(&header.dah)
+                    .map_err(MultihasherError::custom_fatal)?;
+
+                Ok(hash)
+            }
+            _ => Err(MultihasherError::UnknownMultihashCode),
+        }
     }
 }
@@ -51,7 +129,7 @@ pub(super) fn namespaced_data_cid(
     convert_cid(&data_id.into())
 }
 
-fn convert_cid(cid: &CidGeneric<MAX_MH_SIZE>) -> Result<Cid> {
+pub(crate) fn convert_cid(cid: &CidGeneric<MAX_MH_SIZE>) -> Result<Cid> {
     beetswap::utils::convert_cid(cid).ok_or(P2pError::Cid(celestia_types::Error::CidError(
         CidError::InvalidMultihashLength(64),
     )))
@@ -60,24 +138,33 @@ fn convert_cid(cid: &CidGeneric<MAX_MH_SIZE>) -> Result<Cid> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::store::InMemoryStore;
+    use crate::test_utils::{async_test, dah_of_eds, generate_fake_eds};
+    use celestia_types::test_utils::ExtendedHeaderGenerator;
+    use celestia_types::AxisType;
 
-    #[test]
-    fn digest() {
-        let hash = ShwapMultihasher
-            .digest(
-                0x7821,
-                &[
-                    10, 39, 6, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 26, 0,
-                ],
-            )
-            .unwrap();
+    #[async_test]
+    async fn hash() {
+        let store = Arc::new(InMemoryStore::new());
+
+        let eds = generate_fake_eds(4);
+        let dah = dah_of_eds(&eds);
+
+        let mut gen = ExtendedHeaderGenerator::new();
+        let header = gen.next_with_dah(dah.clone());
+
+        let sample = Sample::new(AxisType::Row, 0, &eds, header.header.height.value()).unwrap();
+        let sample_bytes = sample.encode_vec().unwrap();
+        let cid = sample_cid(0, eds.square_len(), 1).unwrap();
+
+        sample.verify(&dah).unwrap();
+        store.append_single(header).await.unwrap();
 
-        let cid = "bagqpaanb6aasobqaaaaaaaaaaacqaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaq"
-            .parse::<Cid>()
+        let hash = ShwapMultihasher::new(store)
+            .hash(SAMPLE_ID_MULTIHASH_CODE, &sample_bytes)
+            .await
             .unwrap();
 
-        let expected_hash = cid.hash();
-        assert_eq!(hash, *expected_hash);
+        assert_eq!(hash, *cid.hash());
     }
 }
diff --git a/node/src/store.rs b/node/src/store.rs
index 5df11f5c..61580048 100644
--- a/node/src/store.rs
+++ b/node/src/store.rs
@@ -57,6 +57,9 @@ pub trait Store: Send + Sync + Debug {
     /// Returns the header of a specific height.
     async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader>;
 
+    /// Returns when `height` is available in the `Store`.
+    async fn wait_height(&self, height: u64) -> Result<()>;
+
     /// Returns the headers from the given heights range.
     ///
     /// If start of the range is unbounded, the first returned header will be of height 1.
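A note on the new `Store::wait_height` contract: each implementation below pairs a re-check loop with `tokio::sync::Notify`, and it registers for a notification *before* checking the store, so a header inserted between the check and the `await` cannot be missed. A condensed sketch of that pattern, with a hypothetical `contains` closure standing in for the stores' own `contains_height` checks:

```rust
use std::pin::pin;
use tokio::sync::Notify;

// Hypothetical standalone helper; the real implementations live on each Store
// and require `notify.notify_waiters()` to be called after every insert.
async fn wait_until(notify: &Notify, contains: impl Fn() -> bool) {
    // Register interest *before* the first check, so an insert that happens
    // right after the check still wakes us up.
    let mut notified = pin!(notify.notified());

    loop {
        if contains() {
            return;
        }

        // Sleep until the next `notify_waiters` call.
        notified.as_mut().await;

        // A `Notified` future is consumed once it resolves,
        // so re-arm it before checking again.
        notified.set(notify.notified());
    }
}
```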
diff --git a/node/src/store/in_memory_store.rs b/node/src/store/in_memory_store.rs
index 43a80f4a..ddb1b1f4 100644
--- a/node/src/store/in_memory_store.rs
+++ b/node/src/store/in_memory_store.rs
@@ -1,3 +1,4 @@
+use std::pin::pin;
 use std::sync::atomic::{AtomicU64, Ordering};
 
 use async_trait::async_trait;
@@ -6,6 +7,7 @@ use celestia_types::ExtendedHeader;
 use cid::Cid;
 use dashmap::mapref::entry::Entry;
 use dashmap::DashMap;
+use tokio::sync::Notify;
 use tracing::{debug, info};
 
 use crate::store::{Result, SamplingMetadata, Store, StoreError};
@@ -23,6 +25,8 @@ pub struct InMemoryStore {
     head_height: AtomicU64,
     /// Cached height of the lowest header that wasn't sampled yet
     lowest_unsampled_height: AtomicU64,
+    /// Notify when a new header is added
+    header_added_notifier: Notify,
 }
 
 impl InMemoryStore {
@@ -34,6 +38,7 @@
             height_to_hash: DashMap::new(),
             head_height: AtomicU64::new(0),
             lowest_unsampled_height: AtomicU64::new(1),
+            header_added_notifier: Notify::new(),
         }
     }
@@ -89,6 +94,7 @@
         height_entry.insert(hash);
 
         self.head_height.store(height, Ordering::Release);
+        self.header_added_notifier.notify_waiters();
 
         Ok(())
     }
@@ -211,6 +217,22 @@ impl Store for InMemoryStore {
         self.get_by_height(height)
     }
 
+    async fn wait_height(&self, height: u64) -> Result<()> {
+        let mut notifier = pin!(self.header_added_notifier.notified());
+
+        loop {
+            if self.contains_height(height) {
+                return Ok(());
+            }
+
+            // Wait for a notification
+            notifier.as_mut().await;
+
+            // Reset the notifier
+            notifier.set(self.header_added_notifier.notified());
+        }
+    }
+
     async fn head_height(&self) -> Result<u64> {
         self.get_head_height()
     }
@@ -261,6 +283,7 @@ impl Clone for InMemoryStore {
             lowest_unsampled_height: AtomicU64::new(
                 self.lowest_unsampled_height.load(Ordering::Acquire),
             ),
+            header_added_notifier: Notify::new(),
         }
     }
 }
@@ -268,14 +291,10 @@
 #[cfg(test)]
 pub mod tests {
     use super::*;
+    use crate::test_utils::async_test;
     use celestia_types::test_utils::ExtendedHeaderGenerator;
     use celestia_types::Height;
 
-    #[cfg(not(target_arch = "wasm32"))]
-    use tokio::test as async_test;
-    #[cfg(target_arch = "wasm32")]
-    use wasm_bindgen_test::wasm_bindgen_test as async_test;
-
     #[async_test]
     async fn test_contains_height() {
         let s = gen_filled_store(2).0;
diff --git a/node/src/store/indexed_db_store.rs b/node/src/store/indexed_db_store.rs
index a6b42430..0d168479 100644
--- a/node/src/store/indexed_db_store.rs
+++ b/node/src/store/indexed_db_store.rs
@@ -1,5 +1,6 @@
 use std::cell::RefCell;
 use std::convert::Infallible;
+use std::pin::pin;
 
 use async_trait::async_trait;
 use celestia_tendermint_proto::Protobuf;
@@ -10,6 +11,7 @@ use rexie::{Direction, Index, KeyRange, ObjectStore, Rexie, TransactionMode};
 use send_wrapper::SendWrapper;
 use serde::{Deserialize, Serialize};
 use serde_wasm_bindgen::{from_value, to_value};
+use tokio::sync::Notify;
 
 use crate::store::{Result, SamplingMetadata, Store, StoreError};
@@ -42,6 +44,7 @@ pub struct IndexedDbStore {
     head: SendWrapper<RefCell<Option<ExtendedHeader>>>,
     lowest_unsampled_height: SendWrapper<RefCell<u64>>,
     db: SendWrapper<Rexie>,
+    header_added_notifier: Notify,
 }
 
 impl IndexedDbStore {
@@ -75,6 +78,7 @@
             head: SendWrapper::new(RefCell::new(db_head)),
             lowest_unsampled_height: SendWrapper::new(RefCell::new(last_sampled)),
             db: SendWrapper::new(rexie),
+            header_added_notifier: Notify::new(),
         })
     }
@@ -198,6 +202,7 @@
         // this shouldn't panic, we don't borrow across await points and wasm is single threaded
         self.head.replace(Some(header));
+        self.header_added_notifier.notify_waiters();
 
         Ok(())
     }
@@ -322,6 +327,22 @@ impl Store for IndexedDbStore {
         fut.await
     }
 
+    async fn wait_height(&self, height: u64) -> Result<()> {
+        let mut notifier = pin!(self.header_added_notifier.notified());
+
+        loop {
+            if self.contains_height(height) {
+                return Ok(());
+            }
+
+            // Wait for a notification
+            notifier.as_mut().await;
+
+            // Reset the notifier
+            notifier.set(self.header_added_notifier.notified());
+        }
+    }
+
     async fn head_height(&self) -> Result<u64> {
         self.get_head_height()
     }
diff --git a/node/src/store/sled_store.rs b/node/src/store/sled_store.rs
index adfdaadb..7d170713 100644
--- a/node/src/store/sled_store.rs
+++ b/node/src/store/sled_store.rs
@@ -1,5 +1,6 @@
 use std::convert::Infallible;
 use std::ops::Deref;
+use std::pin::pin;
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -9,8 +10,8 @@ use celestia_types::ExtendedHeader;
 use cid::Cid;
 use sled::transaction::{abort, ConflictableTransactionError, TransactionError};
 use sled::{Db, Error as SledError, Transactional, Tree};
-use tokio::task::spawn_blocking;
-use tokio::task::JoinError;
+use tokio::sync::Notify;
+use tokio::task::{spawn_blocking, JoinError};
 use tracing::{debug, info};
 
 use crate::store::{Result, SamplingMetadata, Store, StoreError};
@@ -37,6 +38,8 @@ struct Inner {
     height_to_hash: Tree,
     /// sub-tree which maps header height to its metadata
     sampling_metadata: Tree,
+    /// Notify when a new header is added
+    header_added_notifier: Notify,
 }
 
 impl SledStore {
@@ -64,6 +67,7 @@
                     headers,
                     height_to_hash,
                     sampling_metadata,
+                    header_added_notifier: Notify::new(),
                 }),
             })
         })
@@ -183,6 +187,8 @@
         })
         .await??;
 
+        self.inner.header_added_notifier.notify_waiters();
+
         debug!("Inserting header {hash} with height {height}");
 
         Ok(())
     }
@@ -321,6 +327,22 @@ impl Store for SledStore {
         self.get_by_height(height).await
     }
 
+    async fn wait_height(&self, height: u64) -> Result<()> {
+        let mut notifier = pin!(self.inner.header_added_notifier.notified());
+
+        loop {
+            if self.contains_height(height).await {
+                return Ok(());
+            }
+
+            // Wait for a notification
+            notifier.as_mut().await;
+
+            // Reset the notifier
+            notifier.set(self.inner.header_added_notifier.notified());
+        }
+    }
+
     async fn head_height(&self) -> Result<u64> {
         self.head_height().await
     }
diff --git a/node/src/syncer.rs b/node/src/syncer.rs
index c2327c97..2b8ec00c 100644
--- a/node/src/syncer.rs
+++ b/node/src/syncer.rs
@@ -527,18 +527,9 @@
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{
-        executor::sleep,
-        store::InMemoryStore,
-        test_utils::{gen_filled_store, MockP2pHandle},
-    };
+    use crate::store::InMemoryStore;
+    use crate::test_utils::{async_test, gen_filled_store, MockP2pHandle};
     use celestia_types::test_utils::ExtendedHeaderGenerator;
-    use std::time::Duration;
-
-    #[cfg(not(target_arch = "wasm32"))]
-    use tokio::test as async_test;
-    #[cfg(target_arch = "wasm32")]
-    use wasm_bindgen_test::wasm_bindgen_test as async_test;
 
     #[async_test]
     async fn init_without_genesis_hash() {
diff --git a/node/src/test_utils.rs b/node/src/test_utils.rs
index 1a72a187..b4dc303b 100644
--- a/node/src/test_utils.rs
+++ b/node/src/test_utils.rs
@@ -3,7 +3,10 @@
 use std::time::Duration;
 
 use celestia_proto::p2p::pb::{header_request::Data, HeaderRequest};
-use celestia_types::{hash::Hash, test_utils::ExtendedHeaderGenerator, ExtendedHeader};
+use celestia_types::hash::Hash;
+use celestia_types::test_utils::ExtendedHeaderGenerator;
+use celestia_types::ExtendedHeader;
+use cid::Cid;
 use libp2p::identity::{self, Keypair};
 use tokio::sync::{mpsc, watch};
@@ -17,6 +20,9 @@ use crate::{
     utils::OneshotResultSender,
 };
 
+#[cfg(test)]
+pub(crate) use self::private::{async_test, dah_of_eds, generate_fake_eds};
+
 /// Generate a store pre-filled with headers.
 pub fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) {
     let s = InMemoryStore::new();
@@ -187,4 +193,72 @@ impl MockP2pHandle {
             cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"),
         }
     }
+
+    /// Assert that a CID request was sent to the [`P2p`] worker and obtain a response channel.
+    ///
+    /// [`P2p`]: crate::p2p::P2p
+    pub async fn expect_get_shwap_cid(&mut self) -> (Cid, OneshotResultSender<Vec<u8>, P2pError>) {
+        match self.expect_cmd().await {
+            P2pCmd::GetShwapCid { cid, respond_to } => (cid, respond_to),
+            cmd => panic!("Expecting GetShwapCid, but received: {cmd:?}"),
+        }
+    }
+}
+
+/// Test utils only for this crate
+#[cfg(test)]
+mod private {
+    use celestia_types::consts::appconsts::SHARE_SIZE;
+    use celestia_types::nmt::{Namespace, NS_SIZE};
+    use celestia_types::{DataAvailabilityHeader, ExtendedDataSquare};
+    use rand::RngCore;
+
+    #[cfg(not(target_arch = "wasm32"))]
+    pub(crate) use tokio::test as async_test;
+    #[cfg(target_arch = "wasm32")]
+    pub(crate) use wasm_bindgen_test::wasm_bindgen_test as async_test;
+
+    pub(crate) fn generate_fake_eds(square_len: usize) -> ExtendedDataSquare {
+        let mut shares = Vec::new();
+        let ns = Namespace::const_v0(rand::random());
+
+        for row in 0..square_len {
+            for col in 0..square_len {
+                let share = if row < square_len / 2 && col < square_len / 2 {
+                    // ODS share
+                    [ns.as_bytes(), &random_bytes(SHARE_SIZE - NS_SIZE)[..]].concat()
+                } else {
+                    // Parity share
+                    random_bytes(SHARE_SIZE)
+                };
+
+                shares.push(share);
+            }
+        }
+
+        ExtendedDataSquare::new(shares, "fake".to_string()).unwrap()
+    }
+
+    pub(crate) fn dah_of_eds(eds: &ExtendedDataSquare) -> DataAvailabilityHeader {
+        let mut dah = DataAvailabilityHeader {
+            row_roots: Vec::new(),
+            column_roots: Vec::new(),
+        };
+
+        for i in 0..eds.square_len() {
+            let row_root = eds.row_nmt(i).unwrap().root();
+            dah.row_roots.push(row_root);
+
+            let column_root = eds.column_nmt(i).unwrap().root();
+            dah.column_roots.push(column_root);
+        }
+
+        dah
+    }
+
+    fn random_bytes(len: usize) -> Vec<u8> {
+        let mut buf = vec![0u8; len];
+        rand::thread_rng().fill_bytes(&mut buf);
+        buf
+    }
+}
diff --git a/proto/src/serializers/option_any.rs b/proto/src/serializers/option_any.rs
index b728dbd0..266cde4e 100644
--- a/proto/src/serializers/option_any.rs
+++ b/proto/src/serializers/option_any.rs
@@ -51,7 +51,6 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
-    use prost_types::Any;
 
     #[cfg(target_arch = "wasm32")]
     use wasm_bindgen_test::wasm_bindgen_test as test;
diff --git a/rpc/tests/share.rs b/rpc/tests/share.rs
index 4dd4ca9d..4660faae 100644
--- a/rpc/tests/share.rs
+++ b/rpc/tests/share.rs
@@ -5,9 +5,8 @@ use celestia_types::consts::appconsts::{
     CONTINUATION_SPARSE_SHARE_CONTENT_SIZE, FIRST_SPARSE_SHARE_CONTENT_SIZE, SEQUENCE_LEN_BYTES,
     SHARE_INFO_BYTES,
 };
-use celestia_types::nmt::{Namespace, NamespacedSha2Hasher, Nmt};
-use celestia_types::{Blob, Share};
-use nmt_rs::NamespaceMerkleHasher;
+use celestia_types::nmt::{Namespace, NamespacedSha2Hasher};
+use celestia_types::Blob;
 
 pub mod utils;
@@ -148,27 +147,13 @@ async fn get_eds() {
     let submitted_height = blob_submit(&client, &[blob]).await.unwrap();
 
     let header = client.header_get_by_height(submitted_height).await.unwrap();
-
     let eds = client.share_get_eds(&header).await.unwrap();
-    let width = header.dah.square_len();
 
-    for (y, chunk) in eds.data_square.chunks(width).enumerate() {
-        let mut nmt = Nmt::with_hasher(NamespacedSha2Hasher::with_ignore_max_ns(true));
+    for i in 0..header.dah.square_len() {
+        let row_root = eds.row_nmt(i).unwrap().root();
+        assert_eq!(row_root, header.dah.row_root(i).unwrap());
 
-        for (x, leaf) in chunk.iter().enumerate() {
-            if x < width / 2 && y < width / 2 {
-                // the `OriginalDataSquare` part of the `EDS`
-                let share = Share::from_raw(leaf).unwrap();
-                let ns = share.namespace();
-                nmt.push_leaf(share.as_ref(), *ns).unwrap();
-            } else {
-                // the parity data computed using `eds.codec`
-                nmt.push_leaf(leaf, *Namespace::PARITY_SHARE).unwrap();
-            }
-        }
-
-        // check if the root corresponds to the one from the dah
-        let root = nmt.root();
-        assert_eq!(root, header.dah.row_root(y).unwrap());
+        let column_root = eds.column_nmt(i).unwrap().root();
+        assert_eq!(column_root, header.dah.column_root(i).unwrap());
     }
 }
diff --git a/types/src/nmt.rs b/types/src/nmt.rs
index 1fc65884..ba37b823 100644
--- a/types/src/nmt.rs
+++ b/types/src/nmt.rs
@@ -25,7 +25,6 @@ use cid::CidGeneric;
 use multihash::Multihash;
 use nmt_rs::simple_merkle::db::MemDb;
 use nmt_rs::simple_merkle::tree::MerkleHash;
-use nmt_rs::NamespaceMerkleHasher;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 
 mod namespace_proof;
@@ -37,6 +36,8 @@ pub use self::namespaced_hash::{
 };
 use crate::{Error, Result};
 
+pub use nmt_rs::NamespaceMerkleHasher;
+
 /// Namespace version size in bytes.
 pub const NS_VER_SIZE: usize = 1;
 /// Namespace id size in bytes.
diff --git a/types/src/row.rs b/types/src/row.rs
index 251eb463..5fdff1b5 100644
--- a/types/src/row.rs
+++ b/types/src/row.rs
@@ -217,7 +217,6 @@ impl From<RowId> for CidGeneric<ROW_ID_SIZE> {
 mod tests {
     use super::*;
     use crate::consts::appconsts::SHARE_SIZE;
-    use crate::nmt::{Namespace, NS_SIZE};
 
     #[test]
     fn round_trip_test() {
@@ -241,9 +240,9 @@
         Row::new(1, &eds, height).unwrap();
         Row::new(7, &eds, height).unwrap();
         let row_err = Row::new(8, &eds, height).unwrap_err();
-        assert!(matches!(row_err, Error::EdsIndexOutOfRange(8)));
+        assert!(matches!(row_err, Error::EdsIndexOutOfRange(_)));
         let row_err = Row::new(100, &eds, height).unwrap_err();
-        assert!(matches!(row_err, Error::EdsIndexOutOfRange(100)));
+        assert!(matches!(row_err, Error::EdsIndexOutOfRange(_)));
     }
 
     #[test]
diff --git a/types/src/rsmt2d.rs b/types/src/rsmt2d.rs
index 3ffb7560..3098f0ef 100644
--- a/types/src/rsmt2d.rs
+++ b/types/src/rsmt2d.rs
@@ -1,12 +1,16 @@
-use std::result::Result as StdResult;
+use std::cmp::Ordering;
 
 use nmt_rs::NamespaceMerkleHasher;
 use serde::{Deserialize, Deserializer, Serialize};
 
+use crate::consts::appconsts::SHARE_SIZE;
 use crate::namespaced_data::{NamespacedData, NamespacedDataId};
 use crate::nmt::{Namespace, NamespacedSha2Hasher, Nmt, NS_SIZE};
 use crate::row::RowId;
-use crate::{DataAvailabilityHeader, Error, Result};
+use crate::{bail_validation, DataAvailabilityHeader, Error, Result};
+
+const MIN_SQUARE_WIDTH: usize = 2;
+const MIN_SHARES: usize = MIN_SQUARE_WIDTH * MIN_SQUARE_WIDTH;
 
 /// Represents either column or row of the [`ExtendedDataSquare`].
 ///
@@ -23,7 +27,7 @@ pub enum AxisType {
 impl TryFrom<u8> for AxisType {
     type Error = Error;
 
-    fn try_from(value: u8) -> StdResult<Self, Self::Error> {
+    fn try_from(value: u8) -> Result<Self, Self::Error> {
         match value {
             0 => Ok(AxisType::Row),
             1 => Ok(AxisType::Col),
@@ -35,6 +39,7 @@ impl TryFrom<u8> for AxisType {
 /// The data matrix in Celestia blocks extended with parity data.
 ///
 /// It is created from fixed-size chunks of data, called [`Share`]s.
+/// Each share is a cell of the [`ExtendedDataSquare`].
 ///
 /// # Structure
@@ -107,7 +112,7 @@ impl TryFrom<u8> for AxisType {
 /// let width = header.dah.square_len();
 ///
 /// // for each row of the data square, build an nmt
-/// for (y, row) in eds.data_square.chunks(width).enumerate() {
+/// for (y, row) in eds.data_square().chunks(width).enumerate() {
 ///     let mut nmt = Nmt::with_hasher(NamespacedSha2Hasher::with_ignore_max_ns(true));
 ///
 ///     for (x, leaf) in row.iter().enumerate() {
@@ -135,69 +140,152 @@ pub struct ExtendedDataSquare {
     /// The raw data of the EDS.
     #[serde(with = "celestia_tendermint_proto::serializers::bytes::vec_base64string")]
-    pub data_square: Vec<Vec<u8>>,
+    data_square: Vec<Vec<u8>>,
     /// The codec used to encode parity shares.
-    pub codec: String,
+    codec: String,
     /// pre-calculated square length
     #[serde(skip)]
     square_len: usize,
 }
 
 impl ExtendedDataSquare {
-    /// Create a new EDS out of the provided shares. Returns error if number of shares isn't
-    /// a square number
+    /// Create a new EDS out of the provided shares.
+    ///
+    /// Returns an error if the number of shares isn't a square number.
     pub fn new(shares: Vec<Vec<u8>>, codec: String) -> Result<Self> {
+        if shares.len() < MIN_SHARES {
+            bail_validation!(
+                "shares len ({}) < MIN_SHARES ({})",
+                shares.len(),
+                MIN_SHARES
+            );
+        }
+
         let square_len = f64::sqrt(shares.len() as f64) as usize;
+
         if square_len * square_len != shares.len() {
             return Err(Error::EdsInvalidDimentions);
         }
 
-        Ok(Self {
+        let eds = ExtendedDataSquare {
             data_square: shares,
             codec,
             square_len,
-        })
+        };
+
+        // Validate that namespaces of each row are sorted
+        for row in 0..eds.square_len() {
+            let mut prev_ns = None;
+
+            for col in 0..eds.square_len() {
+                let share = eds.share(row, col)?;
+
+                if share.len() != SHARE_SIZE {
+                    bail_validation!("share len ({}) != SHARE_SIZE ({})", share.len(), SHARE_SIZE);
+                }
+
+                let ns = if is_ods_square(row, col, eds.square_len()) {
+                    Namespace::from_raw(&share[..NS_SIZE])?
+                } else {
+                    Namespace::PARITY_SHARE
+                };
+
+                if prev_ns.map_or(false, |prev_ns| ns < prev_ns) {
+                    bail_validation!("Shares of row {row} are not sorted by their namespace");
+                }
+
+                prev_ns = Some(ns);
+            }
+        }
+
+        Ok(eds)
     }
 
-    /// Return row with index
+    /// The raw data of the EDS.
+    pub fn data_square(&self) -> &[Vec<u8>] {
+        &self.data_square
+    }
+
+    /// The codec used to encode parity shares.
+    pub fn codec(&self) -> &str {
+        self.codec.as_str()
+    }
+
+    /// Returns the share of the provided coordinates.
+    pub fn share(&self, row: usize, column: usize) -> Result<&[u8]> {
+        let index = row * self.square_len + column;
+
+        self.data_square
+            .get(index)
+            .map(Vec::as_slice)
+            .ok_or(Error::EdsIndexOutOfRange(index))
+    }
+
+    /// Returns the shares of a row.
     pub fn row(&self, index: usize) -> Result<Vec<Vec<u8>>> {
-        Ok(self
-            .data_square
-            .get(index * self.square_len..(index + 1) * self.square_len)
-            .ok_or(Error::EdsIndexOutOfRange(index))?
-            .to_vec())
+        self.axis(AxisType::Row, index)
     }
 
-    /// Return colum with index
-    pub fn column(&self, mut index: usize) -> Result<Vec<Vec<u8>>> {
-        let mut r = Vec::with_capacity(self.square_len);
-        while index < self.data_square.len() {
-            r.push(
-                self.data_square
-                    .get(index)
-                    .ok_or(Error::EdsIndexOutOfRange(index))?
-                    .to_vec(),
-            );
-            index += self.square_len;
-        }
-        Ok(r)
+    /// Returns the [`Nmt`] of a row.
+    pub fn row_nmt(&self, index: usize) -> Result<Nmt> {
+        self.axis_nmt(AxisType::Row, index)
+    }
+
+    /// Returns the shares of a column.
+    pub fn column(&self, index: usize) -> Result<Vec<Vec<u8>>> {
+        self.axis(AxisType::Col, index)
     }
 
-    /// Return column or row with the provided index
+    /// Returns the [`Nmt`] of a column.
+    pub fn column_nmt(&self, index: usize) -> Result<Nmt> {
+        self.axis_nmt(AxisType::Col, index)
+    }
+
+    /// Returns the shares of column or row.
     pub fn axis(&self, axis: AxisType, index: usize) -> Result<Vec<Vec<u8>>> {
-        match axis {
-            AxisType::Col => self.column(index),
-            AxisType::Row => self.row(index),
+        (0..self.square_len)
+            .map(|i| {
+                let (row, col) = match axis {
+                    AxisType::Row => (index, i),
+                    AxisType::Col => (i, index),
+                };
+
+                self.share(row, col).map(ToOwned::to_owned)
+            })
+            .collect()
+    }
+
+    /// Returns the [`Nmt`] of column or row.
+    pub fn axis_nmt(&self, axis: AxisType, index: usize) -> Result<Nmt> {
+        let mut tree = Nmt::with_hasher(NamespacedSha2Hasher::with_ignore_max_ns(true));
+
+        for i in 0..self.square_len {
+            let (row, col) = match axis {
+                AxisType::Row => (index, i),
+                AxisType::Col => (i, index),
+            };
+
+            let share = self.share(row, col)?;
+
+            let ns = if is_ods_square(col, row, self.square_len) {
+                Namespace::from_raw(&share[..NS_SIZE])?
+            } else {
+                Namespace::PARITY_SHARE
+            };
+
+            tree.push_leaf(share, *ns).map_err(Error::Nmt)?;
         }
+
+        Ok(tree)
     }
 
-    /// Get EDS square length
+    /// Get EDS square length.
     pub fn square_len(&self) -> usize {
         self.square_len
     }
 
     /// Return all the shares that belong to the provided namespace in the EDS.
-    /// Results are returned as a list of rows of shares with the inclusion proof
+    /// Results are returned as a list of rows of shares with the inclusion proof.
     pub fn get_namespaced_data(
         &self,
         namespace: Namespace,
@@ -206,29 +294,37 @@
     ) -> Result<Vec<NamespacedData>> {
         let mut data = Vec::new();
 
-        for i in 0u16..self.square_len as u16 {
-            let row_root = dah.row_root(i.into()).unwrap();
+        for row in 0..self.square_len {
+            let Some(row_root) = dah.row_root(row) else {
+                break;
+            };
+
             if !row_root.contains::<NamespacedSha2Hasher>(*namespace) {
                 continue;
             }
 
             let mut shares = Vec::with_capacity(self.square_len);
-            let mut tree = Nmt::with_hasher(NamespacedSha2Hasher::with_ignore_max_ns(true));
-            for (col, s) in self.row(i.into())?.iter().enumerate() {
-                let ns = if col < self.square_len / 2 {
-                    Namespace::from_raw(&s[..NS_SIZE])?
+
+            for col in 0..self.square_len {
+                let share = self.share(row, col)?;
+
+                let ns = if is_ods_square(row, col, self.square_len) {
+                    Namespace::from_raw(&share[..NS_SIZE])?
                 } else {
                     Namespace::PARITY_SHARE
                 };
-                tree.push_leaf(s, *ns).map_err(Error::Nmt)?;
-                if ns == namespace {
-                    shares.push(s.clone());
+
+                // Shares in each row of the EDS are sorted by namespace, so we
+                // can stop searching the row once we reach a bigger namespace.
+                match ns.cmp(&namespace) {
+                    Ordering::Less => {}
+                    Ordering::Equal => shares.push(share.to_owned()),
+                    Ordering::Greater => break,
                 }
             }
 
-            let row = RowId::new(i, height)?;
-            let proof = tree.get_namespace_proof(*namespace);
+            let proof = self.row_nmt(row)?.get_namespace_proof(*namespace);
+            let row = RowId::new(row as u16, height)?;
             let namespaced_data_id = NamespacedDataId { row, namespace };
 
             data.push(NamespacedData {
@@ -265,6 +361,13 @@ impl<'de> Deserialize<'de> for ExtendedDataSquare {
     }
 }
 
+/// Returns true if and only if the provided coordinates belong to the Original Data Square
+/// (i.e. the first quadrant of the Extended Data Square).
+pub(crate) fn is_ods_square(row: usize, column: usize, square_len: usize) -> bool {
+    let half_square_len = square_len / 2;
+    row < half_square_len && column < half_square_len
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -290,6 +393,7 @@ mod tests {
     fn get_namespaced_data() {
         let eds_json = include_str!("../test_data/shwap_samples/eds.json");
         let eds: ExtendedDataSquare = serde_json::from_str(eds_json).unwrap();
+
         let dah_json = include_str!("../test_data/shwap_samples/dah.json");
         let dah: DataAvailabilityHeader = serde_json::from_str(dah_json).unwrap();
@@ -313,4 +417,233 @@
             row.verify(&dah).unwrap();
         }
     }
+
+    #[test]
+    fn nmt_roots() {
+        let eds_json = include_str!("../test_data/shwap_samples/eds.json");
+        let eds: ExtendedDataSquare = serde_json::from_str(eds_json).unwrap();
+
+        let dah_json = include_str!("../test_data/shwap_samples/dah.json");
+        let dah: DataAvailabilityHeader = serde_json::from_str(dah_json).unwrap();
+
+        assert_eq!(dah.row_roots.len(), eds.square_len());
+        assert_eq!(dah.column_roots.len(), eds.square_len());
+
+        for (i, root) in dah.row_roots.iter().enumerate() {
+            let mut tree = eds.row_nmt(i).unwrap();
+            assert_eq!(*root, tree.root());
+
+            let mut tree = eds.axis_nmt(AxisType::Row, i).unwrap();
+            assert_eq!(*root, tree.root());
+        }
+
+        for (i, root) in dah.column_roots.iter().enumerate() {
+            let mut tree = eds.column_nmt(i).unwrap();
+            assert_eq!(*root, tree.root());
+
+            let mut tree = eds.axis_nmt(AxisType::Col, i).unwrap();
+            assert_eq!(*root, tree.root());
+        }
+    }
+
+    #[test]
+    fn ods_square() {
+        assert!(is_ods_square(0, 0, 4));
+        assert!(is_ods_square(0, 1, 4));
+        assert!(is_ods_square(1, 0, 4));
+        assert!(is_ods_square(1, 1, 4));
+
+        assert!(!is_ods_square(0, 2, 4));
+        assert!(!is_ods_square(0, 3, 4));
+        assert!(!is_ods_square(1, 2, 4));
+        assert!(!is_ods_square(1, 3, 4));
+
+        assert!(!is_ods_square(2, 0, 4));
+        assert!(!is_ods_square(2, 1, 4));
+        assert!(!is_ods_square(3, 0, 4));
+        assert!(!is_ods_square(3, 1, 4));
+
+        assert!(!is_ods_square(2, 2, 4));
+        assert!(!is_ods_square(2, 3, 4));
+        assert!(!is_ods_square(3, 2, 4));
+        assert!(!is_ods_square(3, 3, 4));
+    }
+
+    #[test]
+    fn get_row_and_col() {
+        let share = |x, y| {
+            [
+                Namespace::new_v0(&[x, y]).unwrap().as_bytes(),
+                &[0u8; SHARE_SIZE - NS_SIZE][..],
+            ]
+            .concat()
+        };
+
+        #[rustfmt::skip]
+        let shares = vec![
+            share(0, 0), share(0, 1), share(0, 2), share(0, 3),
+            share(1, 0), share(1, 1), share(1, 2), share(1, 3),
+            share(2, 0), share(2, 1), share(2, 2), share(2, 3),
+            share(3, 0), share(3, 1), share(3, 2), share(3, 3),
+        ];
+
+        let eds = ExtendedDataSquare::new(shares, "fake".to_string()).unwrap();
+
+        assert_eq!(
+            eds.row(0).unwrap(),
+            vec![share(0, 0), share(0, 1), share(0, 2), share(0, 3)]
+        );
+        assert_eq!(
+            eds.row(1).unwrap(),
+            vec![share(1, 0), share(1, 1), share(1, 2), share(1, 3)]
+        );
+        assert_eq!(
+            eds.row(2).unwrap(),
+            vec![share(2, 0), share(2, 1), share(2, 2), share(2, 3)]
+        );
+        assert_eq!(
+            eds.row(3).unwrap(),
+            vec![share(3, 0), share(3, 1), share(3, 2), share(3, 3)]
+        );
+
+        assert_eq!(
+            eds.axis(AxisType::Row, 0).unwrap(),
+            vec![share(0, 0), share(0, 1), share(0, 2), share(0, 3)]
+        );
+        assert_eq!(
+            eds.axis(AxisType::Row, 1).unwrap(),
+            vec![share(1, 0), share(1, 1), share(1, 2), share(1, 3)]
+        );
+        assert_eq!(
+            eds.axis(AxisType::Row, 2).unwrap(),
+            vec![share(2, 0), share(2, 1), share(2, 2), share(2, 3)]
+        );
+        assert_eq!(
+            eds.axis(AxisType::Row, 3).unwrap(),
+            vec![share(3, 0), share(3, 1), share(3, 2), share(3, 3)]
+        );
+
+        assert_eq!(
+            eds.column(0).unwrap(),
+            vec![share(0, 0), share(1, 0), share(2, 0), share(3, 0)]
+        );
+        assert_eq!(
+            eds.column(1).unwrap(),
+            vec![share(0, 1), share(1, 1), share(2, 1), share(3, 1)]
+        );
+        assert_eq!(
+            eds.column(2).unwrap(),
+            vec![share(0, 2), share(1, 2), share(2, 2), share(3, 2)]
+        );
+        assert_eq!(
+            eds.column(3).unwrap(),
+            vec![share(0, 3), share(1, 3), share(2, 3), share(3, 3)]
+        );
+
+        assert_eq!(
+            eds.axis(AxisType::Col, 0).unwrap(),
+            vec![share(0, 0), share(1, 0), share(2, 0), share(3, 0)]
+        );
+        assert_eq!(
+            eds.axis(AxisType::Col, 1).unwrap(),
+            vec![share(0, 1), share(1, 1), share(2, 1), share(3, 1)]
+        );
+        assert_eq!(
+            eds.axis(AxisType::Col, 2).unwrap(),
+            vec![share(0, 2), share(1, 2), share(2, 2), share(3, 2)]
+        );
+        assert_eq!(
+            eds.axis(AxisType::Col, 3).unwrap(),
+            vec![share(0, 3), share(1, 3), share(2, 3), share(3, 3)]
+        );
+    }
+
+    #[test]
+    fn validation() {
+        ExtendedDataSquare::new(vec![], "fake".to_string()).unwrap_err();
+        ExtendedDataSquare::new(vec![vec![]], "fake".to_string()).unwrap_err();
+        ExtendedDataSquare::new(vec![vec![]; 4], "fake".to_string()).unwrap_err();
+
+        ExtendedDataSquare::new(vec![vec![0u8; SHARE_SIZE]; 4], "fake".to_string()).unwrap();
+        ExtendedDataSquare::new(vec![vec![0u8; SHARE_SIZE]; 6], "fake".to_string()).unwrap_err();
+        ExtendedDataSquare::new(vec![vec![0u8; SHARE_SIZE]; 16], "fake".to_string()).unwrap();
+
+        let share = |n| {
+            [
+                Namespace::new_v0(&[n]).unwrap().as_bytes(),
+                &[0u8; SHARE_SIZE - NS_SIZE][..],
+            ]
+            .concat()
+        };
+
+        // Parity share can be anything
+        let parity_share = vec![0u8; SHARE_SIZE];
+
+        ExtendedDataSquare::new(
+            vec![
+                // row 0
+                share(0), // ODS
+                parity_share.clone(),
+                // row 1
+                parity_share.clone(),
+                parity_share.clone(),
+            ],
+            "fake".to_string(),
+        )
+        .unwrap();
+
+        ExtendedDataSquare::new(
+            vec![
+                // row 0
+                share(1), // ODS
+                share(2), // ODS
+                parity_share.clone(),
+                parity_share.clone(),
+                // row 1
+                share(1), // ODS
+                share(1), // ODS
+                parity_share.clone(),
+                parity_share.clone(),
+                // row 2
+                parity_share.clone(),
+                parity_share.clone(),
+                parity_share.clone(),
+                parity_share.clone(),
+                // row 3
+                parity_share.clone(),
+                parity_share.clone(),
+                parity_share.clone(),
+                parity_share.clone(),
+            ],
+            "fake".to_string(),
+        )
+        .unwrap();
+
+        ExtendedDataSquare::new(
+            vec![
+                // row 0
+                share(1), // ODS
+                share(2), // ODS
+                parity_share.clone(),
+                parity_share.clone(),
+                // row 1
+                share(1), // ODS
+                share(0), // ODS - This produces the error because it has a smaller namespace
+                parity_share.clone(),
+                parity_share.clone(),
+                // row 2
+                parity_share.clone(),
+                parity_share.clone(),
+                parity_share.clone(),
+                parity_share.clone(),
+                // row 3
+                parity_share.clone(),
+                parity_share.clone(),
+                parity_share.clone(),
+                parity_share.clone(),
+            ],
+            "fake".to_string(),
+        )
+        .unwrap_err();
+    }
 }
diff --git a/types/src/sample.rs b/types/src/sample.rs
index 05d701f8..50ece608 100644
--- a/types/src/sample.rs
+++ b/types/src/sample.rs
@@ -16,12 +16,11 @@ use celestia_tendermint_proto::Protobuf;
 use cid::CidGeneric;
 use multihash::Multihash;
 use nmt_rs::nmt_proof::NamespaceProof as NmtNamespaceProof;
-use nmt_rs::NamespaceMerkleHasher;
 use serde::{Deserialize, Serialize};
 
-use crate::nmt::{Namespace, NamespaceProof, NamespacedSha2Hasher, Nmt, NS_SIZE};
+use crate::nmt::{Namespace, NamespaceProof, NS_SIZE};
 use crate::row::RowId;
-use crate::rsmt2d::{AxisType, ExtendedDataSquare};
+use crate::rsmt2d::{is_ods_square, AxisType, ExtendedDataSquare};
Result}; /// The size of the [`SampleId`] hash in `multihash`. @@ -45,7 +44,7 @@ pub struct SampleId { } /// Represents Sample, with proof of its inclusion and location on EDS -#[derive(Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(try_from = "RawSample", into = "RawSample")] pub struct Sample { /// Location of the sample in the EDS and associated block height @@ -111,20 +110,14 @@ impl Sample { AxisType::Col => (index % square_len, index / square_len), }; - let mut tree = Nmt::with_hasher(NamespacedSha2Hasher::with_ignore_max_ns(true)); - - let shares = eds.axis(axis_type, axis_index)?; - let (data_shares, parity_shares) = shares.split_at(square_len / 2); + let (row_index, column_index) = match axis_type { + AxisType::Row => (axis_index, sample_index), + AxisType::Col => (sample_index, axis_index), + }; - for s in data_shares { - let ns = Namespace::from_raw(&s[..NS_SIZE])?; - tree.push_leaf(s, *ns).map_err(Error::Nmt)?; - } + let share = eds.share(row_index, column_index)?.to_owned(); - for s in parity_shares { - tree.push_leaf(s, *Namespace::PARITY_SHARE) - .map_err(Error::Nmt)?; - } + let mut tree = eds.axis_nmt(axis_type, axis_index)?; let proof = NmtNamespaceProof::PresenceProof { proof: tree.build_range_proof(sample_index..sample_index + 1), @@ -136,7 +129,7 @@ impl Sample { Ok(Sample { sample_id, sample_proof_type: axis_type, - share: shares[sample_index].clone(), + share, proof: proof.into(), }) } @@ -153,7 +146,11 @@ impl Sample { .root(self.sample_proof_type, index) .ok_or(Error::EdsIndexOutOfRange(index))?; - let ns = if self.is_ods_sample(dah.square_len()) { + let ns = if is_ods_square( + self.sample_id.row.index.into(), + self.sample_id.index.into(), + dah.square_len(), + ) { Namespace::from_raw(&self.share[..NS_SIZE])? } else { Namespace::PARITY_SHARE @@ -163,17 +160,6 @@ impl Sample { .verify_range(&root, &[&self.share], *ns) .map_err(Error::RangeProofError) } - - /// Returns true if and only if provided sample belongs to Original Data Square, first - /// quadrant of Extended Data Square - fn is_ods_sample(&self, square_len: usize) -> bool { - let row_index = usize::from(self.sample_id.row.index); - let column_index = usize::from(self.sample_id.index); - - let half_square_len = square_len / 2; - - row_index < half_square_len && column_index < half_square_len - } } impl Protobuf for Sample {} @@ -340,7 +326,6 @@ impl From for CidGeneric { #[cfg(test)] mod tests { use super::*; - use crate::nmt::Namespace; #[test] fn round_trip() { diff --git a/types/src/test_utils.rs b/types/src/test_utils.rs index 91490f99..87558cd7 100644 --- a/types/src/test_utils.rs +++ b/types/src/test_utils.rs @@ -55,18 +55,47 @@ impl ExtendedHeaderGenerator { gen.current_header = if prev_height == 0 { None } else { - Some(generate_new(prev_height, &gen.chain_id, &gen.key)) + Some(generate_new(prev_height, &gen.chain_id, &gen.key, None)) }; gen } /// Generates the next header. 
+    ///
+    /// ```
+    /// use celestia_types::test_utils::ExtendedHeaderGenerator;
+    ///
+    /// let mut gen = ExtendedHeaderGenerator::new();
+    /// let header1 = gen.next();
+    /// ```
     #[allow(clippy::should_implement_trait)]
     pub fn next(&mut self) -> ExtendedHeader {
         let header = match self.current_header {
-            Some(ref header) => generate_next(1, header, &self.key),
-            None => generate_new(GENESIS_HEIGHT, &self.chain_id, &self.key),
+            Some(ref header) => generate_next(1, header, &self.key, None),
+            None => generate_new(GENESIS_HEIGHT, &self.chain_id, &self.key, None),
+        };
+
+        self.current_header = Some(header.clone());
+        header
+    }
+
+    /// Generates the next header with the given [`DataAvailabilityHeader`].
+    ///
+    /// ```no_run
+    /// use celestia_types::test_utils::ExtendedHeaderGenerator;
+    /// # fn generate_dah() -> celestia_types::DataAvailabilityHeader {
+    /// #     unimplemented!();
+    /// # }
+    ///
+    /// let mut gen = ExtendedHeaderGenerator::new();
+    /// let header1 = gen.next_with_dah(generate_dah());
+    /// ```
+    #[allow(clippy::should_implement_trait)]
+    pub fn next_with_dah(&mut self, dah: DataAvailabilityHeader) -> ExtendedHeader {
+        let header = match self.current_header {
+            Some(ref header) => generate_next(1, header, &self.key, Some(dah)),
+            None => generate_new(GENESIS_HEIGHT, &self.chain_id, &self.key, Some(dah)),
         };
 
         self.current_header = Some(header.clone());
@@ -101,7 +130,34 @@ impl ExtendedHeaderGenerator {
     ///
     /// This method does not change the state of `ExtendedHeaderGenerator`.
     pub fn next_of(&self, header: &ExtendedHeader) -> ExtendedHeader {
-        generate_next(1, header, &self.key)
+        generate_next(1, header, &self.key, None)
+    }
+
+    /// Generates the next header of the provided header with the given [`DataAvailabilityHeader`].
+    ///
+    /// This can be used to create two headers of the same height but with different hashes.
+    ///
+    /// ```no_run
+    /// use celestia_types::test_utils::ExtendedHeaderGenerator;
+    /// # fn generate_dah() -> celestia_types::DataAvailabilityHeader {
+    /// #     unimplemented!();
+    /// # }
+    ///
+    /// let mut gen = ExtendedHeaderGenerator::new();
+    /// let header1 = gen.next();
+    /// let header2 = gen.next();
+    /// let another_header2 = gen.next_of_with_dah(&header1, generate_dah());
+    /// ```
+    ///
+    /// # Note
+    ///
+    /// This method does not change the state of `ExtendedHeaderGenerator`.
+    pub fn next_of_with_dah(
+        &self,
+        header: &ExtendedHeader,
+        dah: DataAvailabilityHeader,
+    ) -> ExtendedHeader {
+        generate_next(1, header, &self.key, Some(dah))
     }
 
     /// Generates the next amount of headers of the provided header.
@@ -167,8 +223,8 @@ impl ExtendedHeaderGenerator {
         }
 
         let header = match self.current_header {
-            Some(ref header) => generate_next(amount, header, &self.key),
-            None => generate_new(amount, &self.chain_id, &self.key),
+            Some(ref header) => generate_next(amount, header, &self.key, None),
+            None => generate_new(amount, &self.chain_id, &self.key, None),
         };
 
         self.current_header = Some(header.clone());
@@ -257,7 +313,12 @@ pub fn unverify(header: &mut ExtendedHeader) {
     }
 }
 
-fn generate_new(height: u64, chain_id: &chain::Id, signing_key: &SigningKey) -> ExtendedHeader {
+fn generate_new(
+    height: u64,
+    chain_id: &chain::Id,
+    signing_key: &SigningKey,
+    dah: Option<DataAvailabilityHeader>,
+) -> ExtendedHeader {
     assert!(height >= GENESIS_HEIGHT);
 
     let pub_key_bytes = signing_key.verification_key().to_bytes();
@@ -328,10 +389,10 @@ fn generate_new(height: u64, chain_id: &chain::Id, signing_key: &SigningKey) ->
                 proposer_priority: 0_i64.into(),
             }),
         ),
-        dah: DataAvailabilityHeader {
+        dah: dah.unwrap_or_else(|| DataAvailabilityHeader {
             row_roots: vec![NamespacedHash::empty_root(), NamespacedHash::empty_root()],
             column_roots: vec![NamespacedHash::empty_root(), NamespacedHash::empty_root()],
-        },
+        }),
     };
 
     hash_and_sign(&mut header, signing_key);
@@ -344,6 +405,7 @@ fn generate_next(
     increment: u64,
     current: &ExtendedHeader,
     signing_key: &SigningKey,
+    dah: Option<DataAvailabilityHeader>,
 ) -> ExtendedHeader {
 
     assert!(increment > 0);
@@ -399,10 +461,10 @@ fn generate_next(
             }],
         },
         validator_set: current.validator_set.clone(),
-        dah: DataAvailabilityHeader {
+        dah: dah.unwrap_or_else(|| DataAvailabilityHeader {
             row_roots: vec![NamespacedHash::empty_root(), NamespacedHash::empty_root()],
             column_roots: vec![NamespacedHash::empty_root(), NamespacedHash::empty_root()],
-        },
+        }),
     };
 
     hash_and_sign(&mut header, signing_key);
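
The `validation` test added in `types/src/rsmt2d.rs` pins down the `ExtendedDataSquare::new` invariants only by example. As a reading aid, here is a minimal sketch of those checks in plain Rust; `check_eds` is a hypothetical helper, not the crate's API, and the per-row namespace ordering is inferred from which test cases pass and fail (row 0 ending in `share(2)` followed by row 1 starting with `share(1)` is accepted, so ordering is not global across the ODS).

```rust
// Hypothetical stand-in for the checks the `validation` test exercises;
// not the crate's API. Celestia uses 512-byte shares and 29-byte v0
// namespaces, which the usage below assumes.
fn check_eds(shares: &[Vec<u8>], share_size: usize, ns_size: usize) -> Result<(), String> {
    // The EDS must be a non-empty square with an even side length
    // (it is the 2x erasure-coded extension of the ODS).
    let width = (shares.len() as f64).sqrt() as usize;
    if shares.is_empty() || width * width != shares.len() || width % 2 != 0 {
        return Err("EDS must be a non-empty square of even width".into());
    }
    // Every share must have the expected fixed size.
    if shares.iter().any(|s| s.len() != share_size) {
        return Err("invalid share size".into());
    }
    // Within each ODS row, namespace prefixes must be non-decreasing;
    // e.g. `share(0)` after `share(1)` in the same row must fail.
    let half = width / 2;
    for row in 0..half {
        for col in 1..half {
            let prev = &shares[row * width + col - 1][..ns_size];
            let curr = &shares[row * width + col][..ns_size];
            if curr < prev {
                return Err("unsorted namespaces in an ODS row".into());
            }
        }
    }
    Ok(())
}

fn main() {
    // 6 shares cannot form a square, so this must be rejected.
    assert!(check_eds(&vec![vec![0u8; 512]; 6], 512, 29).is_err());
    // A 4x4 square of equally-sized shares passes the structural checks.
    assert!(check_eds(&vec![vec![0u8; 512]; 16], 512, 29).is_ok());
}
```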
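The `sample.rs` changes replace the inline NMT construction with a coordinate lookup plus `eds.axis_nmt`. The sketch below isolates the two bits of arithmetic that diff relies on: mapping `(axis, axis_index, sample_index)` to absolute coordinates, and the ODS membership test that decides between a real namespace and `PARITY_SHARE`. It mirrors the new `match` and the removed `is_ods_sample` method, and is illustrative only.

```rust
// Illustrative sketch only; mirrors the index logic shown in the diff above.
#[derive(Clone, Copy)]
enum AxisType {
    Row,
    Col,
}

// A sample is addressed along one axis of the square; map it to absolute
// (row, column) coordinates in the Extended Data Square.
fn sample_coords(axis: AxisType, axis_index: usize, sample_index: usize) -> (usize, usize) {
    match axis {
        AxisType::Row => (axis_index, sample_index),
        AxisType::Col => (sample_index, axis_index),
    }
}

// A share belongs to the Original Data Square (and thus carries a real
// namespace prefix) iff both coordinates fall in the upper-left quadrant;
// everything else is a parity share. This is the check `is_ods_square`
// performs in place of the removed `is_ods_sample`.
fn is_ods_square(row: usize, column: usize, square_len: usize) -> bool {
    let half = square_len / 2;
    row < half && column < half
}

fn main() {
    // In a 4x4 EDS, only the 2x2 upper-left quadrant is original data.
    assert_eq!(sample_coords(AxisType::Col, 1, 3), (3, 1));
    assert!(is_ods_square(1, 1, 4));
    assert!(!is_ods_square(3, 1, 4));
}
```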
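Finally, the `test_utils.rs` changes thread an `Option<DataAvailabilityHeader>` through both generator functions and default it lazily. A self-contained sketch of that pattern, with simplified stand-ins for the crate types:

```rust
// Minimal illustration of the `Option<DataAvailabilityHeader>` defaulting
// used in `generate_new` / `generate_next` above; these types are
// simplified stand-ins, not the celestia-types definitions.
#[derive(Clone, Debug, PartialEq)]
struct NamespacedHash(Vec<u8>);

impl NamespacedHash {
    fn empty_root() -> Self {
        NamespacedHash(vec![0; 32])
    }
}

#[derive(Clone, Debug, PartialEq)]
struct DataAvailabilityHeader {
    row_roots: Vec<NamespacedHash>,
    column_roots: Vec<NamespacedHash>,
}

// Callers that don't care about data availability pass `None` and get the
// minimal two-root placeholder; tests inject a real DAH with `Some(dah)`.
fn resolve_dah(dah: Option<DataAvailabilityHeader>) -> DataAvailabilityHeader {
    dah.unwrap_or_else(|| DataAvailabilityHeader {
        row_roots: vec![NamespacedHash::empty_root(), NamespacedHash::empty_root()],
        column_roots: vec![NamespacedHash::empty_root(), NamespacedHash::empty_root()],
    })
}

fn main() {
    let default = resolve_dah(None);
    assert_eq!(default.row_roots.len(), 2);

    let custom = DataAvailabilityHeader {
        row_roots: vec![NamespacedHash::empty_root(); 4],
        column_roots: vec![NamespacedHash::empty_root(); 4],
    };
    assert_eq!(resolve_dah(Some(custom)).row_roots.len(), 4);
}
```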