From c708cfaf8b332c793929ab24ee9de40e3b800217 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Thu, 27 Jul 2023 21:30:32 +0800 Subject: [PATCH] registering waker only before Poll::Pending to reduce registering cost --- async/src/traits/consumer.rs | 10 +++++----- async/src/traits/producer.rs | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/async/src/traits/consumer.rs b/async/src/traits/consumer.rs index 653309f..1afaa88 100644 --- a/async/src/traits/consumer.rs +++ b/async/src/traits/consumer.rs @@ -71,7 +71,6 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { assert!(!self.done); - self.owner.register_write_waker(cx.waker()); let closed = self.owner.is_closed(); #[cfg(feature = "std")] std::println!("PopFuture::poll: closed={}", closed); @@ -84,6 +83,7 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> { if closed { Poll::Ready(None) } else { + self.owner.register_write_waker(cx.waker()); Poll::Pending } } @@ -115,7 +115,6 @@ where type Output = Result<(), usize>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.owner.register_write_waker(cx.waker()); let closed = self.owner.is_closed(); let mut slice = self.slice.take().unwrap(); let len = self.owner.pop_slice(slice); @@ -127,6 +126,7 @@ where Poll::Ready(Err(self.count)) } else { self.slice.replace(slice); + self.owner.register_write_waker(cx.waker()); Poll::Pending } } @@ -148,11 +148,11 @@ impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { assert!(!self.done); - self.owner.register_write_waker(cx.waker()); let closed = self.owner.is_closed(); if self.count <= self.owner.occupied_len() || closed { Poll::Ready(()) } else { + self.owner.register_write_waker(cx.waker()); Poll::Pending } } @@ -165,7 +165,6 @@ where type Item = ::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.register_write_waker(cx.waker()); let closed = self.is_closed(); match self.try_pop() { Some(item) => Poll::Ready(Some(item)), @@ -173,6 +172,7 @@ where if closed { Poll::Ready(None) } else { + self.register_write_waker(cx.waker()); Poll::Pending } } @@ -186,12 +186,12 @@ where R::Target: AsyncRingBuffer, { fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - self.register_write_waker(cx.waker()); let closed = self.is_closed(); let len = self.pop_slice(buf); if len != 0 || closed { Poll::Ready(Ok(len)) } else { + self.register_write_waker(cx.waker()); Poll::Pending } } diff --git a/async/src/traits/producer.rs b/async/src/traits/producer.rs index ea3a8e8..536c06e 100644 --- a/async/src/traits/producer.rs +++ b/async/src/traits/producer.rs @@ -87,7 +87,6 @@ impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> { type Output = Result<(), A::Item>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.owner.register_read_waker(cx.waker()); let item = self.item.take().unwrap(); if self.owner.is_closed() { Poll::Ready(Err(item)) @@ -95,6 +94,7 @@ impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> { match self.owner.try_push(item) { Err(item) => { self.item.replace(item); + self.owner.register_read_waker(cx.waker()); Poll::Pending } Ok(()) => Poll::Ready(Ok(())), @@ -127,7 +127,6 @@ where type Output = Result<(), usize>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.owner.register_read_waker(cx.waker()); let mut slice = self.slice.take().unwrap(); if self.owner.is_closed() { Poll::Ready(Err(self.count)) @@ -139,6 +138,7 @@ where Poll::Ready(Ok(())) } else { self.slice.replace(slice); + self.owner.register_read_waker(cx.waker()); Poll::Pending } } @@ -159,7 +159,6 @@ impl<'a, A: AsyncProducer, I: Iterator> Future for PushIterFutur type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.owner.register_read_waker(cx.waker()); let mut iter = self.iter.take().unwrap(); if self.owner.is_closed() { Poll::Ready(false) @@ -169,6 +168,7 @@ impl<'a, A: AsyncProducer, I: Iterator> Future for PushIterFutur Poll::Ready(true) } else { self.iter.replace(iter); + self.owner.register_read_waker(cx.waker()); Poll::Pending } } @@ -191,11 +191,11 @@ impl<'a, A: AsyncProducer> Future for WaitVacantFuture<'a, A> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { assert!(!self.done); - self.owner.register_read_waker(cx.waker()); let closed = self.owner.is_closed(); if self.count <= self.owner.vacant_len() || closed { Poll::Ready(()) } else { + self.owner.register_read_waker(cx.waker()); Poll::Pending } } @@ -208,10 +208,10 @@ where type Error = (); fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.register_read_waker(cx.waker()); if self.is_closed() { Poll::Ready(Err(())) } else if self.is_full() { + self.register_read_waker(cx.waker()); Poll::Pending } else { Poll::Ready(Ok(())) @@ -237,12 +237,12 @@ where R::Target: AsyncRingBuffer, { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - self.register_read_waker(cx.waker()); if self.is_closed() { Poll::Ready(Ok(0)) } else { let count = self.push_slice(buf); if count == 0 { + self.register_read_waker(cx.waker()); Poll::Pending } else { Poll::Ready(Ok(count))