From decf29a910228f7134d135ce4a610b027dbb8566 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 16 Sep 2024 23:11:01 +0100 Subject: [PATCH 1/2] merge_streams: shorten the code a tiny bit --- crates/librqbit/src/merge_streams.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/librqbit/src/merge_streams.rs b/crates/librqbit/src/merge_streams.rs index 35205d4f..c3848e59 100644 --- a/crates/librqbit/src/merge_streams.rs +++ b/crates/librqbit/src/merge_streams.rs @@ -36,11 +36,7 @@ fn poll_two + Unpin, S2: Stream + Unpin>( let s1p = s1.poll_next_unpin(cx); match s1p { Poll::Ready(Some(v)) => Poll::Ready(Some(v)), - Poll::Ready(None) => match s2.poll_next_unpin(cx) { - Poll::Ready(Some(v)) => Poll::Ready(Some(v)), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - }, + Poll::Ready(None) => s2.poll_next_unpin(cx), Poll::Pending => match s2.poll_next_unpin(cx) { Poll::Ready(Some(v)) => Poll::Ready(Some(v)), Poll::Ready(None) | Poll::Pending => Poll::Pending, From 2c6a9f7a14c5b1ae473b5b574a07ed6e4f2cb71b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 16 Sep 2024 23:16:09 +0100 Subject: [PATCH 2/2] Remove custom "merge_streams" in favor of tokio_stream --- crates/librqbit/src/merge_streams.rs | 59 ++-------------------------- 1 file changed, 3 insertions(+), 56 deletions(-) diff --git a/crates/librqbit/src/merge_streams.rs b/crates/librqbit/src/merge_streams.rs index c3848e59..d2ada2c1 100644 --- a/crates/librqbit/src/merge_streams.rs +++ b/crates/librqbit/src/merge_streams.rs @@ -1,15 +1,4 @@ -use std::{ - sync::atomic::{AtomicU64, Ordering}, - task::Poll, -}; - -use futures::Stream; - -struct MergedStreams { - poll_count: AtomicU64, - s1: futures::stream::Fuse, - s2: futures::stream::Fuse, -} +use futures::stream::Stream; pub fn merge_streams< I, @@ -19,48 +8,6 @@ pub fn merge_streams< s1: S1, s2: S2, ) -> impl Stream + Unpin + 'static { - use futures::stream::StreamExt; - MergedStreams { - poll_count: AtomicU64::new(0), - s1: s1.fuse(), - s2: s2.fuse(), - } -} - -fn poll_two + Unpin, S2: Stream + Unpin>( - s1: &mut S1, - s2: &mut S2, - cx: &mut std::task::Context<'_>, -) -> Poll> { - use futures::StreamExt; - let s1p = s1.poll_next_unpin(cx); - match s1p { - Poll::Ready(Some(v)) => Poll::Ready(Some(v)), - Poll::Ready(None) => s2.poll_next_unpin(cx), - Poll::Pending => match s2.poll_next_unpin(cx) { - Poll::Ready(Some(v)) => Poll::Ready(Some(v)), - Poll::Ready(None) | Poll::Pending => Poll::Pending, - }, - } -} - -impl Stream for MergedStreams -where - S1: Stream + Unpin, - S2: Stream + Unpin, -{ - type Item = I; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - let this = self.get_mut(); - let s1_first = this.poll_count.fetch_add(1, Ordering::Relaxed) % 2 == 0; - if s1_first { - poll_two(&mut this.s1, &mut this.s2, cx) - } else { - poll_two(&mut this.s2, &mut this.s1, cx) - } - } + use tokio_stream::StreamExt; + s1.merge(s2) }