Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Notifications and pubsub transports #10

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/json-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ exclude.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
alloy-primitives.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json = { version = "1.0.103", features = ["raw_value"] }
serde_json = { version = "1.0.103", features = ["raw_value"] }
25 changes: 25 additions & 0 deletions crates/json-rpc/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,31 @@ pub enum Id {
None,
}

impl PartialOrd for Id {
Copy link
Member Author

@prestwich prestwich Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allows use as key in btreemaps. implementation is arbitrary, but totally ordered

fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for Id {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// numbers < strings
// strings < null
// null == null
match (self, other) {
(Id::Number(a), Id::Number(b)) => a.cmp(b),
(Id::Number(_), _) => std::cmp::Ordering::Less,

(Id::String(_), Id::Number(_)) => std::cmp::Ordering::Greater,
(Id::String(a), Id::String(b)) => a.cmp(b),
(Id::String(_), Id::None) => std::cmp::Ordering::Less,

(Id::None, Id::None) => std::cmp::Ordering::Equal,
(Id::None, _) => std::cmp::Ordering::Greater,
}
}
}

impl Id {
/// Returns `true` if the ID is a number.
pub fn is_number(&self) -> bool {
Expand Down
3 changes: 3 additions & 0 deletions crates/json-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

use serde::{de::DeserializeOwned, Serialize};

mod notification;
pub use notification::{EthNotification, PubSubItem};

mod request;
pub use request::Request;

Expand Down
20 changes: 20 additions & 0 deletions crates/json-rpc/src/notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use alloy_primitives::U256;
use serde::{Deserialize, Serialize};

use crate::Response;

/// An ethereum-style notification, not to be confused with a JSON-RPC
/// notification.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct EthNotification<T = Box<serde_json::value::RawValue>> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are distinct from JSON-PRC notifications (which are ID-less requests)

pub subscription: U256,
pub result: T,
}

/// An item received from a pubsub transport.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum PubSubItem {
Response(Response),
Notification(EthNotification),
}
32 changes: 25 additions & 7 deletions crates/json-rpc/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{common::Id, RpcParam};

use serde::{ser::SerializeMap, Deserialize, Serialize};
use serde_json::value::RawValue;

/// A JSON-RPC 2.0 request object.
///
Expand All @@ -12,12 +13,30 @@ use serde::{ser::SerializeMap, Deserialize, Serialize};
///
/// The value of `method` should be known at compile time.
#[derive(Debug, Deserialize, Clone)]
pub struct Request<Params> {
pub struct Request<Params = Box<RawValue>> {
pub method: &'static str,
pub params: Params,
pub id: Id,
}

impl<Params> Request<Params>
where
Params: RpcParam,
{
/// Serialize the request parameters as a boxed [`RawValue`].
///
/// # Panics
///
/// If serialization of the params fails.
pub fn box_params(self) -> Request {
Request {
method: self.method,
params: RawValue::from_string(serde_json::to_string(&self.params).unwrap()).unwrap(),
id: self.id,
}
}
}

// manually implemented to avoid adding a type for the protocol-required
// `jsonrpc` field
impl<Params> Serialize for Request<Params>
Expand All @@ -28,11 +47,14 @@ where
where
S: serde::Serializer,
{
let mut map = serializer.serialize_map(Some(4))?;
let sized_params = std::mem::size_of::<Params>() != 0;

let mut map = serializer.serialize_map(Some(3 + sized_params as usize))?;
map.serialize_entry("method", self.method)?;

// Params may be omitted if it is 0-sized
if !is_zst::<Params>() {
if sized_params {
// TODO: remove unwrap
map.serialize_entry("params", &self.params)?;
}

Expand All @@ -41,7 +63,3 @@ where
map.end()
}
}

fn is_zst<T>() -> bool {
std::mem::size_of::<T>() == 0
}
29 changes: 18 additions & 11 deletions crates/json-rpc/src/response.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::fmt;
use std::{fmt, marker::PhantomData};

use serde::{
de::{MapAccess, Visitor},
Deserialize, Deserializer, Serialize,
};
use serde_json::value::RawValue;

use crate::common::Id;
use crate::{common::Id, RpcReturn};

/// A JSONRPC-2.0 error object.
///
Expand All @@ -32,8 +32,8 @@ pub struct ErrorPayload {
/// This type does not implement `Serialize` or `Deserialize` directly. It is
/// deserialized as part of the [`Response`] type.
#[derive(Debug, Clone)]
pub enum ResponsePayload {
Success(Box<RawValue>),
pub enum ResponsePayload<T = Box<RawValue>> {
Success(T),
Error(ErrorPayload),
}

Expand All @@ -43,12 +43,16 @@ pub enum ResponsePayload {
/// either a successful result or an error. The `id` field is used to match
/// the response to the request that it is responding to, and should be
/// mirrored from the response.
pub struct Response {
#[derive(Debug, Clone)]
pub struct Response<T = Box<RawValue>> {
pub id: Id,
pub payload: ResponsePayload,
pub payload: ResponsePayload<T>,
}

impl<'de> Deserialize<'de> for Response {
impl<'de, T> Deserialize<'de> for Response<T>
where
T: RpcReturn,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
Expand Down Expand Up @@ -90,10 +94,13 @@ impl<'de> Deserialize<'de> for Response {
}
}

struct JsonRpcResponseVisitor;
struct JsonRpcResponseVisitor<T>(PhantomData<T>);

impl<'de> Visitor<'de> for JsonRpcResponseVisitor {
type Value = Response;
impl<'de, T> Visitor<'de> for JsonRpcResponseVisitor<T>
where
T: RpcReturn,
{
type Value = Response<T>;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str(
Expand Down Expand Up @@ -153,7 +160,7 @@ impl<'de> Deserialize<'de> for Response {
}
}

deserializer.deserialize_map(JsonRpcResponseVisitor)
deserializer.deserialize_map(JsonRpcResponseVisitor(PhantomData))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
/// Provider is parameterized with a network and a transport. The default
/// transport is type-erased, but you can do `Provider<N, Http>`.
/// transport is type-, but you can do `Provider<N, Http>`.
pub trait Provider<N: Network, T: Transport = BoxTransport>: Send + Sync {
fn raw_client(&self) -> &RpcClient<T> {
&self.client().client
Expand Down
9 changes: 8 additions & 1 deletion crates/transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ pin-project.workspace = true

# feature deps
reqwest = { version = "0.11.18", features = ["serde_json", "json"], optional = true }
tokio = { version = "1.32.0", features = ["macros"] }
bimap = "0.6.3"
alloy-primitives.workspace = true
tokio-tungstenite = "0.20.1"
http = "0.2.9"
tracing = "0.1.37"
futures-util.workspace = true

[target.'cfg(not(target_arch = "wasm32"))'.dependencies.hyper]
version = "0.14.27"
Expand All @@ -36,4 +43,4 @@ features = ["full"]
[features]
default = ["reqwest", "hyper"]
reqwest = ["dep:reqwest"]
hyper = ["dep:hyper", "hyper/client"]
hyper = ["dep:hyper", "hyper/client"]
29 changes: 13 additions & 16 deletions crates/transports/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ use std::{
use futures_channel::oneshot;
use serde_json::value::RawValue;

use crate::{error::TransportError, transports::Transport, utils::to_json_raw_value, RpcClient};
use crate::{
error::TransportError,
transports::{SingleRequest, Transport},
RpcClient,
};
use alloy_json_rpc::{Id, Request, Response, RpcParam, RpcResult, RpcReturn};

type Channel = oneshot::Sender<RpcResult<Box<RawValue>, TransportError>>;
Expand All @@ -25,7 +29,7 @@ pub struct BatchRequest<'a, T> {
transport: &'a RpcClient<T>,

/// The requests to be sent.
requests: Vec<Box<RawValue>>,
requests: Vec<SingleRequest>,

/// The channels to send the responses through.
channels: ChannelMap,
Expand Down Expand Up @@ -70,7 +74,7 @@ where
{
Prepared {
transport: Conn,
requests: Vec<Box<RawValue>>,
requests: Vec<SingleRequest>,
channels: ChannelMap,
},
SerError(Option<TransportError>),
Expand All @@ -93,11 +97,10 @@ impl<'a, T> BatchRequest<'a, T> {

fn push_raw(
&mut self,
id: Id,
request: Box<RawValue>,
request: SingleRequest,
) -> oneshot::Receiver<RpcResult<Box<RawValue>, TransportError>> {
let (tx, rx) = oneshot::channel();
self.channels.insert(id, tx);
self.channels.insert(request.id.clone(), tx);
self.requests.push(request);
rx
}
Expand All @@ -106,7 +109,8 @@ impl<'a, T> BatchRequest<'a, T> {
&mut self,
request: Request<Params>,
) -> Result<Waiter<Resp>, TransportError> {
to_json_raw_value(&request).map(|rv| self.push_raw(request.id, rv).into())
let req = request.try_into()?;
Ok(self.push_raw(req).into())
}
}

Expand Down Expand Up @@ -176,17 +180,10 @@ where
// We only have mut refs, and we want ownership, so we just replace
// with 0-capacity collections.
let channels = std::mem::replace(channels, HashMap::with_capacity(0));
let req = std::mem::replace(requests, Vec::with_capacity(0));

let req = match to_json_raw_value(&req) {
Ok(req) => req,
Err(e) => {
self.set(BatchFuture::Complete);
return Poll::Ready(Err(e));
}
};
let req = std::mem::replace(requests, Vec::with_capacity(0));

let fut = transport.call(req);
let fut = transport.call(req.into());
self.set(BatchFuture::AwaitingResponse { channels, fut });
cx.waker().wake_by_ref();
Poll::Pending
Expand Down
13 changes: 11 additions & 2 deletions crates/transports/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,27 @@ pub enum TransportError {
#[error("Missing response in batch request")]
MissingBatchResponse,

/// Custom error
#[error(transparent)]
Custom(Box<dyn StdError + Send + Sync + 'static>),

/// Hyper http transport
/// Hyper http transport.
#[error(transparent)]
#[cfg(feature = "reqwest")]
Reqwest(#[from] reqwest::Error),

/// Hyper http transport
/// Hyper http transport.
#[error(transparent)]
#[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))]
Hyper(#[from] hyper::Error),

/// Tungstenite websocket transport.
#[error(transparent)]
Tungstenite(#[from] tokio_tungstenite::tungstenite::Error),

/// PubSub backend connection task has stopped.
#[error("PubSub backend connection task has stopped.")]
BackendGone,
}

impl TransportError {
Expand Down
6 changes: 5 additions & 1 deletion crates/transports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ pub use batch::BatchRequest;
mod transports;
pub use transports::{BoxTransport, Http, Transport};

pub use alloy_json_rpc::RpcResult;
mod pubsub;
pub use pubsub::{
BoxPubSub, ConnectionHandle, ConnectionInterface, PubSub, PubSubConnect, PubSubFrontend,
};

pub(crate) mod utils;

pub use alloy_json_rpc::RpcResult;
pub use type_aliases::*;

#[cfg(not(target_arch = "wasm32"))]
Expand Down
Loading
Loading