Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement JsonRpc transport for batch requests #600

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions examples/jsonrpc_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use starknet::{
core::types::{BlockId, BlockTag},
providers::{
jsonrpc::{HttpTransport, JsonRpcClient},
Provider, Url,
},
};
use starknet_core::types::requests::{GetBlockWithTxHashesRequestRef, GetBlockWithTxsRequestRef};
use starknet_providers::jsonrpc::{JsonRpcMethod, JsonRpcRequestParams};

#[tokio::main]
async fn main() {
// Create a new JSON RPC client using HTTP transport with the specified URL
let provider = JsonRpcClient::new(HttpTransport::new(
Url::parse("https://starknet-sepolia.public.blastapi.io/rpc/v0_7").unwrap(),
));

// batch_requests allows to define a vector of requests for batch processing, ensuring each request specifies its corresponding JsonRpcMethod and JsonRpcRequestParams.
// This approach allows for a generic way to handle batch requests.
let batch_mixed_results = provider
.batch_requests(vec![
// Request 1: Retrieve block data including transaction hashes.
(
JsonRpcMethod::GetBlockWithTxHashes,
JsonRpcRequestParams::GetBlockWithTxHashes(GetBlockWithTxHashesRequestRef {
block_id: BlockId::Tag(BlockTag::Latest).as_ref(),
}),
),
// Request 2: Retrieve block data including full transaction details.
(
JsonRpcMethod::GetBlockWithTxs,
JsonRpcRequestParams::GetBlockWithTxs(GetBlockWithTxsRequestRef {
block_id: BlockId::Tag(BlockTag::Latest).as_ref(),
}),
),
])
.await;

match batch_mixed_results {
Ok(v) => println!("{v:#?}"),
Err(e) => println!("Error: {e:#?}"),
}

// The following example demonstrates the process of sending a batch request to retrieve multiple blocks, each including transaction hashes.
// get_block_with_tx_hashes_batch utilizes a vector of BlockId parameters to construct the batch request.
let batched_blocks = provider
.get_block_with_tx_hashes_batch(vec![
BlockId::Tag(BlockTag::Latest),
BlockId::Tag(BlockTag::Latest),
])
.await;

match batched_blocks {
Ok(v) => println!("{v:#?}"),
Err(e) => println!("Error: {e:#?}"),
}
}
30 changes: 29 additions & 1 deletion starknet-providers/src/any.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use serde::Serialize;
use starknet_core::types::{
BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction,
BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction,
Expand All @@ -11,7 +12,7 @@ use starknet_core::types::{
};

use crate::{
jsonrpc::{HttpTransport, JsonRpcClient},
jsonrpc::{HttpTransport, JsonRpcClient, JsonRpcMethod},
Provider, ProviderError, SequencerGatewayProvider,
};

Expand Down Expand Up @@ -665,4 +666,31 @@ impl Provider for AnyProvider {
}
}
}

async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
{
match self {
Self::JsonRpcHttp(inner) => inner.batch_requests(requests).await,
Self::SequencerGateway(inner) => inner.batch_requests(requests).await,
}
}

async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync,
{
match self {
Self::JsonRpcHttp(inner) => inner.get_block_with_tx_hashes_batch(block_ids).await,
Self::SequencerGateway(inner) => inner.get_block_with_tx_hashes_batch(block_ids).await,
}
}
}
72 changes: 72 additions & 0 deletions starknet-providers/src/jsonrpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ pub enum JsonRpcMethod {
}

/// JSON-RPC request.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum JsonRpcRequestParams<'a> {
/// Parameters for getting a block with transaction hashes.
GetBlockWithTxHashes(GetBlockWithTxHashesRequestRef<'a>),
/// Parameters for getting a block with full transactions.
GetBlockWithTxs(GetBlockWithTxsRequestRef<'a>),
}

/// Represents a JSON-RPC request with a unique identifier.
#[derive(Debug, Clone)]
pub struct JsonRpcRequest {
/// ID of the request. Useful for identifying responses in certain transports like `WebSocket`.
Expand All @@ -136,6 +146,13 @@ pub struct JsonRpcRequest {

/// Typed request data for Starknet JSON-RPC requests.
#[derive(Debug, Clone)]
pub struct JsonRpcRequests {
/// A list of JSON-RPC requests.
pub requests: Vec<JsonRpcRequest>,
}

/// Represents the data for various JSON-RPC requests
#[derive(Debug, Clone, Serialize)]
pub enum JsonRpcRequestData {
/// Request data for `starknet_specVersion`.
SpecVersion(SpecVersionRequest),
Expand Down Expand Up @@ -303,6 +320,32 @@ where
}
}
}

async fn send_requests<I, P, R>(&self, requests: I) -> Result<Vec<R>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
R: DeserializeOwned,
{
let responses = self
.transport
.send_requests(requests)
.await
.map_err(JsonRpcClientError::TransportError)?;

responses
.into_iter()
.map(|response| match response {
JsonRpcResponse::Success { result, .. } => Ok(result),
JsonRpcResponse::Error { error, .. } => {
Err(match TryInto::<StarknetError>::try_into(&error) {
Ok(error) => ProviderError::StarknetError(error),
Err(_) => JsonRpcClientError::<T::Error>::JsonRpcError(error).into(),
})
}
})
.collect()
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
Expand All @@ -317,6 +360,35 @@ where
.await
}

async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
{
self.send_requests(requests).await
}

async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync,
{
let requests = block_ids.iter().map(|block_id| {
(
JsonRpcMethod::GetBlockWithTxHashes,
GetBlockWithTxHashesRequestRef {
block_id: block_id.as_ref(),
},
)
});
self.send_requests(requests).await
}

/// Get block information with transaction hashes given the block id
async fn get_block_with_tx_hashes<B>(
&self,
Expand Down
42 changes: 42 additions & 0 deletions starknet-providers/src/jsonrpc/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,46 @@ impl JsonRpcTransport for HttpTransport {

Ok(parsed_response)
}

async fn send_requests<I, P, R>(
&self,
requests: I,
) -> Result<Vec<JsonRpcResponse<R>>, Self::Error>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send,
P: Serialize + Send,
R: DeserializeOwned,
{
let batch_requests: Vec<_> = requests
.into_iter()
.enumerate()
.map(|(id, (method, params))| JsonRpcRequest {
id: id as u64 + 1,
jsonrpc: "2.0",
method,
params,
})
.collect();

let serialized_batch = serde_json::to_string(&batch_requests).map_err(Self::Error::Json)?;

let mut request = self
.client
.post(self.url.clone())
.body(serialized_batch)
.header("Content-Type", "application/json");

for (name, value) in &self.headers {
request = request.header(name, value);
}

let response = request.send().await.map_err(Self::Error::Reqwest)?;

let response_body = response.text().await.map_err(Self::Error::Reqwest)?;
trace!("Response from JSON-RPC: {}", response_body);

let parsed_response = serde_json::from_str(&response_body).map_err(Self::Error::Json)?;

Ok(parsed_response)
thetheveloper marked this conversation as resolved.
Show resolved Hide resolved
}
}
10 changes: 10 additions & 0 deletions starknet-providers/src/jsonrpc/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,14 @@ pub trait JsonRpcTransport {
where
P: Serialize + Send + Sync,
R: DeserializeOwned;

/// Sends multiple JSON-RPC requests and retrieves their responses.
async fn send_requests<I, P, R>(
&self,
requests: I,
) -> Result<Vec<JsonRpcResponse<R>>, Self::Error>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
R: DeserializeOwned;
}
32 changes: 32 additions & 0 deletions starknet-providers/src/provider.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use auto_impl::auto_impl;
use serde::Serialize;
use starknet_core::types::{
BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction,
BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction,
Expand All @@ -22,6 +23,9 @@ use std::{any::Any, error::Error, fmt::Debug};
/// The legacy [`SequencerGatewayProvider`](crate::sequencer::SequencerGatewayProvider) still
/// implements this trait for backward compatibility reasons, but most of its methods no longer work
/// in practice, as public sequencer servers have generally block access to most methods.
use crate::jsonrpc::JsonRpcMethod;

/// Represents a provider interface for interacting with the Starknet network.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[auto_impl(&, Box, Arc)]
Expand Down Expand Up @@ -311,6 +315,34 @@ pub trait Provider {
Err(ProviderError::ArrayLengthMismatch)
}
}

/// Executes a batch of JSON-RPC requests concurrently.
///
/// This method takes an iterator of requests, where each request is a tuple consisting of a
/// JSON-RPC method and its parameters. It returns a vector of results, each encoded as a
/// `serde_json::Value`.
///
/// # Type Parameters
/// - `I`: An iterator type where each item is a tuple containing a `JsonRpcMethod` and parameters `P`.
/// - `P`: The type of the parameters to be serialized for the JSON-RPC request.
///
/// # Errors
/// Returns `ProviderError` if any of the requests fail.
async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to resort to serde_json::Value here? I generally dislike having that as part of the interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Not necessarily, however this function was specifically introduced for this approach - to return generic JSON type based on the type returned. Otherwise, we'd have to create custom types for this and that would limit the ways we can call it

It's possible to change it if you prefer any other direction

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adrienlacombe For me this PR is ready to be merged but I think this discussion still is opened

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ty @tcoratger , @xJonathanLEI can you please let us know your thoughts? ty

where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync;

/// Retrieves blocks information with transaction hashes for a batch of block IDs.
async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync;
}

/// Trait for implementation-specific error type. These errors are irrelevant in most cases,
Expand Down
23 changes: 23 additions & 0 deletions starknet-providers/src/sequencer/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::any::Any;

use async_trait::async_trait;
use serde::Serialize;
use starknet_core::types::{
BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction,
BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction,
Expand All @@ -15,6 +16,7 @@ use starknet_core::types::{
};

use crate::{
jsonrpc::JsonRpcMethod,
provider::ProviderImplError,
sequencer::{models::conversions::ConversionError, GatewayClientError},
Provider, ProviderError, SequencerGatewayProvider,
Expand All @@ -30,6 +32,27 @@ impl Provider for SequencerGatewayProvider {
Ok(String::from("0.7.1"))
}

async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
{
Ok(self.batch_requests(requests).await?)
}

async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync,
{
Ok(self.get_block_with_tx_hashes_batch(block_ids).await?)
}

async fn get_block_with_tx_hashes<B>(
&self,
block_id: B,
Expand Down
Loading