Skip to content

Commit

Permalink
docs and misc convenience
Browse files Browse the repository at this point in the history
  • Loading branch information
prestwich committed Aug 21, 2023
1 parent 047ba1d commit c5ce15b
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 32 deletions.
6 changes: 3 additions & 3 deletions crates/middleware/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<N: Network, T: Transport + Clone> Middleware<N, T> for RpcClient<T> {
&'s self,
tx: &'a <N as Network>::TransactionRequest,
) -> MwareFut<'fut, alloy_primitives::U256, TransportError> {
self.prepare("eth_estimateGas", Cow::Borrowed(tx)).box_pin()
self.prepare("eth_estimateGas", Cow::Borrowed(tx)).boxed()
}

fn get_transaction_count<'s: 'fut, 'a: 'fut, 'fut>(
Expand All @@ -93,15 +93,15 @@ impl<N: Network, T: Transport + Clone> Middleware<N, T> for RpcClient<T> {
"eth_getTransactionCount",
Cow::<(Address, &'static str)>::Owned((address, "latest")),
)
.box_pin()
.boxed()
}

fn send_transaction<'s: 'fut, 'a: 'fut, 'fut>(
&'s self,
tx: &'a N::TransactionRequest,
) -> MwareFut<'fut, N::Receipt, TransportError> {
self.prepare("eth_sendTransaction", Cow::Borrowed(tx))
.box_pin()
.boxed()
}
}

Expand Down
40 changes: 20 additions & 20 deletions crates/transports/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
future::{Future, IntoFuture},
marker::PhantomData,
pin::Pin,
task::{self, ready},
task::{self, ready, Poll},
};

use futures_channel::oneshot;
Expand All @@ -17,7 +17,8 @@ type Channel = oneshot::Sender<RpcResult<Box<RawValue>, TransportError>>;
type ChannelMap = HashMap<Id, Channel>;

#[must_use = "A BatchRequest does nothing unless sent via `send_batch` and `.await`"]
/// A Batch JSON-RPC request, awaiting dispatch.
/// A batch JSON-RPC request, used to bundle requests into a single transport
/// call.
#[derive(Debug)]
pub struct BatchRequest<'a, T> {
transport: &'a RpcClient<T>,
Expand Down Expand Up @@ -48,13 +49,10 @@ where
{
type Output = RpcResult<Resp, TransportError>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Self::Output> {
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let resp = ready!(Pin::new(&mut self.rx).poll(cx));

task::Poll::Ready(match resp {
Poll::Ready(match resp {
Ok(resp) => resp.deser_ok_or_else(|e, text| TransportError::deser_err(e, text)),
Err(e) => RpcResult::Err(TransportError::Custom(Box::new(e))),
})
Expand Down Expand Up @@ -157,7 +155,7 @@ where
fn poll_prepared(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<<Self as Future>::Output> {
) -> Poll<<Self as Future>::Output> {
let CallStateProj::Prepared {
transport,
requests,
Expand All @@ -169,7 +167,7 @@ where

if let Err(e) = task::ready!(transport.poll_ready(cx)) {
self.set(BatchFuture::Complete);
return task::Poll::Ready(Err(e));
return Poll::Ready(Err(e));
}

// We only have mut refs, and we want ownership, so we just replace
Expand All @@ -181,20 +179,20 @@ where
Ok(req) => req,
Err(e) => {
self.set(BatchFuture::Complete);
return task::Poll::Ready(Err(e));
return Poll::Ready(Err(e));
}
};

let fut = transport.call(req);
self.set(BatchFuture::AwaitingResponse { channels, fut });
cx.waker().wake_by_ref();
task::Poll::Pending
Poll::Pending
}

fn poll_awaiting_response(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<<Self as Future>::Output> {
) -> Poll<<Self as Future>::Output> {
let CallStateProj::AwaitingResponse { channels, fut } = self.as_mut().project() else {
unreachable!("Called poll_awaiting_response in incorrect state")
};
Expand All @@ -204,46 +202,48 @@ where
Ok(responses) => responses,
Err(e) => {
self.set(BatchFuture::Complete);
return task::Poll::Ready(Err(e));
return Poll::Ready(Err(e));
}
};

let responses: Vec<JsonRpcResponse> = match serde_json::from_str(responses.get()) {
Ok(responses) => responses,
Err(err) => {
self.set(BatchFuture::Complete);
return task::Poll::Ready(Err(TransportError::deser_err(err, responses.get())));
return Poll::Ready(Err(TransportError::deser_err(err, responses.get())));
}
};

// Drain the responses into the channels.
// Send the responses via the channels by removing the channels from
// the map.
for response in responses {
if let Some(tx) = channels.remove(&response.id) {
let _ = tx.send(RpcResult::from(response));
}
}

// Any remaining channels are missing responses.
// Any channels remaining in the map are missing responses. To avoid
// hanging futures, we send an error.
channels.drain().for_each(|(_, tx)| {
let _ = tx.send(RpcResult::Err(TransportError::MissingBatchResponse));
});

self.set(BatchFuture::Complete);
task::Poll::Ready(Ok(()))
Poll::Ready(Ok(()))
}

fn poll_ser_error(
mut self: Pin<&mut Self>,
_cx: &mut task::Context<'_>,
) -> task::Poll<<Self as Future>::Output> {
) -> Poll<<Self as Future>::Output> {
let e = if let CallStateProj::SerError(e) = self.as_mut().project() {
e.take().expect("No error. This is a bug.")
} else {
unreachable!("Called poll_ser_error in incorrect state")
};

self.set(BatchFuture::Complete);
task::Poll::Ready(Err(e))
Poll::Ready(Err(e))
}
}

Expand All @@ -254,7 +254,7 @@ where
{
type Output = Result<(), TransportError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
if matches!(*self.as_mut(), BatchFuture::Prepared { .. }) {
return self.poll_prepared(cx);
}
Expand Down
33 changes: 32 additions & 1 deletion crates/transports/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use crate::{
};

use alloy_json_rpc::{JsonRpcRequest, RpcParam, RpcResult, RpcReturn};
use core::panic;
use serde_json::value::RawValue;
use std::{future::Future, marker::PhantomData, pin::Pin, task};
use tower::{Layer, Service};

/// The states of the [`RpcCall`] future.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project(project = CallStateProj)]
enum CallState<Params, Conn>
Expand Down Expand Up @@ -97,6 +99,21 @@ where
}
}

/// A prepared, but unsent, RPC call.
///
/// This is a future that will send the request when polled. It contains a
/// [`JsonRpcRequest`], a [`Transport`], and knowledge of its expected response
/// type. Upon awaiting, it will send the request and wait for the response. It
/// will then deserialize the response into the expected type.
///
/// Errors are captured in the [`RpcResult`] type. Rpc Calls will result in
/// either a successful response of the `Resp` type, an error response, or a
/// transport error.
///
/// ### Note:
///
/// Serializing the request is done lazily. The request is not serialized until
/// the future is polled.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
pub struct RpcCall<Conn, Params, Resp>
Expand All @@ -116,6 +133,7 @@ where
Conn::Future: Send,
Params: RpcParam,
{
#[doc(hidden)]
pub fn new(req: JsonRpcRequest<Params>, connection: Conn) -> Self {
Self {
state: CallState::Prepared {
Expand All @@ -125,6 +143,18 @@ where
_pd: PhantomData,
}
}

/// Get a mutable reference to the params of the request.
///
/// This is useful for modifying the params after the request has been
/// prepared.
pub fn params(&mut self) -> &mut Params {
if let CallState::Prepared { request, .. } = &mut self.state {
&mut request.as_mut().unwrap().params
} else {
panic!("Cannot get params after request has been sent");
}
}
}

impl<'a, Conn, Params, Resp> RpcCall<Conn, Params, Resp>
Expand All @@ -134,7 +164,8 @@ where
Params: RpcParam + 'a,
Resp: RpcReturn,
{
pub fn box_pin(
/// Convert this future into a boxed, pinned future, erasing its type.
pub fn boxed(
self,
) -> Pin<Box<dyn Future<Output = RpcResult<Resp, TransportError>> + Send + 'a>> {
Box::pin(self)
Expand Down
Loading

0 comments on commit c5ce15b

Please sign in to comment.