From 3e084a53a97bf5d876d5540824e4d26030034cdd Mon Sep 17 00:00:00 2001 From: thetheveloper Date: Sat, 15 Jun 2024 14:00:08 +0200 Subject: [PATCH] feat: implement JsonRpc transport for batch requests --- examples/jsonrpc_batch.rs | 57 +++++++++++++++ starknet-providers/src/any.rs | 30 +++++++- starknet-providers/src/jsonrpc/mod.rs | 72 +++++++++++++++++++ .../src/jsonrpc/transports/http.rs | 42 +++++++++++ .../src/jsonrpc/transports/mod.rs | 10 +++ starknet-providers/src/provider.rs | 32 +++++++++ starknet-providers/src/sequencer/provider.rs | 23 ++++++ 7 files changed, 265 insertions(+), 1 deletion(-) create mode 100644 examples/jsonrpc_batch.rs diff --git a/examples/jsonrpc_batch.rs b/examples/jsonrpc_batch.rs new file mode 100644 index 00000000..4fbd2f68 --- /dev/null +++ b/examples/jsonrpc_batch.rs @@ -0,0 +1,57 @@ +use starknet::{ + core::types::{BlockId, BlockTag}, + providers::{ + jsonrpc::{HttpTransport, JsonRpcClient}, + Provider, Url, + }, +}; +use starknet_core::types::requests::{GetBlockWithTxHashesRequestRef, GetBlockWithTxsRequestRef}; +use starknet_providers::jsonrpc::{JsonRpcMethod, JsonRpcRequestParams}; + +#[tokio::main] +async fn main() { + // Create a new JSON RPC client using HTTP transport with the specified URL + let provider = JsonRpcClient::new(HttpTransport::new( + Url::parse("https://starknet-sepolia.public.blastapi.io/rpc/v0_7").unwrap(), + )); + + // batch_requests allows to define a vector of requests for batch processing, ensuring each request specifies its corresponding JsonRpcMethod and JsonRpcRequestParams. + // This approach allows for a generic way to handle batch requests. + let batch_mixed_results = provider + .batch_requests(vec![ + // Request 1: Retrieve block data including transaction hashes. + ( + JsonRpcMethod::GetBlockWithTxHashes, + JsonRpcRequestParams::GetBlockWithTxHashes(GetBlockWithTxHashesRequestRef { + block_id: BlockId::Tag(BlockTag::Latest).as_ref(), + }), + ), + // Request 2: Retrieve block data including full transaction details. + ( + JsonRpcMethod::GetBlockWithTxs, + JsonRpcRequestParams::GetBlockWithTxs(GetBlockWithTxsRequestRef { + block_id: BlockId::Tag(BlockTag::Latest).as_ref(), + }), + ), + ]) + .await; + + match batch_mixed_results { + Ok(v) => println!("{v:#?}"), + Err(e) => println!("Error: {e:#?}"), + } + + // The following example demonstrates the process of sending a batch request to retrieve multiple blocks, each including transaction hashes. + // get_block_with_tx_hashes_batch utilizes a vector of BlockId parameters to construct the batch request. + let batched_blocks = provider + .get_block_with_tx_hashes_batch(vec![ + BlockId::Tag(BlockTag::Latest), + BlockId::Tag(BlockTag::Latest), + ]) + .await; + + match batched_blocks { + Ok(v) => println!("{v:#?}"), + Err(e) => println!("Error: {e:#?}"), + } +} diff --git a/starknet-providers/src/any.rs b/starknet-providers/src/any.rs index 5a10711f..99f1ba33 100644 --- a/starknet-providers/src/any.rs +++ b/starknet-providers/src/any.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use serde::Serialize; use starknet_core::types::{ BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction, BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction, @@ -11,7 +12,7 @@ use starknet_core::types::{ }; use crate::{ - jsonrpc::{HttpTransport, JsonRpcClient}, + jsonrpc::{HttpTransport, JsonRpcClient, JsonRpcMethod}, Provider, ProviderError, SequencerGatewayProvider, }; @@ -665,4 +666,31 @@ impl Provider for AnyProvider { } } } + + async fn batch_requests( + &self, + requests: I, + ) -> Result, ProviderError> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync, + { + match self { + Self::JsonRpcHttp(inner) => inner.batch_requests(requests).await, + Self::SequencerGateway(inner) => inner.batch_requests(requests).await, + } + } + + async fn get_block_with_tx_hashes_batch( + &self, + block_ids: Vec, + ) -> Result, ProviderError> + where + B: AsRef + Send + Sync, + { + match self { + Self::JsonRpcHttp(inner) => inner.get_block_with_tx_hashes_batch(block_ids).await, + Self::SequencerGateway(inner) => inner.get_block_with_tx_hashes_batch(block_ids).await, + } + } } diff --git a/starknet-providers/src/jsonrpc/mod.rs b/starknet-providers/src/jsonrpc/mod.rs index 2d6013ed..595d5b50 100644 --- a/starknet-providers/src/jsonrpc/mod.rs +++ b/starknet-providers/src/jsonrpc/mod.rs @@ -126,6 +126,16 @@ pub enum JsonRpcMethod { } /// JSON-RPC request. +#[derive(Debug, Clone, Serialize)] +#[serde(untagged)] +pub enum JsonRpcRequestParams<'a> { + /// Parameters for getting a block with transaction hashes. + GetBlockWithTxHashes(GetBlockWithTxHashesRequestRef<'a>), + /// Parameters for getting a block with full transactions. + GetBlockWithTxs(GetBlockWithTxsRequestRef<'a>), +} + +/// Represents a JSON-RPC request with a unique identifier. #[derive(Debug, Clone)] pub struct JsonRpcRequest { /// ID of the request. Useful for identifying responses in certain transports like `WebSocket`. @@ -136,6 +146,13 @@ pub struct JsonRpcRequest { /// Typed request data for Starknet JSON-RPC requests. #[derive(Debug, Clone)] +pub struct JsonRpcRequests { + /// A list of JSON-RPC requests. + pub requests: Vec, +} + +/// Represents the data for various JSON-RPC requests +#[derive(Debug, Clone, Serialize)] pub enum JsonRpcRequestData { /// Request data for `starknet_specVersion`. SpecVersion(SpecVersionRequest), @@ -303,6 +320,32 @@ where } } } + + async fn send_requests(&self, requests: I) -> Result, ProviderError> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync, + R: DeserializeOwned, + { + let responses = self + .transport + .send_requests(requests) + .await + .map_err(JsonRpcClientError::TransportError)?; + + responses + .into_iter() + .map(|response| match response { + JsonRpcResponse::Success { result, .. } => Ok(result), + JsonRpcResponse::Error { error, .. } => { + Err(match TryInto::::try_into(&error) { + Ok(error) => ProviderError::StarknetError(error), + Err(_) => JsonRpcClientError::::JsonRpcError(error).into(), + }) + } + }) + .collect() + } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] @@ -317,6 +360,35 @@ where .await } + async fn batch_requests( + &self, + requests: I, + ) -> Result, ProviderError> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync, + { + self.send_requests(requests).await + } + + async fn get_block_with_tx_hashes_batch( + &self, + block_ids: Vec, + ) -> Result, ProviderError> + where + B: AsRef + Send + Sync, + { + let requests = block_ids.iter().map(|block_id| { + ( + JsonRpcMethod::GetBlockWithTxHashes, + GetBlockWithTxHashesRequestRef { + block_id: block_id.as_ref(), + }, + ) + }); + self.send_requests(requests).await + } + /// Get block information with transaction hashes given the block id async fn get_block_with_tx_hashes( &self, diff --git a/starknet-providers/src/jsonrpc/transports/http.rs b/starknet-providers/src/jsonrpc/transports/http.rs index 4663b859..c7067af7 100644 --- a/starknet-providers/src/jsonrpc/transports/http.rs +++ b/starknet-providers/src/jsonrpc/transports/http.rs @@ -110,4 +110,46 @@ impl JsonRpcTransport for HttpTransport { Ok(parsed_response) } + + async fn send_requests( + &self, + requests: I, + ) -> Result>, Self::Error> + where + I: IntoIterator + Send, + P: Serialize + Send, + R: DeserializeOwned, + { + let batch_requests: Vec<_> = requests + .into_iter() + .enumerate() + .map(|(id, (method, params))| JsonRpcRequest { + id: id as u64 + 1, + jsonrpc: "2.0", + method, + params, + }) + .collect(); + + let serialized_batch = serde_json::to_string(&batch_requests).map_err(Self::Error::Json)?; + + let mut request = self + .client + .post(self.url.clone()) + .body(serialized_batch) + .header("Content-Type", "application/json"); + + for (name, value) in &self.headers { + request = request.header(name, value); + } + + let response = request.send().await.map_err(Self::Error::Reqwest)?; + + let response_body = response.text().await.map_err(Self::Error::Reqwest)?; + trace!("Response from JSON-RPC: {}", response_body); + + let parsed_response = serde_json::from_str(&response_body).map_err(Self::Error::Json)?; + + Ok(parsed_response) + } } diff --git a/starknet-providers/src/jsonrpc/transports/mod.rs b/starknet-providers/src/jsonrpc/transports/mod.rs index c7602c98..04816c02 100644 --- a/starknet-providers/src/jsonrpc/transports/mod.rs +++ b/starknet-providers/src/jsonrpc/transports/mod.rs @@ -26,4 +26,14 @@ pub trait JsonRpcTransport { where P: Serialize + Send + Sync, R: DeserializeOwned; + + /// Sends multiple JSON-RPC requests and retrieves their responses. + async fn send_requests( + &self, + requests: I, + ) -> Result>, Self::Error> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync, + R: DeserializeOwned; } diff --git a/starknet-providers/src/provider.rs b/starknet-providers/src/provider.rs index 94d853c3..36cc17da 100644 --- a/starknet-providers/src/provider.rs +++ b/starknet-providers/src/provider.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use auto_impl::auto_impl; +use serde::Serialize; use starknet_core::types::{ BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction, BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction, @@ -22,6 +23,9 @@ use std::{any::Any, error::Error, fmt::Debug}; /// The legacy [`SequencerGatewayProvider`](crate::sequencer::SequencerGatewayProvider) still /// implements this trait for backward compatibility reasons, but most of its methods no longer work /// in practice, as public sequencer servers have generally block access to most methods. +use crate::jsonrpc::JsonRpcMethod; + +/// Represents a provider interface for interacting with the Starknet network. #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[auto_impl(&, Box, Arc)] @@ -311,6 +315,34 @@ pub trait Provider { Err(ProviderError::ArrayLengthMismatch) } } + + /// Executes a batch of JSON-RPC requests concurrently. + /// + /// This method takes an iterator of requests, where each request is a tuple consisting of a + /// JSON-RPC method and its parameters. It returns a vector of results, each encoded as a + /// `serde_json::Value`. + /// + /// # Type Parameters + /// - `I`: An iterator type where each item is a tuple containing a `JsonRpcMethod` and parameters `P`. + /// - `P`: The type of the parameters to be serialized for the JSON-RPC request. + /// + /// # Errors + /// Returns `ProviderError` if any of the requests fail. + async fn batch_requests( + &self, + requests: I, + ) -> Result, ProviderError> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync; + + /// Retrieves blocks information with transaction hashes for a batch of block IDs. + async fn get_block_with_tx_hashes_batch( + &self, + block_ids: Vec, + ) -> Result, ProviderError> + where + B: AsRef + Send + Sync; } /// Trait for implementation-specific error type. These errors are irrelevant in most cases, diff --git a/starknet-providers/src/sequencer/provider.rs b/starknet-providers/src/sequencer/provider.rs index 313cda9c..fadb46c9 100644 --- a/starknet-providers/src/sequencer/provider.rs +++ b/starknet-providers/src/sequencer/provider.rs @@ -3,6 +3,7 @@ use std::any::Any; use async_trait::async_trait; +use serde::Serialize; use starknet_core::types::{ BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction, BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction, @@ -15,6 +16,7 @@ use starknet_core::types::{ }; use crate::{ + jsonrpc::JsonRpcMethod, provider::ProviderImplError, sequencer::{models::conversions::ConversionError, GatewayClientError}, Provider, ProviderError, SequencerGatewayProvider, @@ -30,6 +32,27 @@ impl Provider for SequencerGatewayProvider { Ok(String::from("0.7.1")) } + async fn batch_requests( + &self, + requests: I, + ) -> Result, ProviderError> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync, + { + Ok(self.batch_requests(requests).await?) + } + + async fn get_block_with_tx_hashes_batch( + &self, + block_ids: Vec, + ) -> Result, ProviderError> + where + B: AsRef + Send + Sync, + { + Ok(self.get_block_with_tx_hashes_batch(block_ids).await?) + } + async fn get_block_with_tx_hashes( &self, block_id: B,