From 799869471ab1b50cb90bbbb495b216e6ff34b8a3 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 1 Dec 2023 17:38:39 -0800 Subject: [PATCH 01/13] feature: interprocess-based IPC --- Cargo.toml | 1 + crates/transport-ipc/Cargo.toml | 26 +++++ crates/transport-ipc/README.md | 3 + crates/transport-ipc/src/connect.rs | 52 +++++++++ crates/transport-ipc/src/lib.rs | 171 ++++++++++++++++++++++++++++ 5 files changed, 253 insertions(+) create mode 100644 crates/transport-ipc/Cargo.toml create mode 100644 crates/transport-ipc/README.md create mode 100644 crates/transport-ipc/src/connect.rs create mode 100644 crates/transport-ipc/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index a332db33199..9e809fbd9ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ alloy-json-rpc = { version = "0.1.0", path = "crates/json-rpc" } alloy-transport = { version = "0.1.0", path = "crates/transport" } alloy-pubsub = { version = "0.1.0", path = "crates/pubsub" } alloy-transport-http = { version = "0.1.0", path = "crates/transport-http" } +alloy-transport-ipc = { version = "0.1.0", path = "crates/transport-ipc" } alloy-transport-ws = { version = "0.1.0", path = "crates/transport-ws" } alloy-networks = { version = "0.1.0", path = "crates/networks" } alloy-rpc-types = { version = "0.1.0", path = "crates/rpc-types" } diff --git a/crates/transport-ipc/Cargo.toml b/crates/transport-ipc/Cargo.toml new file mode 100644 index 00000000000..3f78afdf90b --- /dev/null +++ b/crates/transport-ipc/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "alloy-transport-ipc" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +alloy-json-rpc.workspace = true +alloy-transport.workspace = true +alloy-pubsub.workspace = true + +futures.workspace = true +pin-project.workspace = true +serde_json.workspace = true +tokio.workspace = true +tracing.workspace = true + +bytes = "1.5.0" +interprocess = { version = "1.2.1", features = ["tokio", "tokio_support"] } \ No newline at end of file diff --git a/crates/transport-ipc/README.md b/crates/transport-ipc/README.md new file mode 100644 index 00000000000..5305aaa6414 --- /dev/null +++ b/crates/transport-ipc/README.md @@ -0,0 +1,3 @@ +# alloy-transport-ipc + +IPC transport implementation. diff --git a/crates/transport-ipc/src/connect.rs b/crates/transport-ipc/src/connect.rs new file mode 100644 index 00000000000..f905b76056e --- /dev/null +++ b/crates/transport-ipc/src/connect.rs @@ -0,0 +1,52 @@ +use std::{ + ffi::{CString, OsString}, + path::PathBuf, +}; + +#[derive(Debug, Clone)] +/// An IPC Connection object. +pub struct IpcConnect { + /// + inner: T, +} + +macro_rules! impl_connect { + ($target:ty) => { + impl From<$target> for IpcConnect<$target> { + fn from(inner: $target) -> Self { + Self { inner } + } + } + + impl From> for $target { + fn from(this: IpcConnect<$target>) -> $target { + this.inner + } + } + + impl alloy_pubsub::PubSubConnect for IpcConnect<$target> { + fn is_local(&self) -> bool { + true + } + + fn connect<'a: 'b, 'b>( + &'a self, + ) -> alloy_transport::Pbf< + 'b, + alloy_pubsub::ConnectionHandle, + alloy_transport::TransportError, + > { + Box::pin(async move { + crate::IpcBackend::connect(&self.inner) + .await + .map_err(alloy_transport::TransportErrorKind::custom) + }) + } + } + }; +} + +impl_connect!(OsString); +impl_connect!(CString); +impl_connect!(PathBuf); +impl_connect!(String); diff --git a/crates/transport-ipc/src/lib.rs b/crates/transport-ipc/src/lib.rs new file mode 100644 index 00000000000..375ce6ff51c --- /dev/null +++ b/crates/transport-ipc/src/lib.rs @@ -0,0 +1,171 @@ +#![doc = include_str!("../README.md")] +#![doc( + html_logo_url = "https://raw.githubusercontent.com/alloy-rs/core/main/assets/alloy.jpg", + html_favicon_url = "https://raw.githubusercontent.com/alloy-rs/core/main/assets/favicon.ico" +)] +#![warn( + missing_copy_implementations, + missing_debug_implementations, + missing_docs, + unreachable_pub, + clippy::missing_const_for_fn, + rustdoc::all +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![deny(unused_must_use, rust_2018_idioms)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +mod connect; +pub use connect::IpcConnect; + +use std::task::Poll::{Pending, Ready}; + +use alloy_json_rpc::PubSubItem; +use bytes::{Buf, BytesMut}; +use futures::{ready, AsyncRead, AsyncWriteExt, StreamExt}; +use interprocess::local_socket::{ + tokio::{LocalSocketStream, OwnedReadHalf}, + ToLocalSocketName, +}; +use tokio::select; + +type Result = std::result::Result; + +/// An IPC backend task. +struct IpcBackend { + pub(crate) socket: LocalSocketStream, + + pub(crate) interface: alloy_pubsub::ConnectionInterface, +} + +impl IpcBackend { + /// Connect to a local socket. Either a unix socket or a windows named pipe. + async fn connect<'a, I>(name: &I) -> Result + where + // TODO: remove bound on next interprocess crate release + I: ToLocalSocketName<'a> + Clone, + { + let socket = LocalSocketStream::connect(name.clone()).await?; + let (handle, interface) = alloy_pubsub::ConnectionHandle::new(); + + let backend = IpcBackend { socket, interface }; + + backend.spawn(); + + Ok(handle) + } + + fn spawn(mut self) { + let fut = async move { + let (read, mut writer) = self.socket.into_split(); + let mut read = ReadJsonStream::new(read).fuse(); + + let err = loop { + select! { + biased; + item = self.interface.recv_from_frontend() => { + match item { + Some(msg) => { + let bytes = msg.get(); + if let Err(e) = writer.write_all(bytes.as_bytes()).await { + tracing::error!(%e, "Failed to write to IPC socket"); + break true; + } + }, + // dispatcher has gone away, or shutdown was received + None => break false, + } + } + // Read from the socket. + item = read.next() => { + match item { + Some(item) => { + if self.interface.send_to_frontend(item).is_err() { + // frontend has gone away + break false; + } + } + None => { + break true; + } + } + } + } + }; + if err { + self.interface.close_with_error(); + } + }; + + tokio::spawn(fut); + } +} + +#[pin_project::pin_project] +struct ReadJsonStream { + #[pin] + reader: OwnedReadHalf, + buf: BytesMut, + done: bool, + items: Vec, +} + +impl ReadJsonStream { + fn new(reader: OwnedReadHalf) -> Self { + Self { reader, buf: BytesMut::with_capacity(4096), done: false, items: vec![] } + } +} + +impl futures::stream::Stream for ReadJsonStream { + type Item = alloy_json_rpc::PubSubItem; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if self.done { + return Ready(None); + } + + let this = self.project(); + + // Deserialize any buffered items. + if !this.buf.is_empty() { + let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter(); + + let item = de.next(); + match item { + Some(Ok(response)) => { + this.items.push(response); + } + Some(Err(e)) => { + tracing::error!(%e, "Failed to deserialize IPC response"); + *this.done = true; + return Ready(None); + } + None => {} + } + this.buf.advance(de.byte_offset()); + } + + if !this.items.is_empty() { + // may have more work! + cx.waker().wake_by_ref(); + return Ready(this.items.pop()); + } + + let data = ready!(this.reader.poll_read(cx, this.buf)); + match data { + Ok(0) | Err(_) => { + *this.done = true; + return Ready(None); + } + _ => { + // wake task to run deserialization + cx.waker().wake_by_ref(); + } + } + + Pending + } +} From c94cb4213c0be9df078eaeaceb420c1bf80d5557 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 1 Dec 2023 17:43:18 -0800 Subject: [PATCH 02/13] cleanup: tracing and flow --- crates/transport-ipc/src/lib.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/crates/transport-ipc/src/lib.rs b/crates/transport-ipc/src/lib.rs index 375ce6ff51c..9a302cce670 100644 --- a/crates/transport-ipc/src/lib.rs +++ b/crates/transport-ipc/src/lib.rs @@ -73,7 +73,10 @@ impl IpcBackend { } }, // dispatcher has gone away, or shutdown was received - None => break false, + None => { + tracing::debug!("Frontend has gone away"); + break false; + }, } } // Read from the socket. @@ -81,11 +84,12 @@ impl IpcBackend { match item { Some(item) => { if self.interface.send_to_frontend(item).is_err() { - // frontend has gone away + tracing::debug!("Frontend has gone away"); break false; } } None => { + tracing::error!("Read stream has failed."); break true; } } @@ -106,13 +110,12 @@ struct ReadJsonStream { #[pin] reader: OwnedReadHalf, buf: BytesMut, - done: bool, items: Vec, } impl ReadJsonStream { fn new(reader: OwnedReadHalf) -> Self { - Self { reader, buf: BytesMut::with_capacity(4096), done: false, items: vec![] } + Self { reader, buf: BytesMut::with_capacity(4096), items: vec![] } } } @@ -123,10 +126,6 @@ impl futures::stream::Stream for ReadJsonStream { self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - if self.done { - return Ready(None); - } - let this = self.project(); // Deserialize any buffered items. @@ -139,8 +138,7 @@ impl futures::stream::Stream for ReadJsonStream { this.items.push(response); } Some(Err(e)) => { - tracing::error!(%e, "Failed to deserialize IPC response"); - *this.done = true; + tracing::error!(%e, "IPC response contained invalid JSON"); return Ready(None); } None => {} @@ -156,8 +154,12 @@ impl futures::stream::Stream for ReadJsonStream { let data = ready!(this.reader.poll_read(cx, this.buf)); match data { - Ok(0) | Err(_) => { - *this.done = true; + Ok(0) => { + tracing::debug!("IPC socket closed"); + return Ready(None); + } + Err(e) => { + tracing::error!(%e, "Failed to read from IPC socket"); return Ready(None); } _ => { From 3d33512b87800aafce8fb5da471a55b04623581b Mon Sep 17 00:00:00 2001 From: James Date: Fri, 1 Dec 2023 17:49:32 -0800 Subject: [PATCH 03/13] feat: log buffer contents --- crates/transport-ipc/src/lib.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/transport-ipc/src/lib.rs b/crates/transport-ipc/src/lib.rs index 9a302cce670..a2cd85bb777 100644 --- a/crates/transport-ipc/src/lib.rs +++ b/crates/transport-ipc/src/lib.rs @@ -138,7 +138,12 @@ impl futures::stream::Stream for ReadJsonStream { this.items.push(response); } Some(Err(e)) => { - tracing::error!(%e, "IPC response contained invalid JSON"); + tracing::error!(%e, "IPC response contained invalid JSON. Buffer contents will be logged at trace level"); + tracing::trace!( + buffer = %String::from_utf8_lossy(this.buf.as_ref()), + "IPC response contained invalid JSON. NOTE: Buffer contents do not include invalid utf8.", + ); + return Ready(None); } None => {} From 099573985daab4db45cbade0db780364d0f4a3ee Mon Sep 17 00:00:00 2001 From: Enrique Ortiz Date: Wed, 6 Dec 2023 14:31:10 -0400 Subject: [PATCH 04/13] chore: do not run wasm check on alloy-transport-ipc --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3934561434f..8879941e042 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -54,7 +54,8 @@ jobs: with: cache-on-failure: true - name: check - run: cargo check --workspace --target wasm32-unknown-unknown + # Do not run WASM checks on IPC as it doesn't make sense + run: cargo check --workspace --target wasm32-unknown-unknown --exclude "alloy-transport-ipc" feature-checks: runs-on: ubuntu-latest From b92525fcc77b6e9918efb2975875a875cd2853fd Mon Sep 17 00:00:00 2001 From: James Date: Wed, 6 Dec 2023 11:17:58 -0800 Subject: [PATCH 05/13] chore: add cc0-1.0 for to_method --- deny.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/deny.toml b/deny.toml index 8d7bf3ea155..a9fa6886dee 100644 --- a/deny.toml +++ b/deny.toml @@ -37,6 +37,7 @@ exceptions = [ # so we prefer to not have dependencies using it # https://tldrlegal.com/license/creative-commons-cc0-1.0-universal { allow = ["CC0-1.0"], name = "tiny-keccak" }, + { allow = ["CC0-1.0"], name = "to_method" } ] [[licenses.clarify]] From 6fd316443ab2bf7d1bc0c16dc166d9c43b311b1a Mon Sep 17 00:00:00 2001 From: James Date: Thu, 7 Dec 2023 17:10:49 -0800 Subject: [PATCH 06/13] fix: buffer properly --- crates/transport-ipc/src/lib.rs | 64 ++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/crates/transport-ipc/src/lib.rs b/crates/transport-ipc/src/lib.rs index a2cd85bb777..ffe28f39850 100644 --- a/crates/transport-ipc/src/lib.rs +++ b/crates/transport-ipc/src/lib.rs @@ -22,11 +22,8 @@ use std::task::Poll::{Pending, Ready}; use alloy_json_rpc::PubSubItem; use bytes::{Buf, BytesMut}; -use futures::{ready, AsyncRead, AsyncWriteExt, StreamExt}; -use interprocess::local_socket::{ - tokio::{LocalSocketStream, OwnedReadHalf}, - ToLocalSocketName, -}; +use futures::{io::BufReader, ready, AsyncBufRead, AsyncRead, AsyncWriteExt, StreamExt}; +use interprocess::local_socket::{tokio::LocalSocketStream, ToLocalSocketName}; use tokio::select; type Result = std::result::Result; @@ -105,21 +102,41 @@ impl IpcBackend { } } +/// A stream of JSON-RPC items, read from an [`AsyncRead`] stream. +#[derive(Debug)] #[pin_project::pin_project] -struct ReadJsonStream { +pub struct ReadJsonStream { + /// The underlying reader. #[pin] - reader: OwnedReadHalf, + reader: BufReader, + /// A buffer of bytes read from the reader. buf: BytesMut, + /// A buffer of items deserialized from the reader. items: Vec, } -impl ReadJsonStream { - fn new(reader: OwnedReadHalf) -> Self { - Self { reader, buf: BytesMut::with_capacity(4096), items: vec![] } +impl ReadJsonStream +where + T: AsyncRead, +{ + fn new(reader: T) -> Self { + Self { reader: BufReader::new(reader), buf: BytesMut::with_capacity(4096), items: vec![] } } } -impl futures::stream::Stream for ReadJsonStream { +impl From for ReadJsonStream +where + T: AsyncRead, +{ + fn from(reader: T) -> Self { + Self::new(reader) + } +} + +impl futures::stream::Stream for ReadJsonStream +where + T: AsyncRead, +{ type Item = alloy_json_rpc::PubSubItem; fn poll_next( @@ -130,6 +147,9 @@ impl futures::stream::Stream for ReadJsonStream { // Deserialize any buffered items. if !this.buf.is_empty() { + this.reader.consume(this.buf.len()); + + tracing::debug!(buf_len = this.buf.len(), "Deserializing buffered IPC data"); let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter(); let item = de.next(); @@ -149,30 +169,32 @@ impl futures::stream::Stream for ReadJsonStream { None => {} } this.buf.advance(de.byte_offset()); + cx.waker().wake_by_ref(); + return Pending; } + // Return any buffered items, rewaking. if !this.items.is_empty() { // may have more work! cx.waker().wake_by_ref(); return Ready(this.items.pop()); } - let data = ready!(this.reader.poll_read(cx, this.buf)); + tracing::debug!(buf_len = this.buf.len(), "Polling IPC socket for data"); + + let data = ready!(this.reader.poll_fill_buf(cx)); match data { - Ok(0) => { - tracing::debug!("IPC socket closed"); - return Ready(None); - } Err(e) => { - tracing::error!(%e, "Failed to read from IPC socket"); - return Ready(None); + tracing::error!(%e, "Failed to read from IPC socket, shutting down"); + Ready(None) } - _ => { + Ok(data) => { + tracing::debug!(data_len = data.len(), "Read data from IPC socket"); + this.buf.extend_from_slice(data); // wake task to run deserialization cx.waker().wake_by_ref(); + Pending } } - - Pending } } From d18b084e5b4242de526bd9991fb7ae348008140b Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 Dec 2023 10:28:08 -0800 Subject: [PATCH 07/13] feature: some rexports and client impl --- crates/rpc-client/Cargo.toml | 4 ++++ crates/rpc-client/src/client.rs | 12 +++++++++++- crates/rpc-client/src/lib.rs | 6 ++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/crates/rpc-client/Cargo.toml b/crates/rpc-client/Cargo.toml index cbac62f8111..0deffdfc820 100644 --- a/crates/rpc-client/Cargo.toml +++ b/crates/rpc-client/Cargo.toml @@ -30,6 +30,9 @@ reqwest = { workspace = true, optional = true } tokio = { workspace = true, optional = true } url = { workspace = true, optional = true } +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +alloy-transport-ipc = { workspace = true, optional = true } + [dev-dependencies] alloy-primitives.workspace = true alloy-transport-ws.workspace = true @@ -42,3 +45,4 @@ reqwest = ["dep:url", "dep:reqwest", "alloy-transport-http/reqwest"] hyper = ["dep:url", "dep:hyper", "alloy-transport-http/hyper"] pubsub = ["dep:tokio", "dep:alloy-pubsub", "dep:alloy-primitives"] ws = ["pubsub", "dep:alloy-transport-ws"] +ipc = ["pubsub", "dep:alloy-transport-ipc"] \ No newline at end of file diff --git a/crates/rpc-client/src/client.rs b/crates/rpc-client/src/client.rs index d2ecf786bfc..b603c14f83a 100644 --- a/crates/rpc-client/src/client.rs +++ b/crates/rpc-client/src/client.rs @@ -131,7 +131,7 @@ where #[cfg(feature = "pubsub")] mod pubsub_impl { use super::*; - use alloy_pubsub::PubSubFrontend; + use alloy_pubsub::{PubSubConnect, PubSubFrontend}; use tokio::sync::broadcast; impl RpcClient { @@ -142,6 +142,16 @@ mod pubsub_impl { ) -> broadcast::Receiver> { self.transport.get_subscription(id).await.unwrap() } + + /// Connect to a transport via a [`PubSubConnect`] implementor. + pub async fn connect_pubsub( + connect: C, + ) -> Result, TransportError> + where + C: PubSubConnect, + { + ClientBuilder::default().pubsub(connect).await + } } } diff --git a/crates/rpc-client/src/lib.rs b/crates/rpc-client/src/lib.rs index 0e479e95ff1..c0a148c4e8c 100644 --- a/crates/rpc-client/src/lib.rs +++ b/crates/rpc-client/src/lib.rs @@ -29,3 +29,9 @@ pub use call::RpcCall; mod client; pub use client::RpcClient; + +#[cfg(feature = "ws")] +pub use alloy_transport_ws::WsConnect; + +#[cfg(all(feature = "ipc", not(target_arch = "wasm32")))] +pub use alloy_transport_ipc::IpcConnect; From 5fa64024a7d9d242dbd8bcf01300151fe7bd9b08 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 Dec 2023 10:45:00 -0800 Subject: [PATCH 08/13] feature: mock ipc --- crates/json-rpc/src/response/error.rs | 4 +- crates/json-rpc/src/response/mod.rs | 27 +++++++++++- crates/transport-ipc/Cargo.toml | 8 +++- crates/transport-ipc/src/lib.rs | 3 ++ crates/transport-ipc/src/mock.rs | 61 +++++++++++++++++++++++++++ 5 files changed, 99 insertions(+), 4 deletions(-) create mode 100644 crates/transport-ipc/src/mock.rs diff --git a/crates/json-rpc/src/response/error.rs b/crates/json-rpc/src/response/error.rs index 11632f4df6e..6b709239347 100644 --- a/crates/json-rpc/src/response/error.rs +++ b/crates/json-rpc/src/response/error.rs @@ -1,6 +1,6 @@ use serde::{ de::{DeserializeOwned, MapAccess, Visitor}, - Deserialize, Deserializer, + Deserialize, Deserializer, Serialize, }; use serde_json::value::RawValue; use std::{borrow::Borrow, fmt, marker::PhantomData}; @@ -10,7 +10,7 @@ use std::{borrow::Borrow, fmt, marker::PhantomData}; /// This response indicates that the server received and handled the request, /// but that there was an error in the processing of it. The error should be /// included in the `message` field of the response payload. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct ErrorPayload> { /// The error code. pub code: i64, diff --git a/crates/json-rpc/src/response/mod.rs b/crates/json-rpc/src/response/mod.rs index ed84dc689ad..97cbb261be7 100644 --- a/crates/json-rpc/src/response/mod.rs +++ b/crates/json-rpc/src/response/mod.rs @@ -1,7 +1,8 @@ use crate::common::Id; use serde::{ de::{DeserializeOwned, MapAccess, Visitor}, - Deserialize, Deserializer, + ser::SerializeMap, + Deserialize, Deserializer, Serialize, }; use serde_json::value::RawValue; use std::{borrow::Borrow, fmt, marker::PhantomData}; @@ -224,6 +225,30 @@ where } } +impl Serialize for Response +where + Payload: Serialize, + ErrData: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(3))?; + map.serialize_entry("jsonrpc", "2.0")?; + map.serialize_entry("id", &self.id)?; + match &self.payload { + ResponsePayload::Success(result) => { + map.serialize_entry("result", result)?; + } + ResponsePayload::Failure(error) => { + map.serialize_entry("error", error)?; + } + } + map.end() + } +} + #[cfg(test)] mod test { #[test] diff --git a/crates/transport-ipc/Cargo.toml b/crates/transport-ipc/Cargo.toml index 3f78afdf90b..8a98337e51e 100644 --- a/crates/transport-ipc/Cargo.toml +++ b/crates/transport-ipc/Cargo.toml @@ -23,4 +23,10 @@ tokio.workspace = true tracing.workspace = true bytes = "1.5.0" -interprocess = { version = "1.2.1", features = ["tokio", "tokio_support"] } \ No newline at end of file +interprocess = { version = "1.2.1", features = ["tokio", "tokio_support"] } +serde = { workspace = true, optional = true } +tempfile = { version = "3.8.1", optional = true } + +[features] +default = [] +mock = ["dep:serde", "dep:tempfile"] diff --git a/crates/transport-ipc/src/lib.rs b/crates/transport-ipc/src/lib.rs index ffe28f39850..4e51444c089 100644 --- a/crates/transport-ipc/src/lib.rs +++ b/crates/transport-ipc/src/lib.rs @@ -18,6 +18,9 @@ mod connect; pub use connect::IpcConnect; +#[cfg(feature = "mock")] +pub mod mock; + use std::task::Poll::{Pending, Ready}; use alloy_json_rpc::PubSubItem; diff --git a/crates/transport-ipc/src/mock.rs b/crates/transport-ipc/src/mock.rs new file mode 100644 index 00000000000..6255d7c8151 --- /dev/null +++ b/crates/transport-ipc/src/mock.rs @@ -0,0 +1,61 @@ +//! Mock IPC server. + +use alloy_json_rpc::Response; +use futures::{AsyncReadExt, AsyncWriteExt}; +use serde::Serialize; +use std::{collections::VecDeque, path::PathBuf}; +use tempfile::NamedTempFile; + +/// Mock IPC server. +#[derive(Debug)] +pub struct MockIpcServer { + /// Replies to send, in order + replies: VecDeque>, + /// Path to the socket + path: NamedTempFile, +} + +impl MockIpcServer { + /// Create a new mock IPC server. + pub fn new() -> Self { + Self { replies: VecDeque::new(), path: NamedTempFile::new().unwrap() } + } + + /// Add a raw reply to the server. + pub fn add_raw_reply(&mut self, reply: Vec) { + self.replies.push_back(reply); + } + + /// Add a reply to the server. + pub fn add_reply(&mut self, s: S) { + let reply = serde_json::to_vec(&s).unwrap(); + self.add_raw_reply(reply); + } + + /// Add a json-rpc response to the server. + pub fn add_response(&mut self, response: Response) { + self.add_reply(response); + } + + /// Get the path to the socket. + pub fn path(&self) -> PathBuf { + self.path.path().to_owned() + } + + /// Run the server. + pub async fn run(mut self) { + let socket = + interprocess::local_socket::tokio::LocalSocketStream::connect(self.path.path()) + .await + .unwrap(); + + let (mut reader, mut writer) = socket.into_split(); + + let mut buf = [0u8; 4096]; + loop { + reader.read(&mut buf).await.unwrap(); + let reply = self.replies.pop_front().unwrap(); + writer.write_all(&reply).await.unwrap(); + } + } +} From adbb4bcd3b106d314930db24fd846764ff3bc3ad Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 Dec 2023 10:52:32 -0800 Subject: [PATCH 09/13] docs: some of them --- crates/transport-ipc/src/lib.rs | 2 ++ crates/transport-ipc/src/mock.rs | 51 +++++++++++++++++++++++--------- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/crates/transport-ipc/src/lib.rs b/crates/transport-ipc/src/lib.rs index 4e51444c089..fe550ca1938 100644 --- a/crates/transport-ipc/src/lib.rs +++ b/crates/transport-ipc/src/lib.rs @@ -20,6 +20,8 @@ pub use connect::IpcConnect; #[cfg(feature = "mock")] pub mod mock; +#[cfg(feature = "mock")] +pub use mock::MockIpcServer; use std::task::Poll::{Pending, Ready}; diff --git a/crates/transport-ipc/src/mock.rs b/crates/transport-ipc/src/mock.rs index 6255d7c8151..b456bfe87ec 100644 --- a/crates/transport-ipc/src/mock.rs +++ b/crates/transport-ipc/src/mock.rs @@ -7,6 +7,27 @@ use std::{collections::VecDeque, path::PathBuf}; use tempfile::NamedTempFile; /// Mock IPC server. +/// +/// Currently unix socket only, due to use of namedtempfile. +/// +/// ## Example: +/// +/// ``` +/// use alloy_transport_ipc::MockIpcServer; +/// # fn main() -> Result<(), Box> { +/// // Instantiate a new mock server. +/// let mut server = MockIpcServer::new(); +/// // Get the path to the socket. +/// let path = server.path(); +/// // Add a reply to the server. Can also use `add_raw_reply` to add a raw +/// // byte vector, or `add_response` to add a json-rpc response. +/// server.add_reply("hello"); +/// // Run the server. The first request will get "hello" as a response. +/// MockIpcServer::new().spawn(); +/// +/// # Ok(()) +/// # } +/// ``` #[derive(Debug)] pub struct MockIpcServer { /// Replies to send, in order @@ -43,19 +64,21 @@ impl MockIpcServer { } /// Run the server. - pub async fn run(mut self) { - let socket = - interprocess::local_socket::tokio::LocalSocketStream::connect(self.path.path()) - .await - .unwrap(); - - let (mut reader, mut writer) = socket.into_split(); - - let mut buf = [0u8; 4096]; - loop { - reader.read(&mut buf).await.unwrap(); - let reply = self.replies.pop_front().unwrap(); - writer.write_all(&reply).await.unwrap(); - } + pub async fn spawn(mut self) { + tokio::spawn(async move { + let socket = + interprocess::local_socket::tokio::LocalSocketStream::connect(self.path.path()) + .await + .unwrap(); + + let (mut reader, mut writer) = socket.into_split(); + + let mut buf = [0u8; 4096]; + loop { + reader.read(&mut buf).await.unwrap(); + let reply = self.replies.pop_front().unwrap_or_default(); + writer.write_all(&reply).await.unwrap(); + } + }); } } From c00aa0ef7702a871357d19c282251090ade5560d Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 Dec 2023 10:59:24 -0800 Subject: [PATCH 10/13] lint: clippy --- crates/transport-ipc/src/mock.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/transport-ipc/src/mock.rs b/crates/transport-ipc/src/mock.rs index b456bfe87ec..3b5c0e0c923 100644 --- a/crates/transport-ipc/src/mock.rs +++ b/crates/transport-ipc/src/mock.rs @@ -36,6 +36,12 @@ pub struct MockIpcServer { path: NamedTempFile, } +impl Default for MockIpcServer { + fn default() -> Self { + Self::new() + } +} + impl MockIpcServer { /// Create a new mock IPC server. pub fn new() -> Self { @@ -75,7 +81,7 @@ impl MockIpcServer { let mut buf = [0u8; 4096]; loop { - reader.read(&mut buf).await.unwrap(); + let _ = reader.read(&mut buf).await.unwrap(); let reply = self.replies.pop_front().unwrap_or_default(); writer.write_all(&reply).await.unwrap(); } From 67ebd798481cca19d710cbbeaaa22a21a10ae445 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 Dec 2023 10:59:44 -0800 Subject: [PATCH 11/13] nit: newline --- crates/transport-ipc/src/mock.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/transport-ipc/src/mock.rs b/crates/transport-ipc/src/mock.rs index 3b5c0e0c923..994f26e793b 100644 --- a/crates/transport-ipc/src/mock.rs +++ b/crates/transport-ipc/src/mock.rs @@ -24,7 +24,6 @@ use tempfile::NamedTempFile; /// server.add_reply("hello"); /// // Run the server. The first request will get "hello" as a response. /// MockIpcServer::new().spawn(); -/// /// # Ok(()) /// # } /// ``` From dbc808f06212b0dbaf91cadada520f18c1f0489c Mon Sep 17 00:00:00 2001 From: Enrique Ortiz Date: Fri, 8 Dec 2023 15:52:13 -0400 Subject: [PATCH 12/13] feat: close file so other processes can access it --- crates/transport-ipc/src/mock.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/transport-ipc/src/mock.rs b/crates/transport-ipc/src/mock.rs index 994f26e793b..9c3a8947c33 100644 --- a/crates/transport-ipc/src/mock.rs +++ b/crates/transport-ipc/src/mock.rs @@ -71,10 +71,11 @@ impl MockIpcServer { /// Run the server. pub async fn spawn(mut self) { tokio::spawn(async move { - let socket = - interprocess::local_socket::tokio::LocalSocketStream::connect(self.path.path()) - .await - .unwrap(); + let socket = interprocess::local_socket::tokio::LocalSocketStream::connect( + self.path.into_temp_path().to_path_buf(), + ) + .await + .unwrap(); let (mut reader, mut writer) = socket.into_split(); From 6005c4c4a068dd51a844ede87568559203c4bccf Mon Sep 17 00:00:00 2001 From: evalir Date: Fri, 8 Dec 2023 16:14:42 -0400 Subject: [PATCH 13/13] feat: simple `IPC` test (#65) * chore: add basic ipc test * install test binaries * feat: ipc connection on builder if ipc feat is enabled * chore: remove, better done separately * review comments * review comments * chore: remove unused crates for now * add to dev deps * chore: add instructions for repro on ipc test * chore: re-add normal test * clippy * chore: change test to use mock ipc * add mock * no sleep till brooklyn * chore: revert to geth impl for now --- .github/scripts/install_test_binaries.sh | 51 ++++++++++++++++++++++++ .github/workflows/ci.yml | 3 ++ crates/rpc-client/Cargo.toml | 3 ++ crates/rpc-client/tests/it/ipc.rs | 38 ++++++++++++++++++ crates/rpc-client/tests/it/main.rs | 3 ++ crates/rpc-types/src/eth/filter.rs | 2 +- 6 files changed, 99 insertions(+), 1 deletion(-) create mode 100755 .github/scripts/install_test_binaries.sh create mode 100644 crates/rpc-client/tests/it/ipc.rs diff --git a/.github/scripts/install_test_binaries.sh b/.github/scripts/install_test_binaries.sh new file mode 100755 index 00000000000..39a1f869171 --- /dev/null +++ b/.github/scripts/install_test_binaries.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# Installs Solc and Geth binaries +# Note: intended for use only with CI (x86_64 Ubuntu, MacOS or Windows) +set -e + +GETH_BUILD=${GETH_BUILD:-"1.11.2-73b01f40"} + +BIN_DIR=${BIN_DIR:-"$HOME/bin"} + +PLATFORM="$(uname -s | awk '{print tolower($0)}')" +if [ "$PLATFORM" != "linux" ] && [ "$PLATFORM" != "darwin" ]; then + EXT=".exe" +fi + +main() { + mkdir -p "$BIN_DIR" + cd "$BIN_DIR" + export PATH="$BIN_DIR:$PATH" + if [ "$GITHUB_PATH" ]; then + echo "$BIN_DIR" >> "$GITHUB_PATH" + fi + + install_geth + + echo "" + echo "Installed Geth:" + geth version +} + +# Installs geth from https://geth.ethereum.org/downloads +install_geth() { + case "$PLATFORM" in + linux|darwin) + name="geth-$PLATFORM-amd64-$GETH_BUILD" + curl -s "https://gethstore.blob.core.windows.net/builds/$name.tar.gz" | tar -xzf - + mv -f "$name/geth" ./ + rm -rf "$name" + chmod +x geth + ;; + *) + name="geth-windows-amd64-$GETH_BUILD" + zip="$name.zip" + curl -so "$zip" "https://gethstore.blob.core.windows.net/builds/$zip" + unzip "$zip" + mv -f "$name/geth.exe" ./ + rm -rf "$name" "$zip" + ;; + esac +} + +main diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8879941e042..726a1aa84c0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,9 @@ jobs: uses: foundry-rs/foundry-toolchain@v1 with: version: nightly + - name: Install test binaries + shell: bash + run: ./.github/scripts/install_test_binaries.sh - uses: Swatinem/rust-cache@v2 with: cache-on-failure: true diff --git a/crates/rpc-client/Cargo.toml b/crates/rpc-client/Cargo.toml index 0deffdfc820..1b822c6afe6 100644 --- a/crates/rpc-client/Cargo.toml +++ b/crates/rpc-client/Cargo.toml @@ -38,6 +38,9 @@ alloy-primitives.workspace = true alloy-transport-ws.workspace = true test-log = { version = "0.2.13", default-features = false, features = ["trace"] } tracing-subscriber = { version = "0.3.17", features = ["std", "env-filter"] } +ethers-core = "2.0.10" +alloy-transport-ipc = { workspace = true, features = ["mock"] } +tempfile = "3" [features] default = ["reqwest"] diff --git a/crates/rpc-client/tests/it/ipc.rs b/crates/rpc-client/tests/it/ipc.rs new file mode 100644 index 00000000000..2a8f15c5e58 --- /dev/null +++ b/crates/rpc-client/tests/it/ipc.rs @@ -0,0 +1,38 @@ +use std::borrow::Cow; + +use alloy_primitives::U64; +use alloy_pubsub::PubSubFrontend; +use alloy_rpc_client::{ClientBuilder, RpcCall, RpcClient}; +use alloy_transport_ipc::IpcConnect; +use ethers_core::utils::{Geth, GethInstance}; +use tempfile::NamedTempFile; + +async fn connect() -> (RpcClient, GethInstance) { + let temp_file = NamedTempFile::new().unwrap(); + let path = temp_file.into_temp_path().to_path_buf(); + let geth = Geth::new().block_time(1u64).ipc_path(&path).spawn(); + + // [Windows named pipes](https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipes) + // are located at `\\\pipe\`. + #[cfg(windows)] + let path = format!(r"\\.\pipe\{}", path.display()); + + let connector: IpcConnect<_> = path.into(); + + let client = ClientBuilder::default().pubsub(connector).await.unwrap(); + + (client, geth) +} + +#[test_log::test(tokio::test)] +async fn it_makes_a_request() { + let (client, _geth) = connect().await; + + let params: Cow<'static, _> = Cow::Owned(vec![]); + + let req: RpcCall<_, Cow<'static, Vec>, U64> = client.prepare("eth_blockNumber", params); + + let timeout = tokio::time::timeout(std::time::Duration::from_secs(2), req); + + timeout.await.unwrap().unwrap(); +} diff --git a/crates/rpc-client/tests/it/main.rs b/crates/rpc-client/tests/it/main.rs index c5c35e7bd39..932cf898ee0 100644 --- a/crates/rpc-client/tests/it/main.rs +++ b/crates/rpc-client/tests/it/main.rs @@ -5,3 +5,6 @@ mod http; #[cfg(feature = "pubsub")] mod ws; + +#[cfg(feature = "pubsub")] +mod ipc; diff --git a/crates/rpc-types/src/eth/filter.rs b/crates/rpc-types/src/eth/filter.rs index 6c714e05d0e..fc239c1ba91 100644 --- a/crates/rpc-types/src/eth/filter.rs +++ b/crates/rpc-types/src/eth/filter.rs @@ -736,7 +736,7 @@ impl FilteredParams { } /// Returns `true` if the bloom matches the topics - pub fn matches_topics(bloom: Bloom, topic_filters: &Vec) -> bool { + pub fn matches_topics(bloom: Bloom, topic_filters: &[BloomFilter]) -> bool { if topic_filters.is_empty() { return true; }