From 5417c6ec16b6da8bdb7312bf21df88359c9549e1 Mon Sep 17 00:00:00 2001 From: tottoto Date: Fri, 22 Sep 2023 20:46:54 +0900 Subject: [PATCH] chore(codec): Replace tonic::codec::encode::fuse with tokio-stream --- tonic/src/codec/encode.rs | 65 +++------------------------------------ 1 file changed, 5 insertions(+), 60 deletions(-) diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 593634f93..13eb2c96d 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -11,8 +11,6 @@ use std::{ }; use tokio_stream::{Stream, StreamExt}; -use fuse::Fuse; - pub(super) const BUFFER_SIZE: usize = 8 * 1024; const YIELD_THRESHOLD: usize = 32 * 1024; @@ -29,7 +27,7 @@ where { let stream = EncodedBytes::new( encoder, - source, + source.fuse(), compression_encoding, compression_override, max_message_size, @@ -50,7 +48,7 @@ where { let stream = EncodedBytes::new( encoder, - source.map(Ok), + source.fuse().map(Ok), compression_encoding, SingleMessageCompressionOverride::default(), max_message_size, @@ -71,7 +69,7 @@ where U: Stream>, { #[pin] - source: Fuse, + source: U, encoder: T, compression_encoding: Option, max_message_size: Option, @@ -84,6 +82,7 @@ where T: Encoder, U: Stream>, { + // `source` should be fused stream. fn new( encoder: T, source: U, @@ -107,7 +106,7 @@ where }; Self { - source: Fuse::new(source), + source, encoder, compression_encoding, max_message_size, @@ -345,57 +344,3 @@ where Poll::Ready(self.project().state.trailers()) } } - -mod fuse { - use std::{ - pin::Pin, - task::{ready, Context, Poll}, - }; - - use tokio_stream::Stream; - - /// Stream for the [`fuse`](super::StreamExt::fuse) method. - #[derive(Debug)] - #[pin_project::pin_project] - #[must_use = "streams do nothing unless polled"] - pub(crate) struct Fuse { - #[pin] - stream: St, - done: bool, - } - - impl Fuse { - pub(crate) fn new(stream: St) -> Self { - Self { - stream, - done: false, - } - } - } - - impl Stream for Fuse { - type Item = S::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - if *this.done { - return Poll::Ready(None); - } - - let item = ready!(this.stream.poll_next(cx)); - if item.is_none() { - *this.done = true; - } - Poll::Ready(item) - } - - fn size_hint(&self) -> (usize, Option) { - if self.done { - (0, Some(0)) - } else { - self.stream.size_hint() - } - } - } -}