Skip to content

Commit

Permalink
registering waker only before Poll::Pending to reduce registering cost
Browse files Browse the repository at this point in the history
  • Loading branch information
Congyuwang committed Jul 27, 2023
1 parent 45cc3a8 commit c708cfa
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
10 changes: 5 additions & 5 deletions async/src/traits/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done);
self.owner.register_write_waker(cx.waker());
let closed = self.owner.is_closed();
#[cfg(feature = "std")]
std::println!("PopFuture::poll: closed={}", closed);
Expand All @@ -84,6 +83,7 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> {
if closed {
Poll::Ready(None)
} else {
self.owner.register_write_waker(cx.waker());
Poll::Pending
}
}
Expand Down Expand Up @@ -115,7 +115,6 @@ where
type Output = Result<(), usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.owner.register_write_waker(cx.waker());
let closed = self.owner.is_closed();
let mut slice = self.slice.take().unwrap();
let len = self.owner.pop_slice(slice);
Expand All @@ -127,6 +126,7 @@ where
Poll::Ready(Err(self.count))
} else {
self.slice.replace(slice);
self.owner.register_write_waker(cx.waker());
Poll::Pending
}
}
Expand All @@ -148,11 +148,11 @@ impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done);
self.owner.register_write_waker(cx.waker());
let closed = self.owner.is_closed();
if self.count <= self.owner.occupied_len() || closed {
Poll::Ready(())
} else {
self.owner.register_write_waker(cx.waker());
Poll::Pending
}
}
Expand All @@ -165,14 +165,14 @@ where
type Item = <R::Target as Observer>::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.register_write_waker(cx.waker());
let closed = self.is_closed();
match self.try_pop() {
Some(item) => Poll::Ready(Some(item)),
None => {
if closed {
Poll::Ready(None)
} else {
self.register_write_waker(cx.waker());
Poll::Pending
}
}
Expand All @@ -186,12 +186,12 @@ where
R::Target: AsyncRingBuffer<Item = u8>,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.register_write_waker(cx.waker());
let closed = self.is_closed();
let len = self.pop_slice(buf);
if len != 0 || closed {
Poll::Ready(Ok(len))
} else {
self.register_write_waker(cx.waker());
Poll::Pending
}
}
Expand Down
12 changes: 6 additions & 6 deletions async/src/traits/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> {
type Output = Result<(), A::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.owner.register_read_waker(cx.waker());
let item = self.item.take().unwrap();
if self.owner.is_closed() {
Poll::Ready(Err(item))
} else {
match self.owner.try_push(item) {
Err(item) => {
self.item.replace(item);
self.owner.register_read_waker(cx.waker());
Poll::Pending
}
Ok(()) => Poll::Ready(Ok(())),
Expand Down Expand Up @@ -127,7 +127,6 @@ where
type Output = Result<(), usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.owner.register_read_waker(cx.waker());
let mut slice = self.slice.take().unwrap();
if self.owner.is_closed() {
Poll::Ready(Err(self.count))
Expand All @@ -139,6 +138,7 @@ where
Poll::Ready(Ok(()))
} else {
self.slice.replace(slice);
self.owner.register_read_waker(cx.waker());
Poll::Pending
}
}
Expand All @@ -159,7 +159,6 @@ impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> Future for PushIterFutur
type Output = bool;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.owner.register_read_waker(cx.waker());
let mut iter = self.iter.take().unwrap();
if self.owner.is_closed() {
Poll::Ready(false)
Expand All @@ -169,6 +168,7 @@ impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> Future for PushIterFutur
Poll::Ready(true)
} else {
self.iter.replace(iter);
self.owner.register_read_waker(cx.waker());
Poll::Pending
}
}
Expand All @@ -191,11 +191,11 @@ impl<'a, A: AsyncProducer> Future for WaitVacantFuture<'a, A> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done);
self.owner.register_read_waker(cx.waker());
let closed = self.owner.is_closed();
if self.count <= self.owner.vacant_len() || closed {
Poll::Ready(())
} else {
self.owner.register_read_waker(cx.waker());
Poll::Pending
}
}
Expand All @@ -208,10 +208,10 @@ where
type Error = ();

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.register_read_waker(cx.waker());
if self.is_closed() {
Poll::Ready(Err(()))
} else if self.is_full() {
self.register_read_waker(cx.waker());
Poll::Pending
} else {
Poll::Ready(Ok(()))
Expand All @@ -237,12 +237,12 @@ where
R::Target: AsyncRingBuffer<Item = u8>,
{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
self.register_read_waker(cx.waker());
if self.is_closed() {
Poll::Ready(Ok(0))
} else {
let count = self.push_slice(buf);
if count == 0 {
self.register_read_waker(cx.waker());
Poll::Pending
} else {
Poll::Ready(Ok(count))
Expand Down

0 comments on commit c708cfa

Please sign in to comment.