Skip to content

Commit

Permalink
feat!(node): add a method to get all blobs using shwap (#452)
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Zwoliński <[email protected]>
Co-authored-by: Yiannis Marangos <[email protected]>
Co-authored-by: Mikołaj Florkiewicz <[email protected]>
  • Loading branch information
3 people authored Oct 31, 2024
1 parent 3ecd476 commit 9410c67
Show file tree
Hide file tree
Showing 14 changed files with 435 additions and 50 deletions.
8 changes: 3 additions & 5 deletions ci/Dockerfile.bridge
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
# https://github.com/celestiaorg/celestia-node/blob/main/Dockerfile
FROM docker.io/alpine:3.19.1

ENV CELESTIA_HOME=/root

RUN apk update && apk add --no-cache bash jq
RUN apk update && apk add --no-cache bash jq dasel

# Copy in the binary
COPY --from=ghcr.io/celestiaorg/celestia-node:v0.16.0 /bin/celestia /bin/celestia
COPY --from=ghcr.io/celestiaorg/celestia-node:v0.16.0 /bin/cel-key /bin/cel-key
COPY --from=ghcr.io/celestiaorg/celestia-node:v0.18.3-mocha /bin/celestia /bin/celestia
COPY --from=ghcr.io/celestiaorg/celestia-node:v0.18.3-mocha /bin/cel-key /bin/cel-key

COPY ./run-bridge.sh /opt/entrypoint.sh

Expand Down
2 changes: 1 addition & 1 deletion ci/Dockerfile.validator
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ENV CELESTIA_HOME=/root
RUN apk update && apk add --no-cache bash jq

# Copy in the binary
COPY --from=ghcr.io/celestiaorg/celestia-app:v2.1.2 /bin/celestia-appd /bin/celestia-appd
COPY --from=ghcr.io/celestiaorg/celestia-app:v2.3.0 /bin/celestia-appd /bin/celestia-appd

COPY ./run-validator.sh /opt/entrypoint.sh

Expand Down
15 changes: 13 additions & 2 deletions ci/run-bridge.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ NODE_NAME="bridge-$NODE_ID"
# a private local network
P2P_NETWORK="private"
# a bridge node configuration directory
CONFIG_DIR="$CELESTIA_HOME/.celestia-bridge-$P2P_NETWORK"
CONFIG_DIR="$HOME/.celestia-bridge-$P2P_NETWORK"
# directory and the files shared with the validator node
CREDENTIALS_DIR="/credentials"
# node credentials
Expand Down Expand Up @@ -47,6 +47,15 @@ add_trusted_genesis() {
sed -i'.bak' "s/TrustedHash = .*/TrustedHash = $genesis_hash/" "$CONFIG_DIR/config.toml"
}

whitelist_localhost_nodes() {
# to get the list of ips:
# cargo run -- node -n private -l 0.0.0.0
# docker compose -f ci/docker-compose.yml exec bridge-0 celestia p2p peer-info $lumina_peerid
dasel put -f "$CONFIG_DIR/config.toml" \
-t json -v '["172.18.0.1/24", "172.17.0.1/24", "192.168.0.0/16"]' \
'P2P.IPColocationWhitelist'
}

write_jwt_token() {
echo "Saving jwt token to $NODE_JWT_FILE"
celestia bridge auth admin --p2p.network "$P2P_NETWORK" > "$NODE_JWT_FILE"
Expand All @@ -55,6 +64,8 @@ write_jwt_token() {
main() {
# Initialize the bridge node
celestia bridge init --p2p.network "$P2P_NETWORK"
# don't allow banning nodes we create in tests by pubsub ip counting
whitelist_localhost_nodes
# Wait for a validator
wait_for_provision
# Import the key with the coins
Expand All @@ -68,7 +79,7 @@ main() {
# Start the bridge node
echo "Configuration finished. Running a bridge node..."
celestia bridge start \
--rpc.skip-auth=$SKIP_AUTH \
--rpc.skip-auth="$SKIP_AUTH" \
--rpc.addr 0.0.0.0 \
--core.ip validator \
--keyring.keyname "$NODE_NAME" \
Expand Down
5 changes: 4 additions & 1 deletion node/src/daser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const MAX_SAMPLES_NEEDED: usize = 16;
const HOUR: u64 = 60 * 60;
const DAY: u64 = 24 * HOUR;
const DEFAULT_SAMPLING_WINDOW: Duration = Duration::from_secs(30 * DAY);
const GET_SAMPLE_TIMEOUT: Duration = Duration::from_secs(10);

type Result<T, E = DaserError> = std::result::Result<T, E>;

Expand Down Expand Up @@ -364,7 +365,9 @@ where
let p2p = p2p.clone();

async move {
let res = p2p.get_sample(row, col, height).await;
let res = p2p
.get_sample(row, col, height, Some(GET_SAMPLE_TIMEOUT))
.await;
(row, col, res)
}
})
Expand Down
28 changes: 23 additions & 5 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use celestia_types::nmt::Namespace;
use celestia_types::row::Row;
use celestia_types::row_namespace_data::RowNamespaceData;
use celestia_types::sample::Sample;
use celestia_types::ExtendedHeader;
use celestia_types::{Blob, ExtendedHeader};
use libp2p::identity::Keypair;
use libp2p::swarm::NetworkInfo;
use libp2p::{Multiaddr, PeerId};
Expand Down Expand Up @@ -338,8 +338,13 @@ where
///
/// On failure to receive a verified [`Row`] within a certain time, the
/// `NodeError::P2p(P2pError::BitswapQueryTimeout)` error will be returned.
pub async fn request_row(&self, row_index: u16, block_height: u64) -> Result<Row> {
Ok(self.p2p().get_row(row_index, block_height).await?)
pub async fn request_row(
&self,
row_index: u16,
block_height: u64,
timeout: Option<Duration>,
) -> Result<Row> {
Ok(self.p2p().get_row(row_index, block_height, timeout).await?)
}

/// Request a verified [`Sample`] from the network.
Expand All @@ -353,10 +358,11 @@ where
row_index: u16,
column_index: u16,
block_height: u64,
timeout: Option<Duration>,
) -> Result<Sample> {
Ok(self
.p2p()
.get_sample(row_index, column_index, block_height)
.get_sample(row_index, column_index, block_height, timeout)
.await?)
}

Expand All @@ -371,13 +377,25 @@ where
namespace: Namespace,
row_index: u16,
block_height: u64,
timeout: Option<Duration>,
) -> Result<RowNamespaceData> {
Ok(self
.p2p()
.get_row_namespace_data(namespace, row_index, block_height)
.get_row_namespace_data(namespace, row_index, block_height, timeout)
.await?)
}

/// Request all blobs with provided namespace in the block corresponding to this header
/// using bitswap protocol.
pub async fn request_all_blobs(
&self,
header: &ExtendedHeader,
namespace: Namespace,
timeout: Option<Duration>,
) -> Result<Vec<Blob>> {
Ok(self.p2p().get_all_blobs(header, namespace, timeout).await?)
}

/// Get current header syncing info.
pub async fn syncer_info(&self) -> Result<SyncingInfo> {
Ok(self.syncer().info().await?)
Expand Down
66 changes: 50 additions & 16 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ use std::time::Duration;
use blockstore::Blockstore;
use celestia_proto::p2p::pb::{header_request, HeaderRequest};
use celestia_tendermint_proto::Protobuf;
use celestia_types::nmt::Namespace;
use celestia_types::nmt::{Namespace, NamespacedSha2Hasher};
use celestia_types::row::{Row, RowId};
use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
use celestia_types::sample::{Sample, SampleId};
use celestia_types::{fraud_proof::BadEncodingFraudProof, hash::Hash};
use celestia_types::{ExtendedHeader, FraudProof};
use celestia_types::{Blob, ExtendedHeader, FraudProof};
use cid::Cid;
use futures::StreamExt;
use futures::stream::FuturesOrdered;
use futures::{StreamExt, TryStreamExt};
use libp2p::core::transport::ListenerId;
use libp2p::{
autonat,
Expand Down Expand Up @@ -83,8 +84,6 @@ const MIN_CONNECTED_PEERS: u64 = 4;
// Maximum size of a [`Multihash`].
pub(crate) const MAX_MH_SIZE: usize = 64;

pub(crate) const GET_SAMPLE_TIMEOUT: Duration = Duration::from_secs(10);

// all fraud proofs for height bigger than head height by this threshold
// will be ignored
const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;
Expand Down Expand Up @@ -145,6 +144,10 @@ pub enum P2pError {
/// Shwap protocol error.
#[error("Shwap: {0}")]
Shwap(String),

/// An error propagated from [`celestia_types`].
#[error(transparent)]
CelestiaTypes(#[from] celestia_types::Error),
}

impl P2pError {
Expand All @@ -165,7 +168,8 @@ impl P2pError {
| P2pError::ProtoDecodeFailed(_)
| P2pError::Cid(_)
| P2pError::BitswapQueryTimeout
| P2pError::Shwap(_) => false,
| P2pError::Shwap(_)
| P2pError::CelestiaTypes(_) => false,
}
}
}
Expand Down Expand Up @@ -514,31 +518,32 @@ impl P2p {
}

/// Request a [`Row`] on bitswap protocol.
pub async fn get_row(&self, row_index: u16, block_height: u64) -> Result<Row> {
pub async fn get_row(
&self,
row_index: u16,
block_height: u64,
timeout: Option<Duration>,
) -> Result<Row> {
let id = RowId::new(row_index, block_height).map_err(P2pError::Cid)?;
let cid = convert_cid(&id.into())?;

// TODO: add timeout
let data = self.get_shwap_cid(cid, None).await?;
let data = self.get_shwap_cid(cid, timeout).await?;
let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
Ok(row)
}

/// Request a [`Sample`] on bitswap protocol.
///
/// This method awaits for a verified `Sample` until timeout of 10 second
/// is reached. On timeout it is safe to assume that sampling of the block
/// failed.
pub async fn get_sample(
&self,
row_index: u16,
column_index: u16,
block_height: u64,
timeout: Option<Duration>,
) -> Result<Sample> {
let id = SampleId::new(row_index, column_index, block_height).map_err(P2pError::Cid)?;
let cid = convert_cid(&id.into())?;

let data = self.get_shwap_cid(cid, Some(GET_SAMPLE_TIMEOUT)).await?;
let data = self.get_shwap_cid(cid, timeout).await?;
let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
Ok(sample)
}
Expand All @@ -549,18 +554,47 @@ impl P2p {
namespace: Namespace,
row_index: u16,
block_height: u64,
timeout: Option<Duration>,
) -> Result<RowNamespaceData> {
let id =
RowNamespaceDataId::new(namespace, row_index, block_height).map_err(P2pError::Cid)?;
let cid = convert_cid(&id.into())?;

// TODO: add timeout
let data = self.get_shwap_cid(cid, None).await?;
let data = self.get_shwap_cid(cid, timeout).await?;
let row_namespace_data =
RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
Ok(row_namespace_data)
}

/// Request all blobs with provided namespace in the block corresponding to this header
/// using bitswap protocol.
pub async fn get_all_blobs(
&self,
header: &ExtendedHeader,
namespace: Namespace,
timeout: Option<Duration>,
) -> Result<Vec<Blob>> {
let height = header.height().value();
let app_version = header.app_version()?;
let rows_to_fetch: Vec<_> = header
.dah
.row_roots()
.iter()
.enumerate()
.filter(|(_, row)| row.contains::<NamespacedSha2Hasher>(*namespace))
.map(|(n, _)| n as u16)
.collect();

let futs = rows_to_fetch
.into_iter()
.map(|row_idx| self.get_row_namespace_data(namespace, row_idx, height, timeout))
.collect::<FuturesOrdered<_>>();
let rows: Vec<_> = futs.try_collect().await?;
let shares = rows.iter().flat_map(|row| row.shares.iter());

Ok(Blob::reconstruct_all(shares, app_version)?)
}

/// Get the addresses where [`P2p`] listens on for incoming connections.
pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
let (tx, rx) = oneshot::channel();
Expand Down
6 changes: 3 additions & 3 deletions node/tests/header_ex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod utils;

#[tokio::test]
async fn request_single_header() {
let node = new_connected_node().await;
let (node, _) = new_connected_node().await;

let header = node.request_header_by_height(1).await.unwrap();
let header_by_hash = node.request_header_by_hash(&header.hash()).await.unwrap();
Expand All @@ -26,7 +26,7 @@ async fn request_single_header() {

#[tokio::test]
async fn request_verified_headers() {
let node = new_connected_node().await;
let (node, _) = new_connected_node().await;

let from = node.request_header_by_height(1).await.unwrap();
let verified_headers = node.request_verified_headers(&from, 2).await.unwrap();
Expand All @@ -41,7 +41,7 @@ async fn request_verified_headers() {

#[tokio::test]
async fn request_head() {
let node = new_connected_node().await;
let (node, _) = new_connected_node().await;

let genesis = node.request_header_by_height(1).await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion node/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod utils;

#[tokio::test]
async fn connects_to_the_go_bridge_node() {
let node = new_connected_node().await;
let (node, _) = new_connected_node().await;

let info = node.network_info().await.unwrap();
assert_eq!(info.num_peers(), 1);
Expand Down
Loading

0 comments on commit 9410c67

Please sign in to comment.