Skip to content

Commit

Permalink
chore(codec): Replace tonic::codec::encode::fuse with tokio-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Sep 22, 2023
1 parent d5c14fa commit 63325e5
Showing 1 changed file with 6 additions and 59 deletions.
65 changes: 6 additions & 59 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use std::{
};
use tokio_stream::{Stream, StreamExt};

use fuse::Fuse;

pub(super) const BUFFER_SIZE: usize = 8 * 1024;
const YIELD_THRESHOLD: usize = 32 * 1024;

Expand Down Expand Up @@ -71,7 +69,7 @@ where
U: Stream<Item = Result<T::Item, Status>>,
{
#[pin]
source: Fuse<U>,
source: U,
encoder: T,
compression_encoding: Option<CompressionEncoding>,
max_message_size: Option<usize>,
Expand Down Expand Up @@ -107,7 +105,7 @@ where
};

return EncodedBytes {
source: Fuse::new(source),
source,
encoder,
compression_encoding,
max_message_size,
Expand All @@ -126,14 +124,17 @@ where

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let EncodedBytesProj {
mut source,
source,
encoder,
compression_encoding,
max_message_size,
buf,
uncompression_buf,
} = self.project();

let source = source.fuse();
tokio::pin!(source);

loop {
match source.as_mut().poll_next(cx) {
Poll::Pending if buf.is_empty() => {
Expand Down Expand Up @@ -345,57 +346,3 @@ where
Poll::Ready(self.project().state.trailers())
}
}

mod fuse {
use std::{
pin::Pin,
task::{ready, Context, Poll},
};

use tokio_stream::Stream;

/// Stream for the [`fuse`](super::StreamExt::fuse) method.
#[derive(Debug)]
#[pin_project::pin_project]
#[must_use = "streams do nothing unless polled"]
pub(crate) struct Fuse<St> {
#[pin]
stream: St,
done: bool,
}

impl<St> Fuse<St> {
pub(crate) fn new(stream: St) -> Self {
Self {
stream,
done: false,
}
}
}

impl<S: Stream> Stream for Fuse<S> {
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
let this = self.project();

if *this.done {
return Poll::Ready(None);
}

let item = ready!(this.stream.poll_next(cx));
if item.is_none() {
*this.done = true;
}
Poll::Ready(item)
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.done {
(0, Some(0))
} else {
self.stream.size_hint()
}
}
}
}

0 comments on commit 63325e5

Please sign in to comment.