Skip to content

Commit

Permalink
Merge branch 'prestwich/interprocess' into evalir/ipc-test
Browse files Browse the repository at this point in the history
  • Loading branch information
Evalir committed Dec 8, 2023
2 parents 77ab105 + 6fd3164 commit 4b26aac
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 21 deletions.
64 changes: 43 additions & 21 deletions crates/transport-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@ use std::task::Poll::{Pending, Ready};

use alloy_json_rpc::PubSubItem;
use bytes::{Buf, BytesMut};
use futures::{ready, AsyncRead, AsyncWriteExt, StreamExt};
use interprocess::local_socket::{
tokio::{LocalSocketStream, OwnedReadHalf},
ToLocalSocketName,
};
use futures::{io::BufReader, ready, AsyncBufRead, AsyncRead, AsyncWriteExt, StreamExt};
use interprocess::local_socket::{tokio::LocalSocketStream, ToLocalSocketName};
use tokio::select;

type Result<T> = std::result::Result<T, std::io::Error>;
Expand Down Expand Up @@ -105,21 +102,41 @@ impl IpcBackend {
}
}

/// A stream of JSON-RPC items, read from an [`AsyncRead`] stream.
#[derive(Debug)]
#[pin_project::pin_project]
struct ReadJsonStream {
pub struct ReadJsonStream<T> {
/// The underlying reader.
#[pin]
reader: OwnedReadHalf,
reader: BufReader<T>,
/// A buffer of bytes read from the reader.
buf: BytesMut,
/// A buffer of items deserialized from the reader.
items: Vec<PubSubItem>,
}

impl ReadJsonStream {
fn new(reader: OwnedReadHalf) -> Self {
Self { reader, buf: BytesMut::with_capacity(4096), items: vec![] }
impl<T> ReadJsonStream<T>
where
T: AsyncRead,
{
fn new(reader: T) -> Self {
Self { reader: BufReader::new(reader), buf: BytesMut::with_capacity(4096), items: vec![] }
}
}

impl futures::stream::Stream for ReadJsonStream {
impl<T> From<T> for ReadJsonStream<T>
where
T: AsyncRead,
{
fn from(reader: T) -> Self {
Self::new(reader)
}
}

impl<T> futures::stream::Stream for ReadJsonStream<T>
where
T: AsyncRead,
{
type Item = alloy_json_rpc::PubSubItem;

fn poll_next(
Expand All @@ -130,6 +147,9 @@ impl futures::stream::Stream for ReadJsonStream {

// Deserialize any buffered items.
if !this.buf.is_empty() {
this.reader.consume(this.buf.len());

tracing::debug!(buf_len = this.buf.len(), "Deserializing buffered IPC data");
let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter();

let item = de.next();
Expand All @@ -149,30 +169,32 @@ impl futures::stream::Stream for ReadJsonStream {
None => {}
}
this.buf.advance(de.byte_offset());
cx.waker().wake_by_ref();
return Pending;
}

// Return any buffered items, rewaking.
if !this.items.is_empty() {
// may have more work!
cx.waker().wake_by_ref();
return Ready(this.items.pop());
}

let data = ready!(this.reader.poll_read(cx, this.buf));
tracing::debug!(buf_len = this.buf.len(), "Polling IPC socket for data");

let data = ready!(this.reader.poll_fill_buf(cx));
match data {
Ok(0) => {
tracing::debug!("IPC socket closed");
return Ready(None);
}
Err(e) => {
tracing::error!(%e, "Failed to read from IPC socket");
return Ready(None);
tracing::error!(%e, "Failed to read from IPC socket, shutting down");
Ready(None)
}
_ => {
Ok(data) => {
tracing::debug!(data_len = data.len(), "Read data from IPC socket");
this.buf.extend_from_slice(data);
// wake task to run deserialization
cx.waker().wake_by_ref();
Pending
}
}

Pending
}
}
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ exceptions = [
# so we prefer to not have dependencies using it
# https://tldrlegal.com/license/creative-commons-cc0-1.0-universal
{ allow = ["CC0-1.0"], name = "tiny-keccak" },
{ allow = ["CC0-1.0"], name = "to_method" }
]

[[licenses.clarify]]
Expand Down

0 comments on commit 4b26aac

Please sign in to comment.