Skip to content

Commit

Permalink
Optimisation: Prevent creating multiple ws connections on client (#654)
Browse files Browse the repository at this point in the history
* fix: prevent creating multiple ws connections

* BREAKING: removed rpc_methods from AvailClient

* updated e2e tests to use low-level interface for kate RPC's

* Post merge fix

---------

Co-authored-by: Marko Petrlic <[email protected]>
  • Loading branch information
ToufeeqP and markopoloparadox authored Oct 3, 2024
1 parent 60356e9 commit 38faf63
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 95 deletions.
5 changes: 3 additions & 2 deletions avail-rust/src/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ pub struct SDK {

impl SDK {
pub async fn new(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>> {
let api = Api::from_url(endpoint).await?;
let rpc = Rpc::new(endpoint, true).await?;
// Cloning RpcClient is cheaper and doesn't create a new WS connection.
let api = Api::from_rpc_client(rpc.client.clone()).await?;

Ok(SDK {
tx: Transactions::new(api.clone(), rpc.clone()),
Expand All @@ -22,8 +23,8 @@ impl SDK {
}

pub async fn new_insecure(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>> {
let api = Api::from_insecure_url(endpoint).await?;
let rpc = Rpc::new(endpoint, false).await?;
let api = Api::from_rpc_client(rpc.client.clone()).await?;

Ok(SDK {
tx: Transactions::new(api.clone(), rpc.clone()),
Expand Down
54 changes: 2 additions & 52 deletions avail-subxt/src/avail_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::AvailConfig;

use core::ops::Deref;
pub use jsonrpsee::core::client::Client as RpcMethods;
use subxt::{
backend::{legacy::LegacyRpcMethods, rpc::RpcClient},
utils::validate_url_is_secure,
Expand All @@ -10,41 +9,24 @@ use subxt::{

#[derive(Debug)]
pub struct AvailClient {
rpc_methods: RpcMethods,
online: OnlineClient<AvailConfig>,
rpc: RpcClient,
}

impl AvailClient {
pub async fn new<U: AsRef<str>>(ws_uri: U) -> Result<Self, Error> {
validate_url_is_secure(ws_uri.as_ref())?;
let rpc_methods = jsonrpsee_helpers::client(ws_uri.as_ref())
.await
.map_err(|e| Error::Other(format!("Client cannot be created: {e:?}")))?;

let rpc = RpcClient::from_url(ws_uri).await?;
let online = OnlineClient::<AvailConfig>::from_rpc_client(rpc.clone()).await?;

Ok(AvailClient {
rpc_methods,
online,
rpc,
})
Ok(AvailClient { online, rpc })
}

pub async fn new_insecure<U: AsRef<str>>(ws_uri: U) -> Result<Self, Error> {
let rpc_methods = jsonrpsee_helpers::client(ws_uri.as_ref())
.await
.map_err(|e| Error::Other(format!("Client cannot be created: {e:?}")))?;

let rpc = RpcClient::from_insecure_url(ws_uri).await?;
let online = OnlineClient::<AvailConfig>::from_rpc_client(rpc.clone()).await?;

Ok(AvailClient {
rpc_methods,
online,
rpc,
})
Ok(AvailClient { online, rpc })
}

pub fn legacy_rpc(&self) -> LegacyRpcMethods<AvailConfig> {
Expand All @@ -58,10 +40,6 @@ impl AvailClient {
pub fn online(&self) -> &OnlineClient<AvailConfig> {
&self.online
}

pub fn rpc_methods(&self) -> &RpcMethods {
&self.rpc_methods
}
}

impl Deref for AvailClient {
Expand All @@ -70,31 +48,3 @@ impl Deref for AvailClient {
&self.online
}
}

// #[cfg(feature = "native")]
mod jsonrpsee_helpers {
pub use jsonrpsee::{
client_transport::ws::{self, EitherStream, Url, WsTransportClientBuilder},
core::client::{Client, Error},
};
use tokio_util::compat::Compat;

pub type Sender = ws::Sender<Compat<EitherStream>>;
pub type Receiver = ws::Receiver<Compat<EitherStream>>;

/// Build WS RPC client from URL
pub async fn client(url: &str) -> Result<Client, Error> {
let (sender, receiver) = ws_transport(url).await?;
Ok(Client::builder()
.max_buffer_capacity_per_subscription(4096)
.build_with_tokio(sender, receiver))
}

async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> {
let url = Url::parse(url).map_err(|e| Error::Transport(e.into()))?;
WsTransportClientBuilder::default()
.build(url)
.await
.map_err(|e| Error::Transport(e.into()))
}
}
58 changes: 39 additions & 19 deletions e2e/src/tests/rpc_queries.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
use super::{alice_nonce, local_connection, no_concurrency};

use avail_core::data_proof::ProofResponse;
use avail_core::{AppExtrinsic, AppId, Keccak256};
use avail_subxt::{
api::{
self,
runtime_types::avail_core::{
header::extension::HeaderExtension, BlockLengthColumns, BlockLengthRows,
runtime_types::{
avail_core::{header::extension::HeaderExtension, BlockLengthColumns, BlockLengthRows},
frame_system::limits::BlockLength,
},
},
avail::{Cells, GDataProof, GRawScalar, Rows},
rpc::{GProof, KateRpcClient as _},
avail::{Cells, GDataProof, GRawScalar, GRow, Rows},
rpc::GProof,
submit::submit_data_with_nonce as submit_data,
tx,
utils::H256,
AvailClient, Cell,
AvailClient, Cell, RpcParams,
};
use kate::{
com::Cell as KateCell,
gridgen::{AsBytes as _, EvaluationGrid},
};
use kate_recovery::{matrix::Dimensions, proof::verify};
use subxt::Error;
use subxt_signer::sr25519::dev;

use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -51,7 +54,9 @@ async fn eval_grid_from_block(client: &AvailClient, block_hash: H256) -> Result<
app_extrinsics.push(app_extrinsic);
}

let block_len = client.rpc_methods().query_block_length(block_hash).await?;
let mut params = RpcParams::new();
params.push(block_hash)?;
let block_len: BlockLength = client.rpc().request("kate_blockLength", params).await?;
let max_width = block_len.cols.0 as usize;
let max_height = block_len.rows.0 as usize;
let seed = [0u8; 32];
Expand Down Expand Up @@ -112,10 +117,10 @@ pub async fn rpc_query_proof_test() -> Result<()> {
.collect();

// RPC call
let actual_proofs: Vec<GDataProof> = client
.rpc_methods()
.query_proof(Cells::try_from(cells).unwrap(), block_hash)
.await?;
let mut params = RpcParams::new();
params.push(Cells::try_from(cells).unwrap())?;
params.push(block_hash)?;
let actual_proofs: Vec<GDataProof> = client.rpc().request("kate_queryProof", params).await?;

let len = actual_proofs.len();
assert_eq!(actual_proofs.len(), expected_proofs.len());
Expand Down Expand Up @@ -147,7 +152,10 @@ pub async fn rpc_query_proof_test_2() -> Result<()> {
let cells = Cells::try_from(vec![cell.clone()]).unwrap();

// RPC call
let actual_proof: Vec<GDataProof> = client.rpc_methods().query_proof(cells, block_hash).await?;
let mut params = RpcParams::new();
params.push(cells)?;
params.push(block_hash)?;
let actual_proof: Vec<GDataProof> = client.rpc().request("kate_queryProof", params).await?;
let actual_proof: Vec<u8> = actual_proof
.iter()
.flat_map(|(raw_scalar, g_proof)| {
Expand Down Expand Up @@ -191,7 +199,10 @@ pub async fn rpc_query_proof_test_2() -> Result<()> {
let cells = Cells::try_from(vec![cell.clone()]).unwrap();

// RPC call
let actual_proof: Vec<GDataProof> = client.rpc_methods().query_proof(cells, block_hash).await?;
let mut params = RpcParams::new();
params.push(cells)?;
params.push(block_hash)?;
let actual_proof: Vec<GDataProof> = client.rpc().request("kate_queryProof", params).await?;
let actual_proof: Vec<u8> = actual_proof
.iter()
.flat_map(|(raw_scalar, g_proof)| {
Expand Down Expand Up @@ -242,17 +253,21 @@ pub async fn empty_commitments_test() -> Result<()> {

// query_rows should fail for block with empty commitments
let row_indexes = Rows::truncate_from(vec![0]);
let rows = client
.rpc_methods()
.query_rows(row_indexes, block_hash)
.await;
let mut params = RpcParams::new();
params.push(row_indexes)?;
params.push(block_hash)?;
let rows: Result<Vec<GRow>, Error> = client.rpc().request("kate_queryRows", params).await;
assert!(rows.is_err());

// query_proof should fail for block with empty commitments
let cell = Cell { row: 0, col: 0 };
let cells = Cells::try_from(vec![cell.clone()]).unwrap();

let proof = client.rpc_methods().query_proof(cells, block_hash).await;
let mut params = RpcParams::new();
params.push(cells)?;
params.push(block_hash)?;
let proof: Result<Vec<GDataProof>, Error> =
client.rpc().request("kate_queryProof", params).await;
assert!(proof.is_err());
Ok(())
}
Expand All @@ -270,7 +285,9 @@ pub async fn rpc_query_block_length_test() -> Result<()> {
.block_hash();

// RPC call
let length = client.rpc_methods().query_block_length(block_hash).await?;
let mut params = RpcParams::new();
params.push(block_hash)?;
let length: BlockLength = client.rpc().request("kate_blockLength", params).await?;
assert_eq!(length.cols, BlockLengthColumns(256));
assert_eq!(length.rows, BlockLengthRows(256));
assert_eq!(length.chunk_size, 32);
Expand All @@ -292,7 +309,10 @@ pub async fn rpc_query_data_proof_test() -> Result<()> {

let expected_proof_root = merkle_proof::<Keccak256, _, _>(vec![keccak_256(DATA)], 0);

let actual_proof = client.rpc_methods().query_data_proof(1, block_hash).await?;
let mut params = RpcParams::new();
params.push(1)?;
params.push(block_hash)?;
let actual_proof: ProofResponse = client.rpc().request("kate_queryDataProof", params).await?;
// root is calculated keccak256(blob_root, bridge_root)
let mut root_data = vec![];
root_data.extend(expected_proof_root.root.as_bytes());
Expand Down
21 changes: 10 additions & 11 deletions e2e/src/tests/submit_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use super::{alice_nonce, allow_concurrency, local_connection};

use avail_core::AppId;
use avail_subxt::{
avail::{Cells, Rows, TxInBlock, TxProgress},
avail::{GDataProof, GRow, Rows, TxInBlock, TxProgress},
primitives::Cell,
rpc::KateRpcClient as _,
submit::submit_data_with_nonce,
tx,
tx, RpcParams,
};
use subxt_signer::sr25519::dev;

Expand Down Expand Up @@ -94,10 +93,10 @@ async fn main() -> anyhow::Result<()> {
// Note: Ideal way to get the rows for specific appData, we should use the app_specific_rows from kate recovery, which is out scope for this example
// 1. Check query rows.
let row_indexes = Rows::truncate_from(vec![0]);
let query_rows = client
.rpc_methods()
.query_rows(Rows::truncate_from(row_indexes.to_vec()), block_hash)
.await?;
let mut params = RpcParams::new();
params.push(row_indexes)?;
params.push(block_hash)?;
let query_rows: Vec<GRow> = client.rpc().request("kate_queryRows", params).await?;
trace!("Query rows RPC: {query_rows:?}");

// 3. Check proof.
Expand All @@ -108,10 +107,10 @@ async fn main() -> anyhow::Result<()> {
Cell::new(0, col)
})
.collect::<Vec<_>>();
let proof = client
.rpc_methods()
.query_proof(Cells::truncate_from(cells), block_hash)
.await?;
let mut params = RpcParams::new();
params.push(cells)?;
params.push(block_hash)?;
let proof: Vec<GDataProof> = client.rpc().request("kate_queryProof", params).await?;
trace!("Query proof RPC: {proof:?}");

Ok(())
Expand Down
26 changes: 15 additions & 11 deletions e2e/src/tests/vector_send_msg.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::{alice_nonce, allow_concurrency, local_connection};

use avail_core::{
data_proof::{tx_uid, AddressedMessage, BoundedData, Message, SubTrie},
data_proof::{tx_uid, AddressedMessage, BoundedData, Message, ProofResponse, SubTrie},
AppId, Keccak256,
};
use avail_subxt::{
api, avail_client::RpcMethods, rpc::KateRpcClient as _, tx, AccountId, AvailClient,
api, api::runtime_types::frame_system::limits::BlockLength, tx, AccountId, AvailClient,
RpcParams,
};
use subxt::{backend::BlockRef, error::RpcError, utils::H256, Error};
use subxt_signer::sr25519::dev;
Expand Down Expand Up @@ -116,11 +117,8 @@ fn messages_to_leaves(block_number: u32, tx_indexes: Vec<u32>) -> Vec<Leaf> {
.collect::<Vec<_>>()
}

async fn check_query_data_proof_rpc(
rpc_methods: &RpcMethods,
block_hash: H256,
leaves: &[Leaf],
) -> Result<(), Error> {
async fn check_query_data_proof_rpc(block_hash: H256, leaves: &[Leaf]) -> Result<(), Error> {
let client = local_connection().await.unwrap();
let indexed_leafs_len = leaves.len();
for indexed_leaf in leaves {
let Leaf {
Expand All @@ -129,8 +127,12 @@ async fn check_query_data_proof_rpc(
leaf,
} = indexed_leaf;

let rpc_proof = rpc_methods
.query_data_proof(*tx_idx, block_hash)
let mut params = RpcParams::new();
params.push(*tx_idx)?;
params.push(block_hash)?;
let rpc_proof: ProofResponse = client
.rpc()
.request("kate_queryDataProof", params)
.await
.map_err(|je| RpcError::ClientError(Box::new(je)))?;
let bridge_root = rpc_proof.data_proof.roots.bridge_root;
Expand Down Expand Up @@ -167,10 +169,12 @@ async fn vector_send_msg() -> anyhow::Result<()> {
let indexed_leaves = messages_to_leaves(block_number, tx_indexes);

// 2. Use Kate to get the proof and double-check it.
check_query_data_proof_rpc(client.rpc_methods(), block_hash, &indexed_leaves).await?;
check_query_data_proof_rpc(block_hash, &indexed_leaves).await?;

// 3. Test query_block len RPC.
let block_len = client.rpc_methods().query_block_length(block_hash).await?;
let mut params = RpcParams::new();
params.push(block_hash)?;
let block_len: BlockLength = client.rpc().request("kate_blockLength", params).await?;
trace!(
"Test query_block_length RPC: cols={}, rows={}",
block_len.cols.0,
Expand Down

0 comments on commit 38faf63

Please sign in to comment.