From 9410c67f0bee9fc5199e908a7869cf2b45603823 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Zwoli=C5=84ski?=
Date: Thu, 31 Oct 2024 15:55:05 +0100
Subject: [PATCH] feat!(node): add a method to get all blobs using shwap
 (#452)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Maciej Zwoliński
Co-authored-by: Yiannis Marangos
Co-authored-by: Mikołaj Florkiewicz
---
 ci/Dockerfile.bridge            |   8 +-
 ci/Dockerfile.validator         |   2 +-
 ci/run-bridge.sh                |  15 +-
 node/src/daser.rs               |   5 +-
 node/src/node.rs                |  28 +++-
 node/src/p2p.rs                 |  66 ++++++--
 node/tests/header_ex.rs         |   6 +-
 node/tests/node.rs              |   2 +-
 node/tests/shwap.rs             | 277 ++++++++++++++++++++++++++++++++
 node/tests/utils/mod.rs         |  27 +++-
 types/src/error.rs              |   4 +
 types/src/extended_header.rs    |  21 ++-
 types/src/row.rs                |   5 +-
 types/src/row_namespace_data.rs |  19 ++-
 14 files changed, 435 insertions(+), 50 deletions(-)
 create mode 100644 node/tests/shwap.rs

diff --git a/ci/Dockerfile.bridge b/ci/Dockerfile.bridge
index 9996c179..48bed6f2 100644
--- a/ci/Dockerfile.bridge
+++ b/ci/Dockerfile.bridge
@@ -3,13 +3,11 @@
 # https://github.com/celestiaorg/celestia-node/blob/main/Dockerfile
 FROM docker.io/alpine:3.19.1
 
-ENV CELESTIA_HOME=/root
-
-RUN apk update && apk add --no-cache bash jq
+RUN apk update && apk add --no-cache bash jq dasel
 
 # Copy in the binary
-COPY --from=ghcr.io/celestiaorg/celestia-node:v0.16.0 /bin/celestia /bin/celestia
-COPY --from=ghcr.io/celestiaorg/celestia-node:v0.16.0 /bin/cel-key /bin/cel-key
+COPY --from=ghcr.io/celestiaorg/celestia-node:v0.18.3-mocha /bin/celestia /bin/celestia
+COPY --from=ghcr.io/celestiaorg/celestia-node:v0.18.3-mocha /bin/cel-key /bin/cel-key
 
 COPY ./run-bridge.sh /opt/entrypoint.sh

diff --git a/ci/Dockerfile.validator b/ci/Dockerfile.validator
index 147c008b..94a4b87c 100644
--- a/ci/Dockerfile.validator
+++ b/ci/Dockerfile.validator
@@ -8,7 +8,7 @@ ENV CELESTIA_HOME=/root
 RUN apk update && apk add --no-cache bash jq
 
 # Copy in the binary
-COPY --from=ghcr.io/celestiaorg/celestia-app:v2.1.2 /bin/celestia-appd /bin/celestia-appd
+COPY --from=ghcr.io/celestiaorg/celestia-app:v2.3.0 /bin/celestia-appd /bin/celestia-appd
 
 COPY ./run-validator.sh /opt/entrypoint.sh

diff --git a/ci/run-bridge.sh b/ci/run-bridge.sh
index 85fcd15d..d14b3fb5 100755
--- a/ci/run-bridge.sh
+++ b/ci/run-bridge.sh
@@ -9,7 +9,7 @@ NODE_NAME="bridge-$NODE_ID"
 # a private local network
 P2P_NETWORK="private"
 # a bridge node configuration directory
-CONFIG_DIR="$CELESTIA_HOME/.celestia-bridge-$P2P_NETWORK"
+CONFIG_DIR="$HOME/.celestia-bridge-$P2P_NETWORK"
 # directory and the files shared with the validator node
 CREDENTIALS_DIR="/credentials"
 # node credentials
@@ -47,6 +47,15 @@ add_trusted_genesis() {
   sed -i'.bak' "s/TrustedHash = .*/TrustedHash = $genesis_hash/" "$CONFIG_DIR/config.toml"
 }
 
+whitelist_localhost_nodes() {
+  # to get the list of IPs:
+  #   cargo run -- node -n private -l 0.0.0.0
+  #   docker compose -f ci/docker-compose.yml exec bridge-0 celestia p2p peer-info $lumina_peerid
+  dasel put -f "$CONFIG_DIR/config.toml" \
+    -t json -v '["172.18.0.1/24", "172.17.0.1/24", "192.168.0.0/16"]' \
+    'P2P.IPColocationWhitelist'
+}
+
 write_jwt_token() {
   echo "Saving jwt token to $NODE_JWT_FILE"
   celestia bridge auth admin --p2p.network "$P2P_NETWORK" > "$NODE_JWT_FILE"
@@ -55,6 +64,8 @@
 main() {
   # Initialize the bridge node
   celestia bridge init --p2p.network "$P2P_NETWORK"
+  # don't allow banning of the nodes we create in tests via pubsub IP colocation counting
+  whitelist_localhost_nodes
  # Wait for a validator
  wait_for_provision
  # Import the key with the coins
@@ -68,7 +79,7 @@ main() {
   # Start the bridge node
   echo "Configuration finished. Running a bridge node..."
   celestia bridge start \
-    --rpc.skip-auth=$SKIP_AUTH \
+    --rpc.skip-auth="$SKIP_AUTH" \
     --rpc.addr 0.0.0.0 \
     --core.ip validator \
     --keyring.keyname "$NODE_NAME" \

diff --git a/node/src/daser.rs b/node/src/daser.rs
index 96c2595f..201cdfed 100644
--- a/node/src/daser.rs
+++ b/node/src/daser.rs
@@ -54,6 +54,7 @@ const MAX_SAMPLES_NEEDED: usize = 16;
 const HOUR: u64 = 60 * 60;
 const DAY: u64 = 24 * HOUR;
 const DEFAULT_SAMPLING_WINDOW: Duration = Duration::from_secs(30 * DAY);
+const GET_SAMPLE_TIMEOUT: Duration = Duration::from_secs(10);
 
 type Result<T, E = DaserError> = std::result::Result<T, E>;
 
@@ -364,7 +365,9 @@ where
             let p2p = p2p.clone();
 
             async move {
-                let res = p2p.get_sample(row, col, height).await;
+                let res = p2p
+                    .get_sample(row, col, height, Some(GET_SAMPLE_TIMEOUT))
+                    .await;
                 (row, col, res)
             }
         })

diff --git a/node/src/node.rs b/node/src/node.rs
index e4b7b1ff..e27e6f4d 100644
--- a/node/src/node.rs
+++ b/node/src/node.rs
@@ -13,7 +13,7 @@ use celestia_types::nmt::Namespace;
 use celestia_types::row::Row;
 use celestia_types::row_namespace_data::RowNamespaceData;
 use celestia_types::sample::Sample;
-use celestia_types::ExtendedHeader;
+use celestia_types::{Blob, ExtendedHeader};
 use libp2p::identity::Keypair;
 use libp2p::swarm::NetworkInfo;
 use libp2p::{Multiaddr, PeerId};
@@ -338,8 +338,13 @@ where
     ///
     /// On failure to receive a verified [`Row`] within a certain time, the
     /// `NodeError::P2p(P2pError::BitswapQueryTimeout)` error will be returned.
-    pub async fn request_row(&self, row_index: u16, block_height: u64) -> Result<Row> {
-        Ok(self.p2p().get_row(row_index, block_height).await?)
+    pub async fn request_row(
+        &self,
+        row_index: u16,
+        block_height: u64,
+        timeout: Option<Duration>,
+    ) -> Result<Row> {
+        Ok(self.p2p().get_row(row_index, block_height, timeout).await?)
     }
 
     /// Request a verified [`Sample`] from the network.
@@ -353,10 +358,11 @@
         row_index: u16,
         column_index: u16,
         block_height: u64,
+        timeout: Option<Duration>,
     ) -> Result<Sample> {
         Ok(self
             .p2p()
-            .get_sample(row_index, column_index, block_height)
+            .get_sample(row_index, column_index, block_height, timeout)
             .await?)
     }
 
@@ -371,13 +377,25 @@
         namespace: Namespace,
         row_index: u16,
         block_height: u64,
+        timeout: Option<Duration>,
     ) -> Result<RowNamespaceData> {
         Ok(self
             .p2p()
-            .get_row_namespace_data(namespace, row_index, block_height)
+            .get_row_namespace_data(namespace, row_index, block_height, timeout)
             .await?)
     }
 
+    /// Request all blobs with the provided namespace in the block corresponding
+    /// to this header, using the bitswap protocol.
+    pub async fn request_all_blobs(
+        &self,
+        header: &ExtendedHeader,
+        namespace: Namespace,
+        timeout: Option<Duration>,
+    ) -> Result<Vec<Blob>> {
+        Ok(self.p2p().get_all_blobs(header, namespace, timeout).await?)
+    }
+
     /// Get current header syncing info.
     pub async fn syncer_info(&self) -> Result<SyncingInfo> {
         Ok(self.syncer().info().await?)
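An illustrative usage sketch of the new API (not part of the diff): `blobs_at` is a hypothetical helper name, and the node is assumed to be already running and connected to peers that can serve shwap requests.

    use std::time::Duration;

    use celestia_types::{nmt::Namespace, Blob};
    use lumina_node::blockstore::InMemoryBlockstore;
    use lumina_node::node::Node;
    use lumina_node::store::InMemoryStore;
    use lumina_node::NodeError;

    /// Fetch every blob published under `namespace` at `height`.
    async fn blobs_at(
        node: &Node<InMemoryBlockstore, InMemoryStore>,
        namespace: Namespace,
        height: u64,
    ) -> Result<Vec<Blob>, NodeError> {
        // the header carries the DAH row roots used to locate the namespace
        let header = node.get_header_by_height(height).await?;
        // `None` would mean no timeout; a bounded one keeps the call from
        // hanging when a row cannot be fetched from any peer
        node.request_all_blobs(&header, namespace, Some(Duration::from_secs(2)))
            .await
    }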
diff --git a/node/src/p2p.rs b/node/src/p2p.rs
index 2cd9e9fb..c01395e2 100644
--- a/node/src/p2p.rs
+++ b/node/src/p2p.rs
@@ -22,14 +22,15 @@ use std::time::Duration;
 use blockstore::Blockstore;
 use celestia_proto::p2p::pb::{header_request, HeaderRequest};
 use celestia_tendermint_proto::Protobuf;
-use celestia_types::nmt::Namespace;
+use celestia_types::nmt::{Namespace, NamespacedSha2Hasher};
 use celestia_types::row::{Row, RowId};
 use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
 use celestia_types::sample::{Sample, SampleId};
 use celestia_types::{fraud_proof::BadEncodingFraudProof, hash::Hash};
-use celestia_types::{ExtendedHeader, FraudProof};
+use celestia_types::{Blob, ExtendedHeader, FraudProof};
 use cid::Cid;
-use futures::StreamExt;
+use futures::stream::FuturesOrdered;
+use futures::{StreamExt, TryStreamExt};
 use libp2p::core::transport::ListenerId;
 use libp2p::{
     autonat,
@@ -83,8 +84,6 @@ const MIN_CONNECTED_PEERS: u64 = 4;
 // Maximum size of a [`Multihash`].
 pub(crate) const MAX_MH_SIZE: usize = 64;
 
-pub(crate) const GET_SAMPLE_TIMEOUT: Duration = Duration::from_secs(10);
-
 // all fraud proofs for height bigger than head height by this threshold
 // will be ignored
 const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;
@@ -145,6 +144,10 @@ pub enum P2pError {
     /// Shwap protocol error.
     #[error("Shwap: {0}")]
     Shwap(String),
+
+    /// An error propagated from [`celestia_types`].
+    #[error(transparent)]
+    CelestiaTypes(#[from] celestia_types::Error),
 }
 
 impl P2pError {
@@ -165,7 +168,8 @@ impl P2pError {
             | P2pError::ProtoDecodeFailed(_)
             | P2pError::Cid(_)
             | P2pError::BitswapQueryTimeout
-            | P2pError::Shwap(_) => false,
+            | P2pError::Shwap(_)
+            | P2pError::CelestiaTypes(_) => false,
         }
     }
 }
@@ -514,31 +518,32 @@ impl P2p {
     }
 
     /// Request a [`Row`] on bitswap protocol.
-    pub async fn get_row(&self, row_index: u16, block_height: u64) -> Result<Row> {
+    pub async fn get_row(
+        &self,
+        row_index: u16,
+        block_height: u64,
+        timeout: Option<Duration>,
+    ) -> Result<Row> {
         let id = RowId::new(row_index, block_height).map_err(P2pError::Cid)?;
         let cid = convert_cid(&id.into())?;
 
-        // TODO: add timeout
-        let data = self.get_shwap_cid(cid, None).await?;
+        let data = self.get_shwap_cid(cid, timeout).await?;
         let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
         Ok(row)
     }
 
     /// Request a [`Sample`] on bitswap protocol.
-    ///
-    /// This method awaits for a verified `Sample` until timeout of 10 second
-    /// is reached. On timeout it is safe to assume that sampling of the block
-    /// failed.
     pub async fn get_sample(
         &self,
         row_index: u16,
         column_index: u16,
         block_height: u64,
+        timeout: Option<Duration>,
     ) -> Result<Sample> {
         let id = SampleId::new(row_index, column_index, block_height).map_err(P2pError::Cid)?;
         let cid = convert_cid(&id.into())?;
 
-        let data = self.get_shwap_cid(cid, Some(GET_SAMPLE_TIMEOUT)).await?;
+        let data = self.get_shwap_cid(cid, timeout).await?;
         let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
         Ok(sample)
     }
@@ -549,18 +554,47 @@
         namespace: Namespace,
         row_index: u16,
         block_height: u64,
+        timeout: Option<Duration>,
     ) -> Result<RowNamespaceData> {
         let id =
             RowNamespaceDataId::new(namespace, row_index, block_height).map_err(P2pError::Cid)?;
         let cid = convert_cid(&id.into())?;
 
-        // TODO: add timeout
-        let data = self.get_shwap_cid(cid, None).await?;
+        let data = self.get_shwap_cid(cid, timeout).await?;
         let row_namespace_data =
             RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
         Ok(row_namespace_data)
     }
 
+    /// Request all blobs with the provided namespace in the block corresponding
+    /// to this header, using the bitswap protocol.
+    pub async fn get_all_blobs(
+        &self,
+        header: &ExtendedHeader,
+        namespace: Namespace,
+        timeout: Option<Duration>,
+    ) -> Result<Vec<Blob>> {
+        let height = header.height().value();
+        let app_version = header.app_version()?;
+        let rows_to_fetch: Vec<_> = header
+            .dah
+            .row_roots()
+            .iter()
+            .enumerate()
+            .filter(|(_, row)| row.contains::<NamespacedSha2Hasher>(*namespace))
+            .map(|(n, _)| n as u16)
+            .collect();
+
+        let futs = rows_to_fetch
+            .into_iter()
+            .map(|row_idx| self.get_row_namespace_data(namespace, row_idx, height, timeout))
+            .collect::<FuturesOrdered<_>>();
+        let rows: Vec<_> = futs.try_collect().await?;
+        let shares = rows.iter().flat_map(|row| row.shares.iter());
+
+        Ok(Blob::reconstruct_all(shares, app_version)?)
+    }
+
     /// Get the addresses where [`P2p`] listens on for incoming connections.
     pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
         let (tx, rx) = oneshot::channel();
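A design note on the hunk above: `Blob::reconstruct_all` needs the shares concatenated in row order, and `FuturesOrdered` yields results in the order the row requests were queued even when a later request finishes first, while still polling them concurrently. A standalone sketch of that property (illustrative only, not part of the patch; assumes `tokio` and `futures` as dependencies):

    use std::time::Duration;

    use futures::{stream::FuturesOrdered, StreamExt};

    #[tokio::main]
    async fn main() {
        // row 0 resolves last, row 2 resolves first
        let futs: FuturesOrdered<_> = (0..3u64)
            .map(|i| async move {
                tokio::time::sleep(Duration::from_millis(30 - i * 10)).await;
                i
            })
            .collect();

        // results still come back as [0, 1, 2], i.e. in submission order
        let rows: Vec<u64> = futs.collect().await;
        assert_eq!(rows, vec![0, 1, 2]);
    }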
diff --git a/node/tests/header_ex.rs b/node/tests/header_ex.rs
index 1cc8a074..4f7f71ef 100644
--- a/node/tests/header_ex.rs
+++ b/node/tests/header_ex.rs
@@ -16,7 +16,7 @@ mod utils;
 
 #[tokio::test]
 async fn request_single_header() {
-    let node = new_connected_node().await;
+    let (node, _) = new_connected_node().await;
 
     let header = node.request_header_by_height(1).await.unwrap();
     let header_by_hash = node.request_header_by_hash(&header.hash()).await.unwrap();
@@ -26,7 +26,7 @@
 #[tokio::test]
 async fn request_verified_headers() {
-    let node = new_connected_node().await;
+    let (node, _) = new_connected_node().await;
 
     let from = node.request_header_by_height(1).await.unwrap();
 
     let verified_headers = node.request_verified_headers(&from, 2).await.unwrap();
@@ -41,7 +41,7 @@
 
 #[tokio::test]
 async fn request_head() {
-    let node = new_connected_node().await;
+    let (node, _) = new_connected_node().await;
 
     let genesis = node.request_header_by_height(1).await.unwrap();

diff --git a/node/tests/node.rs b/node/tests/node.rs
index b8323316..0be30aae 100644
--- a/node/tests/node.rs
+++ b/node/tests/node.rs
@@ -26,7 +26,7 @@ mod utils;
 
 #[tokio::test]
 async fn connects_to_the_go_bridge_node() {
-    let node = new_connected_node().await;
+    let (node, _) = new_connected_node().await;
 
     let info = node.network_info().await.unwrap();
     assert_eq!(info.num_peers(), 1);

diff --git a/node/tests/shwap.rs b/node/tests/shwap.rs
new file mode 100644
index 00000000..177da24e
--- /dev/null
+++ b/node/tests/shwap.rs
@@ -0,0 +1,277 @@
+#![cfg(not(target_arch = "wasm32"))]
+
+use std::{collections::HashSet, time::Duration};
+
+use celestia_rpc::ShareClient;
+use celestia_types::{
+    nmt::{Namespace, NamespacedSha2Hasher},
+    AppVersion, Blob,
+};
+use lumina_node::{events::NodeEvent, node::P2pError, NodeError};
+use rand::RngCore;
+use tokio::time::timeout;
+
+use crate::utils::{blob_submit, bridge_client, new_connected_node};
+
+mod utils;
+
+#[tokio::test]
+async fn shwap_sampling_forward() {
+    let (node, _) = new_connected_node().await;
+
+    // create a new events sub to ignore all previous events
+    let mut events = node.event_subscriber();
+
+    for _ in 0..5 {
+        // wait for a new block
+        let get_new_head = async {
+            loop {
+                let ev = events.recv().await.unwrap();
+                let NodeEvent::AddedHeaderFromHeaderSub { height, .. } = ev.event else {
+                    continue;
+                };
+                break height;
+            }
+        };
+        let new_head = timeout(Duration::from_secs(2), get_new_head).await.unwrap();
+
+        // wait for the height to be sampled
+        let wait_height_sampled = async {
+            loop {
+                let ev = events.recv().await.unwrap();
+                let NodeEvent::SamplingFinished {
+                    height, accepted, ..
+                } = ev.event
+                else {
+                    continue;
+                };
+
+                if height == new_head {
+                    assert!(accepted);
+                    break;
+                }
+            }
+        };
+        timeout(Duration::from_secs(1), wait_height_sampled)
+            .await
+            .unwrap();
+    }
+}
+
+#[tokio::test]
+async fn shwap_sampling_backward() {
+    let (node, mut events) = new_connected_node().await;
+
+    let current_head = node.get_local_head_header().await.unwrap().height().value();
+
+    // wait for some past headers to be synchronized
+    let new_batch_synced = async {
+        loop {
+            let ev = events.recv().await.unwrap();
+            let NodeEvent::FetchingHeadersFinished {
+                from_height,
+                to_height,
+                ..
+            } = ev.event
+            else {
+                continue;
+            };
+
+            if to_height < current_head {
+                break (from_height, to_height);
+            }
+        }
+    };
+    let (from_height, to_height) = timeout(Duration::from_secs(4), new_batch_synced)
+        .await
+        .unwrap();
+
+    // take just the first N headers because the batch size can be big
+    let mut headers_to_sample: HashSet<_> = (from_height..to_height).rev().take(50).collect();
+
+    // wait for all the heights to be sampled
+    timeout(Duration::from_secs(10), async {
+        loop {
+            let ev = events.recv().await.unwrap();
+            let NodeEvent::SamplingFinished {
+                height, accepted, ..
+            } = ev.event
+            else {
+                continue;
+            };
+
+            assert!(accepted);
+            headers_to_sample.remove(&height);
+
+            if headers_to_sample.is_empty() {
+                break;
+            }
+        }
+    })
+    .await
+    .unwrap();
+}
+
+#[tokio::test]
+async fn shwap_request_sample() {
+    let (node, _) = new_connected_node().await;
+    let client = bridge_client().await;
+
+    let ns = Namespace::const_v0(rand::random());
+    let blob_len = rand::random::<usize>() % 4096 + 1;
+    let blob = Blob::new(ns, random_bytes(blob_len), AppVersion::V2).unwrap();
+
+    let height = blob_submit(&client, &[blob]).await;
+    let header = node.get_header_by_height(height).await.unwrap();
+    let square_width = header.dah.square_width();
+
+    // check an existing sample
+    let expected = client.share_get_share(&header, 0, 0).await.unwrap();
+    let sample = node
+        .request_sample(0, 0, height, Some(Duration::from_millis(500)))
+        .await
+        .unwrap();
+    assert_eq!(expected, sample.share);
+
+    // check a nonexistent sample
+    let err = node
+        .request_sample(
+            square_width + 1,
+            square_width + 1,
+            height,
+            Some(Duration::from_millis(500)),
+        )
+        .await
+        .unwrap_err();
+    assert!(matches!(err, NodeError::P2p(P2pError::BitswapQueryTimeout)));
+}
+
+#[tokio::test]
+async fn shwap_request_row() {
+    let (node, _) = new_connected_node().await;
+    let client = bridge_client().await;
+
+    let ns = Namespace::const_v0(rand::random());
+    let blob_len = rand::random::<usize>() % 4096 + 1;
+    let blob = Blob::new(ns, random_bytes(blob_len), AppVersion::V2).unwrap();
+
+    let height = blob_submit(&client, &[blob]).await;
+    let header = node.get_header_by_height(height).await.unwrap();
+    let eds = client.share_get_eds(&header).await.unwrap();
+    let square_width = header.dah.square_width();
+
+    // check an existing row
+    let row = node
+        .request_row(0, height, Some(Duration::from_secs(1)))
+        .await
+        .unwrap();
+    assert_eq!(eds.row(0).unwrap(), row.shares);
+
+    // check a nonexistent row
+    let err = node
+        .request_row(square_width + 1, height, Some(Duration::from_secs(1)))
+        .await
+        .unwrap_err();
+    assert!(matches!(err, NodeError::P2p(P2pError::BitswapQueryTimeout)));
+}
+
+#[tokio::test]
+async fn shwap_request_row_namespace_data() {
+    let (node, _) = new_connected_node().await;
+    let client = bridge_client().await;
+
+    let ns = Namespace::const_v0(rand::random());
+    let blob_len = rand::random::<usize>() % 4096 + 1;
+    let blob = Blob::new(ns, random_bytes(blob_len), AppVersion::V2).unwrap();
+
+    let height = blob_submit(&client, &[blob]).await;
+    let header = node.get_header_by_height(height).await.unwrap();
+    let eds = client.share_get_eds(&header).await.unwrap();
+    let square_width = header.dah.square_width();
+
+    // check existing row namespace data
+    let rows_with_ns: Vec<_> = header
+        .dah
+        .row_roots()
+        .iter()
+        .enumerate()
+        .filter_map(|(n, hash)| {
+            hash.contains::<NamespacedSha2Hasher>(*ns)
+                .then_some(n as u16)
+        })
+        .collect();
+    let eds_ns_data = eds.get_namespace_data(ns, &header.dah, height).unwrap();
+
+    for (n, &row) in rows_with_ns.iter().enumerate() {
+        let row_ns_data = node
+            .request_row_namespace_data(ns, row, height, Some(Duration::from_secs(1)))
+            .await
+            .unwrap();
+        assert_eq!(eds_ns_data[n].1, row_ns_data);
+    }
+
+    // check row namespace data of a nonexistent row
+    let err = node
+        .request_row_namespace_data(ns, square_width + 1, height, Some(Duration::from_secs(1)))
+        .await
+        .unwrap_err();
+    assert!(matches!(err, NodeError::P2p(P2pError::BitswapQueryTimeout)));
+
+    // check row namespace data of a nonexistent namespace
+    // that the row's range actually covers:
+    // PFB (0x04) < 0x05 < primary ns padding (0xFF)
+    let unknown_ns = Namespace::const_v0([0, 0, 0, 0, 0, 0, 0, 0, 0, 5]);
+    let row = node
+        .request_row_namespace_data(unknown_ns, 0, height, Some(Duration::from_secs(1)))
+        .await
+        .unwrap();
+    assert!(row.shares.is_empty());
+
+    // check row namespace data of a nonexistent namespace
+    // that the row doesn't cover
+    let unknown_ns = Namespace::TAIL_PADDING;
+    let err = node
+        .request_row_namespace_data(unknown_ns, 0, height, Some(Duration::from_secs(1)))
+        .await
+        .unwrap_err();
+    assert!(matches!(err, NodeError::P2p(P2pError::BitswapQueryTimeout)));
+}
+
+#[tokio::test]
+async fn shwap_request_all_blobs() {
+    let (node, _) = new_connected_node().await;
+    let client = bridge_client().await;
+
+    let ns = Namespace::const_v0(rand::random());
+    let blobs: Vec<_> = (0..5)
+        .map(|_| {
+            let blob_len = rand::random::<usize>() % 4096 + 1;
+            Blob::new(ns, random_bytes(blob_len), AppVersion::V2).unwrap()
+        })
+        .collect();
+
+    let height = blob_submit(&client, &blobs).await;
+    let header = node.get_header_by_height(height).await.unwrap();
+
+    // check an existing namespace
+    let received = node
+        .request_all_blobs(&header, ns, Some(Duration::from_secs(2)))
+        .await
+        .unwrap();
+
+    assert_eq!(blobs, received);
+
+    // check a nonexistent namespace
+    let ns = Namespace::const_v0(rand::random());
+    let received = node
+        .request_all_blobs(&header, ns, Some(Duration::from_secs(2)))
+        .await
+        .unwrap();
+
+    assert!(received.is_empty());
+}
+
+fn random_bytes(len: usize) -> Vec<u8> {
+    let mut bytes = vec![0u8; len];
+    rand::thread_rng().fill_bytes(&mut bytes);
+    bytes
+}
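Why the `unknown_ns` trick in `shwap_request_row_namespace_data` works: namespaces compare bytewise, and 0x05 as the final byte lands between the PFB shares (0x04) present in the row and the primary reserved padding namespace (0xFF), so the row's namespace range still covers it and a peer can answer with an absence proof instead of letting the query time out. A rough sketch of that ordering (illustrative; the `PAY_FOR_BLOB` and `PRIMARY_RESERVED_PADDING` constants are assumed to exist in celestia-types alongside `TAIL_PADDING`, verify against the crate):

    use celestia_types::nmt::Namespace;

    fn main() {
        let unknown_ns = Namespace::const_v0([0, 0, 0, 0, 0, 0, 0, 0, 0, 5]);

        // bytewise comparison of the fully expanded namespace bytes
        assert!(Namespace::PAY_FOR_BLOB.as_bytes() < unknown_ns.as_bytes());
        assert!(unknown_ns.as_bytes() < Namespace::PRIMARY_RESERVED_PADDING.as_bytes());

        // TAIL_PADDING sorts above every user namespace, so a blob row never covers it
        assert!(unknown_ns.as_bytes() < Namespace::TAIL_PADDING.as_bytes());
    }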
diff --git a/node/tests/utils/mod.rs b/node/tests/utils/mod.rs
index 01da4299..8929e42f 100644
--- a/node/tests/utils/mod.rs
+++ b/node/tests/utils/mod.rs
@@ -1,23 +1,31 @@
 #![allow(dead_code)]
 
 use std::env;
+use std::sync::OnceLock;
 use std::time::Duration;
 
 use celestia_rpc::{prelude::*, Client};
+use celestia_types::{Blob, TxConfig};
 use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
 use lumina_node::blockstore::InMemoryBlockstore;
+use lumina_node::events::EventSubscriber;
 use lumina_node::node::NodeConfig;
 use lumina_node::test_utils::test_node_config;
 use lumina_node::{node::Node, store::InMemoryStore};
+use tokio::sync::Mutex;
 use tokio::time::sleep;
 
 const WS_URL: &str = "ws://localhost:26658";
 
-pub async fn fetch_bridge_info() -> (PeerId, Multiaddr) {
+pub async fn bridge_client() -> Client {
     let _ = dotenvy::dotenv();
     let auth_token = env::var("CELESTIA_NODE_AUTH_TOKEN_ADMIN").unwrap();
-    let client = Client::new(WS_URL, Some(&auth_token)).await.unwrap();
+    Client::new(WS_URL, Some(&auth_token)).await.unwrap()
+}
+
+pub async fn fetch_bridge_info() -> (PeerId, Multiaddr) {
+    let client = bridge_client().await;
 
     let bridge_info = client.p2p_info().await.unwrap();
 
     let mut ma = bridge_info
@@ -33,10 +41,10 @@ pub async fn fetch_bridge_info() -> (PeerId, Multiaddr) {
     (bridge_info.id.into(), ma)
 }
 
-pub async fn new_connected_node() -> Node<InMemoryBlockstore, InMemoryStore> {
+pub async fn new_connected_node() -> (Node<InMemoryBlockstore, InMemoryStore>, EventSubscriber) {
     let (_, bridge_ma) = fetch_bridge_info().await;
 
-    let node = Node::new(NodeConfig {
+    let (node, events) = Node::new_subscribed(NodeConfig {
         p2p_bootnodes: vec![bridge_ma],
         ..test_node_config()
     })
@@ -56,5 +64,14 @@
         sleep(Duration::from_secs(1)).await;
     }
 
-    node
+    (node, events)
+}
+
+pub async fn blob_submit(client: &Client, blobs: &[Blob]) -> u64 {
+    static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
+    let _guard = LOCK.get_or_init(|| Mutex::new(())).lock().await;
+    client
+        .blob_submit(blobs, TxConfig::default())
+        .await
+        .unwrap()
 }
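A note on the `blob_submit` helper above: every submission is signed by the bridge node's single account, so concurrent submissions would presumably race on that account's sequence number; the `OnceLock<Mutex<()>>` gives all tests in the binary one shared async lock without extra dependencies. The pattern in isolation (a sketch; `serialized` is a hypothetical name):

    use std::future::Future;
    use std::sync::OnceLock;

    use tokio::sync::Mutex;

    /// Run `fut` while holding a process-wide lock, so concurrently
    /// spawned callers execute one at a time.
    async fn serialized<F: Future>(fut: F) -> F::Output {
        // `OnceLock` initializes the `Mutex` on first use; `static`
        // makes it shared by every caller in the process
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        let _guard = LOCK.get_or_init(|| Mutex::new(())).lock().await;
        fut.await
    }

    #[tokio::main]
    async fn main() {
        let results =
            futures::future::join_all((0..3).map(|i| serialized(async move { i }))).await;
        assert_eq!(results, vec![0, 1, 2]);
    }

diff --git a/types/src/error.rs b/types/src/error.rs
index b85513d4..86ad0aa4 100644
--- a/types/src/error.rs
+++ b/types/src/error.rs
@@ -14,6 +14,10 @@ pub enum Error {
     #[error("Unsupported namespace version: {0}")]
     UnsupportedNamespaceVersion(u8),
 
+    /// Unsupported app version.
+    #[error("Unsupported app version: {0}")]
+    UnsupportedAppVersion(u64),
+
     /// Invalid namespace size.
     #[error("Invalid namespace size")]
     InvalidNamespaceSize,

diff --git a/types/src/extended_header.rs b/types/src/extended_header.rs
index ab300c4e..3211cfb3 100644
--- a/types/src/extended_header.rs
+++ b/types/src/extended_header.rs
@@ -17,8 +17,8 @@ use crate::consts::appconsts::AppVersion;
 use crate::trust_level::DEFAULT_TRUST_LEVEL;
 use crate::validator_set::ValidatorSetExt;
 use crate::{
-    bail_validation, bail_verification, validation_error, DataAvailabilityHeader, Error, Result,
-    ValidateBasic, ValidateBasicWithAppVersion,
+    bail_validation, bail_verification, DataAvailabilityHeader, Error, Result, ValidateBasic,
+    ValidateBasicWithAppVersion,
 };
 
 /// Information about a tendermint validator.
@@ -86,6 +86,17 @@ impl ExtendedHeader {
         Ok(header)
     }
 
+    /// Get the app version.
+    ///
+    /// # Errors
+    ///
+    /// This function returns an error if the app version set in the header
+    /// is not currently supported.
+    pub fn app_version(&self) -> Result<AppVersion> {
+        let app_version = self.header.version.app;
+        AppVersion::from_u64(app_version).ok_or(Error::UnsupportedAppVersion(app_version))
+    }
+
     /// Get the block chain id.
     pub fn chain_id(&self) -> &Id {
         &self.header.chain_id
@@ -184,11 +195,7 @@ impl ExtendedHeader {
             &self.commit,
         )?;
 
-        let app_version = self.header.version.app;
-        let app_version = AppVersion::from_u64(app_version).ok_or_else(|| {
-            validation_error!("Invalid or unsupported AppVersion in header: {app_version}")
-        })?;
-
+        let app_version = self.app_version()?;
         self.dah.validate_basic(app_version)?;
 
         Ok(())

diff --git a/types/src/row.rs b/types/src/row.rs
index d6ed4d50..4dd8bd35 100644
--- a/types/src/row.rs
+++ b/types/src/row.rs
@@ -13,13 +13,12 @@ use bytes::{Buf, BufMut, BytesMut};
 use celestia_proto::shwap::{row::HalfSide as RawHalfSide, Row as RawRow, Share as RawShare};
 use cid::CidGeneric;
 use multihash::Multihash;
-use nmt_rs::NamespaceMerkleHasher;
 use prost::Message;
 use serde::{Deserialize, Serialize};
 
 use crate::consts::appconsts::SHARE_SIZE;
 use crate::eds::ExtendedDataSquare;
-use crate::nmt::{NamespacedSha2Hasher, Nmt};
+use crate::nmt::{Nmt, NmtExt};
 use crate::{DataAvailabilityHeader, Error, Result, Share};
 
 /// Number of bytes needed to represent [`EdsId`] in `multihash`.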
@@ -68,7 +67,7 @@ impl Row {
     /// Verify the row against roots from DAH
     pub fn verify(&self, id: RowId, dah: &DataAvailabilityHeader) -> Result<()> {
         let row = id.index;
-        let mut tree = Nmt::with_hasher(NamespacedSha2Hasher::with_ignore_max_ns(true));
+        let mut tree = Nmt::default();
 
         for share in &self.shares {
             tree.push_leaf(share.as_ref(), *share.namespace())

diff --git a/types/src/row_namespace_data.rs b/types/src/row_namespace_data.rs
index 9af9dae1..fc54a48f 100644
--- a/types/src/row_namespace_data.rs
+++ b/types/src/row_namespace_data.rs
@@ -79,7 +79,9 @@ impl RowNamespaceData {
     ///
     /// [`DataAvailabilityHeader`]: crate::DataAvailabilityHeader
     pub fn verify(&self, id: RowNamespaceDataId, dah: &DataAvailabilityHeader) -> Result<()> {
-        if self.shares.is_empty() {
+        if (self.shares.is_empty() && self.proof.is_of_presence())
+            || (!self.shares.is_empty() && self.proof.is_of_absence())
+        {
             return Err(Error::WrongProofType);
         }
 
@@ -408,6 +410,21 @@ mod tests {
         }
     }
 
+    #[test]
+    fn verify_absent_ns() {
+        // parity share
+        let eds = generate_dummy_eds(2 << (rand::random::<usize>() % 8), AppVersion::V2);
+        let dah = DataAvailabilityHeader::from_eds(&eds);
+
+        // a namespace bigger than pay-for-blob and smaller than primary
+        // reserved padding, that is not used in the EDS
+        let ns = Namespace::const_v0([0, 0, 0, 0, 0, 0, 0, 0, 0, 5]);
+        for (id, row) in eds.get_namespace_data(ns, &dah, 1).unwrap() {
+            assert!(row.shares.is_empty());
+            row.verify(id, &dah).unwrap();
+        }
+    }
+
     #[test]
     fn reconstruct_all() {
         for _ in 0..3 {