From 63325e5d756b382fbc7abb979455f00d859d7e76 Mon Sep 17 00:00:00 2001 From: tottoto Date: Fri, 22 Sep 2023 20:46:54 +0900 Subject: [PATCH] chore(codec): Replace tonic::codec::encode::fuse with tokio-stream --- tonic/src/codec/encode.rs | 65 ++++----------------------------------- 1 file changed, 6 insertions(+), 59 deletions(-) diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index bd48ee47b..d5d82e3c5 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -11,8 +11,6 @@ use std::{ }; use tokio_stream::{Stream, StreamExt}; -use fuse::Fuse; - pub(super) const BUFFER_SIZE: usize = 8 * 1024; const YIELD_THRESHOLD: usize = 32 * 1024; @@ -71,7 +69,7 @@ where U: Stream>, { #[pin] - source: Fuse, + source: U, encoder: T, compression_encoding: Option, max_message_size: Option, @@ -107,7 +105,7 @@ where }; return EncodedBytes { - source: Fuse::new(source), + source, encoder, compression_encoding, max_message_size, @@ -126,7 +124,7 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let EncodedBytesProj { - mut source, + source, encoder, compression_encoding, max_message_size, @@ -134,6 +132,9 @@ where uncompression_buf, } = self.project(); + let source = source.fuse(); + tokio::pin!(source); + loop { match source.as_mut().poll_next(cx) { Poll::Pending if buf.is_empty() => { @@ -345,57 +346,3 @@ where Poll::Ready(self.project().state.trailers()) } } - -mod fuse { - use std::{ - pin::Pin, - task::{ready, Context, Poll}, - }; - - use tokio_stream::Stream; - - /// Stream for the [`fuse`](super::StreamExt::fuse) method. - #[derive(Debug)] - #[pin_project::pin_project] - #[must_use = "streams do nothing unless polled"] - pub(crate) struct Fuse { - #[pin] - stream: St, - done: bool, - } - - impl Fuse { - pub(crate) fn new(stream: St) -> Self { - Self { - stream, - done: false, - } - } - } - - impl Stream for Fuse { - type Item = S::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - if *this.done { - return Poll::Ready(None); - } - - let item = ready!(this.stream.poll_next(cx)); - if item.is_none() { - *this.done = true; - } - Poll::Ready(item) - } - - fn size_hint(&self) -> (usize, Option) { - if self.done { - (0, Some(0)) - } else { - self.stream.size_hint() - } - } - } -}