Skip to content

Commit

Permalink
add blob.subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
zvolin committed Aug 21, 2024
1 parent f737a0e commit 6555c0e
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 5 deletions.
4 changes: 2 additions & 2 deletions rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This crate builds on top of the [`jsonrpsee`](https://docs.rs/jsonrpsee) clients
```rust,no_run
use celestia_rpc::{BlobClient, Client};
use celestia_types::{Blob, nmt::Namespace};
use celestia_types::TxOptions;
use celestia_types::TxConfig;
async fn submit_blob() {
// create a client to the celestia node
Expand All @@ -22,7 +22,7 @@ async fn submit_blob() {
.expect("Failed to create a blob");
// submit it
client.blob_submit(&[blob], TxOptions::default())
client.blob_submit(&[blob], TxConfig::default())
.await
.expect("Failed submitting the blob");
}
Expand Down
21 changes: 21 additions & 0 deletions rpc/src/blob.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
//! celestia-node rpc types and methods related to blobs
use celestia_types::nmt::{Namespace, NamespaceProof};
use celestia_types::{Blob, Commitment, TxConfig};
use jsonrpsee::proc_macros::rpc;
use serde::{Deserialize, Serialize};

/// Response type for [`BlobClient::blob_subscribe`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct BlobsAtHeight {
/// Blobs submitted at given height.
pub blobs: Option<Vec<Blob>>,
/// A height for which the blobs were returned.
pub height: u64,
}

#[rpc(client)]
pub trait Blob {
Expand Down Expand Up @@ -43,4 +56,12 @@ pub trait Blob {
/// Submit sends Blobs and reports the height in which they were included. Allows sending multiple Blobs atomically synchronously. Uses default wallet registered on the Node.
#[method(name = "blob.Submit")]
async fn blob_submit(&self, blobs: &[Blob], opts: TxConfig) -> Result<u64, Error>;

/// Subscribe to published blobs from the given namespace as they are included.
///
/// # Notes
///
/// Unsubscribe is not implemented by Celestia nodes.
#[subscription(name = "blob.Subscribe", unsubscribe = "blob.Unsubscribe", item = BlobsAtHeight)]
async fn blob_subscribe(&self, namespace: Namespace) -> SubcriptionResult;
}
2 changes: 1 addition & 1 deletion rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![doc = include_str!("../README.md")]

mod blob;
pub mod blob;
pub mod client;
mod error;
mod header;
Expand Down
58 changes: 58 additions & 0 deletions rpc/tests/blob.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#![cfg(not(target_arch = "wasm32"))]

use std::cmp::Ordering;
use std::time::Duration;

use celestia_rpc::blob::BlobsAtHeight;
use celestia_rpc::prelude::*;
use celestia_types::{Blob, Commitment};
use jsonrpsee::core::client::Subscription;

pub mod utils;

Expand Down Expand Up @@ -114,6 +117,46 @@ async fn blob_submit_and_get_large() {
// because without it we can't know how many shares there are in each row
}

#[tokio::test]
async fn blob_subscribe() {
let client = new_test_client(AuthLevel::Write).await.unwrap();
let namespace = random_ns();

let mut incoming_blobs = client.blob_subscribe(namespace).await.unwrap();

// nothing was submitted
let received_blobs = incoming_blobs.next().await.unwrap().unwrap();
assert!(received_blobs.blobs.is_none());

// submit and receive blob
let blob = Blob::new(namespace, random_bytes(10)).unwrap();
let current_height = blob_submit(&client, &[blob.clone()]).await.unwrap();

let received = blobs_at_height(current_height, &mut incoming_blobs).await;
assert_eq!(received.len(), 1);
assert_blob_equal_to_sent(&received[0], &blob);

// submit blob to another ns
let blob_another_ns = Blob::new(random_ns(), random_bytes(10)).unwrap();
let current_height = blob_submit(&client, &[blob_another_ns]).await.unwrap();

let received = blobs_at_height(current_height, &mut incoming_blobs).await;
assert!(received.is_empty());

// submit and receive few blobs
let blob1 = Blob::new(namespace, random_bytes(10)).unwrap();
let blob2 = Blob::new(random_ns(), random_bytes(10)).unwrap(); // different ns
let blob3 = Blob::new(namespace, random_bytes(10)).unwrap();
let current_height = blob_submit(&client, &[blob1.clone(), blob2, blob3.clone()])
.await
.unwrap();

let received = blobs_at_height(current_height, &mut incoming_blobs).await;
assert_eq!(received.len(), 2);
assert_blob_equal_to_sent(&received[0], &blob1);
assert_blob_equal_to_sent(&received[1], &blob3);
}

#[tokio::test]
async fn blob_submit_too_large() {
let client = new_test_client(AuthLevel::Write).await.unwrap();
Expand Down Expand Up @@ -174,6 +217,21 @@ async fn blob_get_all_with_no_blobs() {
assert!(blobs.is_none());
}

// helps asserting certain block heights with blob subscription. as we drastically
// reduce celestia block time to speed up tests, we need to drain unwanted events
// from the subscription.
async fn blobs_at_height(height: u64, sub: &mut Subscription<BlobsAtHeight>) -> Vec<Blob> {
while let Some(received) = sub.next().await {
let received = received.unwrap();
match received.height.cmp(&height) {
Ordering::Less => continue,
Ordering::Equal => return received.blobs.unwrap_or_default(),
Ordering::Greater => panic!("height {height} missed"),
}
}
panic!("subscription error");
}

/// Blobs received from chain have index field set, so to
/// compare if they are equal to the ones we sent, we need
/// to overwrite the index field with received one.
Expand Down
4 changes: 2 additions & 2 deletions rpc/tests/utils/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use celestia_rpc::prelude::*;
use celestia_rpc::Client;
use celestia_types::Blob;
use celestia_types::TxConfig;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::client::SubscriptionClientT;
use jsonrpsee::core::ClientError;
use tokio::sync::{Mutex, MutexGuard};

Expand Down Expand Up @@ -53,7 +53,7 @@ pub async fn new_test_client(auth_level: AuthLevel) -> Result<Client> {

pub async fn blob_submit<C>(client: &C, blobs: &[Blob]) -> Result<u64, ClientError>
where
C: ClientT + Sync,
C: SubscriptionClientT + Sync,
{
let _guard = write_lock().await;
client.blob_submit(blobs, TxConfig::default()).await
Expand Down

0 comments on commit 6555c0e

Please sign in to comment.