Skip to content

Commit

Permalink
cleanup: tracing and flow
Browse files Browse the repository at this point in the history
  • Loading branch information
prestwich committed Dec 2, 2023
1 parent 7998694 commit c94cb42
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions crates/transport-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,23 @@ impl IpcBackend {
}
},
// dispatcher has gone away, or shutdown was received
None => break false,
None => {
tracing::debug!("Frontend has gone away");
break false;
},
}
}
// Read from the socket.
item = read.next() => {
match item {
Some(item) => {
if self.interface.send_to_frontend(item).is_err() {
// frontend has gone away
tracing::debug!("Frontend has gone away");
break false;
}
}
None => {
tracing::error!("Read stream has failed.");
break true;
}
}
Expand All @@ -106,13 +110,12 @@ struct ReadJsonStream {
#[pin]
reader: OwnedReadHalf,
buf: BytesMut,
done: bool,
items: Vec<PubSubItem>,
}

impl ReadJsonStream {
fn new(reader: OwnedReadHalf) -> Self {
Self { reader, buf: BytesMut::with_capacity(4096), done: false, items: vec![] }
Self { reader, buf: BytesMut::with_capacity(4096), items: vec![] }
}
}

Expand All @@ -123,10 +126,6 @@ impl futures::stream::Stream for ReadJsonStream {
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.done {
return Ready(None);
}

let this = self.project();

// Deserialize any buffered items.
Expand All @@ -139,8 +138,7 @@ impl futures::stream::Stream for ReadJsonStream {
this.items.push(response);
}
Some(Err(e)) => {
tracing::error!(%e, "Failed to deserialize IPC response");
*this.done = true;
tracing::error!(%e, "IPC response contained invalid JSON");
return Ready(None);
}
None => {}
Expand All @@ -156,8 +154,12 @@ impl futures::stream::Stream for ReadJsonStream {

let data = ready!(this.reader.poll_read(cx, this.buf));
match data {
Ok(0) | Err(_) => {
*this.done = true;
Ok(0) => {
tracing::debug!("IPC socket closed");
return Ready(None);
}
Err(e) => {
tracing::error!(%e, "Failed to read from IPC socket");
return Ready(None);
}
_ => {
Expand Down

0 comments on commit c94cb42

Please sign in to comment.