Skip to content

Commit

Permalink
feat(node-wasm, types)!: Add method to get blobs for wasm (#468)
Browse files Browse the repository at this point in the history
Co-authored-by: zvolin <[email protected]>
  • Loading branch information
fl0rek and zvolin authored Jan 7, 2025
1 parent 17a08d3 commit a572ac4
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 36 deletions.
77 changes: 55 additions & 22 deletions node-wasm/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use std::time::Duration;

use blockstore::EitherBlockstore;
use celestia_types::ExtendedHeader;
use celestia_types::nmt::Namespace;
use celestia_types::{Blob, ExtendedHeader};
use js_sys::Array;
use lumina_node::blockstore::{InMemoryBlockstore, IndexedDbBlockstore};
use lumina_node::network;
Expand Down Expand Up @@ -250,13 +251,31 @@ impl NodeClient {
amount: u64,
) -> Result<Vec<ExtendedHeader>> {
let command = NodeCommand::GetVerifiedHeaders {
from: from.to_owned(),
from: from.clone(),
amount,
};
let response = self.worker.exec(command).await?;
response.into_headers().check_variant()?
}

/// Request all blobs with provided namespace in the block corresponding to this header
/// using bitswap protocol.
#[wasm_bindgen(js_name = requestAllBlobs)]
pub async fn request_all_blobs(
&self,
header: &ExtendedHeader,
namespace: &Namespace,
timeout_secs: Option<f64>,
) -> Result<Vec<Blob>> {
let command = NodeCommand::RequestAllBlobs {
header: header.clone(),
namespace: *namespace,
timeout_secs,
};
let response = self.worker.exec(command).await?;
response.into_blobs().check_variant()?
}

/// Get current header syncing info.
#[wasm_bindgen(js_name = syncerInfo)]
pub async fn syncer_info(&self) -> Result<SyncingInfoSnapshot> {
Expand All @@ -268,9 +287,6 @@ impl NodeClient {
}

/// Get the latest header announced in the network.
///
/// Returns a javascript object with given structure:
/// https://docs.rs/celestia-types/latest/celestia_types/struct.ExtendedHeader.html
#[wasm_bindgen(js_name = getNetworkHeadHeader)]
pub async fn get_network_head_header(&self) -> Result<Option<ExtendedHeader>> {
let command = NodeCommand::LastSeenNetworkHead;
Expand All @@ -279,9 +295,6 @@ impl NodeClient {
}

/// Get the latest locally synced header.
///
/// Returns a javascript object with given structure:
/// https://docs.rs/celestia-types/latest/celestia_types/struct.ExtendedHeader.html
#[wasm_bindgen(js_name = getLocalHeadHeader)]
pub async fn get_local_head_header(&self) -> Result<ExtendedHeader> {
let command = NodeCommand::GetHeader(SingleHeaderQuery::Head);
Expand All @@ -290,9 +303,6 @@ impl NodeClient {
}

/// Get a synced header for the block with a given hash.
///
/// Returns a javascript object with given structure:
/// https://docs.rs/celestia-types/latest/celestia_types/struct.ExtendedHeader.html
#[wasm_bindgen(js_name = getHeaderByHash)]
pub async fn get_header_by_hash(&self, hash: &str) -> Result<ExtendedHeader> {
let command = NodeCommand::GetHeader(SingleHeaderQuery::ByHash(hash.parse()?));
Expand All @@ -301,9 +311,6 @@ impl NodeClient {
}

/// Get a synced header for the block with a given height.
///
/// Returns a javascript object with given structure:
/// https://docs.rs/celestia-types/latest/celestia_types/struct.ExtendedHeader.html
#[wasm_bindgen(js_name = getHeaderByHeight)]
pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
let command = NodeCommand::GetHeader(SingleHeaderQuery::ByHeight(height));
Expand All @@ -320,9 +327,6 @@ impl NodeClient {
/// # Errors
///
/// If range contains a height of a header that is not found in the store.
///
/// Returns an array of javascript objects with given structure:
/// https://docs.rs/celestia-types/latest/celestia_types/struct.ExtendedHeader.html
#[wasm_bindgen(js_name = getHeaders)]
pub async fn get_headers(
&self,
Expand All @@ -338,9 +342,6 @@ impl NodeClient {
}

/// Get data sampling metadata of an already sampled height.
///
/// Returns a javascript object with given structure:
/// https://docs.rs/lumina-node/latest/lumina_node/store/struct.SamplingMetadata.html
#[wasm_bindgen(js_name = getSamplingMetadata)]
pub async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
let command = NodeCommand::GetSamplingMetadata { height };
Expand Down Expand Up @@ -435,9 +436,9 @@ mod tests {

use std::time::Duration;

use celestia_rpc::{prelude::*, Client};
use celestia_rpc::{prelude::*, Client, TxConfig};
use celestia_types::p2p::PeerId;
use celestia_types::ExtendedHeader;
use celestia_types::{AppVersion, ExtendedHeader};
use gloo_timers::future::sleep;
use libp2p::{multiaddr::Protocol, Multiaddr};
use rexie::Rexie;
Expand Down Expand Up @@ -497,6 +498,38 @@ mod tests {
.unwrap();
}

#[wasm_bindgen_test]
async fn get_blob() {
remove_database().await.expect("failed to clear db");
let rpc_client = Client::new(WS_URL).await.unwrap();
let namespace = Namespace::new_v0(&[0xCD, 0xDC, 0xCD, 0xDC, 0xCD, 0xDC]).unwrap();
let data = b"Hello, World";
let blobs = vec![Blob::new(namespace, data.to_vec(), AppVersion::V3).unwrap()];

let submitted_height = rpc_client
.blob_submit(&blobs, TxConfig::default())
.await
.expect("successful submission");

let header = rpc_client
.header_get_by_height(submitted_height)
.await
.expect("header for blob");

let bridge_ma = fetch_bridge_webtransport_multiaddr(&rpc_client).await;
let client = spawn_connected_node(vec![bridge_ma.to_string()]).await;

let mut blobs = client
.request_all_blobs(&header, &namespace, None)
.await
.expect("to fetch blob");

assert_eq!(blobs.len(), 1);
let blob = blobs.pop().unwrap();
assert_eq!(blob.data, data);
assert_eq!(blob.namespace, namespace);
}

async fn spawn_connected_node(bootnodes: Vec<String>) -> NodeClient {
let message_channel = MessageChannel::new().unwrap();
let mut worker = NodeWorker::new(message_channel.port1().into());
Expand Down
8 changes: 8 additions & 0 deletions node-wasm/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::fmt::Debug;

use celestia_types::nmt::Namespace;
use celestia_types::Blob;
use enum_as_inner::EnumAsInner;
use libp2p::Multiaddr;
use libp2p::PeerId;
Expand Down Expand Up @@ -51,6 +53,11 @@ pub(crate) enum NodeCommand {
GetSamplingMetadata {
height: u64,
},
RequestAllBlobs {
header: ExtendedHeader,
namespace: Namespace,
timeout_secs: Option<f64>,
},
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -80,6 +87,7 @@ pub(crate) enum WorkerResponse {
Headers(Result<Vec<ExtendedHeader>, Error>),
LastSeenNetworkHead(Result<Option<ExtendedHeader>, Error>),
SamplingMetadata(Result<Option<SamplingMetadata>>),
Blobs(Result<Vec<Blob>>),
}

pub(crate) trait CheckableResponseExt {
Expand Down
24 changes: 24 additions & 0 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::fmt::Debug;
use std::time::Duration;

use blockstore::EitherBlockstore;
use celestia_types::nmt::Namespace;
use celestia_types::Blob;
use libp2p::{Multiaddr, PeerId};
use serde::{Deserialize, Serialize};
use serde_wasm_bindgen::to_value;
Expand Down Expand Up @@ -224,6 +227,19 @@ impl NodeWorkerInstance {
Ok(self.node.get_sampling_metadata(height).await?)
}

async fn request_all_blobs(
&mut self,
header: ExtendedHeader,
namespace: Namespace,
timeout_secs: Option<f64>,
) -> Result<Vec<Blob>> {
let timeout = timeout_secs.map(Duration::from_secs_f64);
Ok(self
.node
.request_all_blobs(&header, namespace, timeout)
.await?)
}

async fn process_command(&mut self, command: NodeCommand) -> WorkerResponse {
match command {
NodeCommand::IsRunning => WorkerResponse::IsRunning(true),
Expand Down Expand Up @@ -273,6 +289,14 @@ impl NodeWorkerInstance {
NodeCommand::GetSamplingMetadata { height } => {
WorkerResponse::SamplingMetadata(self.get_sampling_metadata(height).await)
}
NodeCommand::RequestAllBlobs {
header,
namespace,
timeout_secs,
} => WorkerResponse::Blobs(
self.request_all_blobs(header, namespace, timeout_secs)
.await,
),
NodeCommand::InternalPing => WorkerResponse::InternalPong,
}
}
Expand Down
2 changes: 1 addition & 1 deletion node/src/store/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl From<ExtendedHeader> for VerifiedExtendedHeaders {
}
}

impl From<&'_ ExtendedHeader> for VerifiedExtendedHeaders {
impl From<&ExtendedHeader> for VerifiedExtendedHeaders {
fn from(value: &ExtendedHeader) -> Self {
Self(vec![value.to_owned()])
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/tests/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ async fn blob_get_get_proof_wrong_commitment() {
let namespace = random_ns();
let data = random_bytes(5);
let blob = Blob::new(namespace, data, AppVersion::V2).unwrap();
let commitment = Commitment(random_bytes_array());
let commitment = Commitment::new(random_bytes_array());

let submitted_height = blob_submit(&client, &[blob.clone()]).await.unwrap();

Expand Down
7 changes: 4 additions & 3 deletions types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ thiserror = "1.0.61"
time = { version = "0.3.36", default-features = false }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen = { version = "0.2.95", optional = true }
serde-wasm-bindgen = { version = "0.6.5", optional = true }
js-sys = { version = "0.3.76", optional = true }
serde-wasm-bindgen = { version = "0.6.5", optional = true }
wasm-bindgen = { version = "0.2.95", optional = true }

[dev-dependencies]
ed25519-consensus = "2.1.0"
Expand All @@ -63,7 +63,8 @@ wasm-bindgen-test = "0.3.42"
default = ["p2p"]
p2p = ["dep:libp2p-identity", "dep:multiaddr", "dep:serde_repr"]
test-utils = ["dep:ed25519-consensus", "dep:rand"]
wasm-bindgen = ["time/wasm-bindgen", "dep:wasm-bindgen", "dep:serde-wasm-bindgen", "dep:js-sys", "nmt-rs/serde"]
tonic = ["celestia-proto/tonic"]
wasm-bindgen = ["dep:js-sys", "dep:serde-wasm-bindgen", "dep:wasm-bindgen", "nmt-rs/serde", "time/wasm-bindgen"]

[package.metadata.docs.rs]
features = ["p2p", "test-utils"]
Expand Down
22 changes: 21 additions & 1 deletion types/src/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ pub use self::msg_pay_for_blobs::MsgPayForBlobs;
pub use celestia_proto::celestia::blob::v1::MsgPayForBlobs as RawMsgPayForBlobs;
pub use celestia_proto::proto::blob::v1::BlobProto as RawBlob;
pub use celestia_proto::proto::blob::v1::BlobTx as RawBlobTx;
#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
use wasm_bindgen::prelude::*;

/// Arbitrary data that can be stored in the network within certain [`Namespace`].
// NOTE: We don't use the `serde(try_from)` pattern for this type
// becase JSON representation needs to have `commitment` field but
// Protobuf definition doesn't.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(
all(feature = "wasm-bindgen", target_arch = "wasm32"),
wasm_bindgen(getter_with_clone, inspectable)
)]
pub struct Blob {
/// A [`Namespace`] the [`Blob`] belongs to.
pub namespace: Namespace,
Expand Down Expand Up @@ -321,6 +327,20 @@ impl From<Blob> for RawBlob {
}
}

#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
#[wasm_bindgen]
impl Blob {
/// Create a new blob with the given data within the [`Namespace`].
#[wasm_bindgen(constructor)]
pub fn js_new(
namespace: &Namespace,
data: Vec<u8>,
app_version: &appconsts::JsAppVersion,
) -> Result<Blob> {
Self::new(*namespace, data, (*app_version).into())
}
}

fn shares_needed_for_blob(blob_len: usize) -> usize {
let Some(without_first_share) =
blob_len.checked_sub(appconsts::FIRST_SPARSE_SHARE_CONTENT_SIZE)
Expand Down Expand Up @@ -395,7 +415,7 @@ mod tests {
#[test]
fn validate_blob_commitment_mismatch() {
let mut blob = sample_blob();
blob.commitment.0.fill(7);
blob.commitment = Commitment::new([7; 32]);

blob.validate(AppVersion::V2).unwrap_err();
}
Expand Down
43 changes: 39 additions & 4 deletions types/src/blob/commitment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use nmt_rs::NamespaceMerkleHasher;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tendermint::crypto::sha256::HASH_SIZE;
use tendermint::{crypto, merkle};
#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
use wasm_bindgen::prelude::*;

use crate::consts::appconsts;
use crate::nmt::{Namespace, NamespacedHashExt, NamespacedSha2Hasher, Nmt, RawNamespacedHash};
Expand Down Expand Up @@ -51,9 +53,21 @@ use crate::{InfoByte, Share};
/// [`ExtendedDataSquare`]: crate::ExtendedDataSquare
/// [`share commitment rules`]: https://github.com/celestiaorg/celestia-app/blob/main/specs/src/specs/data_square_layout.md#blob-share-commitment-rules
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Commitment(pub merkle::Hash);
#[cfg_attr(
all(feature = "wasm-bindgen", target_arch = "wasm32"),
wasm_bindgen(inspectable)
)]
pub struct Commitment {
/// Hash of the commitment
hash: merkle::Hash,
}

impl Commitment {
/// Create a new commitment with hash
pub fn new(hash: merkle::Hash) -> Self {
Commitment { hash }
}

/// Generate the share commitment from the given blob data.
pub fn from_blob(
namespace: Namespace,
Expand Down Expand Up @@ -101,7 +115,28 @@ impl Commitment {

let hash = merkle::simple_hash_from_byte_vectors::<crypto::default::Sha256>(&subtree_roots);

Ok(Commitment(hash))
Ok(Commitment { hash })
}

/// Hash of the commitment
pub fn hash(&self) -> &merkle::Hash {
&self.hash
}
}

#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
#[wasm_bindgen]
impl Commitment {
/// Hash of the commitment
#[wasm_bindgen(js_name = hash)]
pub fn js_hash(&self) -> Vec<u8> {
self.hash.to_vec()
}
}

impl From<Commitment> for merkle::Hash {
fn from(commitment: Commitment) -> Self {
commitment.hash
}
}

Expand All @@ -110,7 +145,7 @@ impl Serialize for Commitment {
where
S: Serializer,
{
let s = BASE64_STANDARD.encode(self.0);
let s = BASE64_STANDARD.encode(self.hash);
serializer.serialize_str(&s)
}
}
Expand All @@ -133,7 +168,7 @@ impl<'de> Deserialize<'de> for Commitment {
.try_into()
.map_err(|_| serde::de::Error::custom("commitment is not a size of a sha256"))?;

Ok(Commitment(hash))
Ok(Commitment { hash })
}
}

Expand Down
Loading

0 comments on commit a572ac4

Please sign in to comment.