diff --git a/async/src/tests.rs b/async/src/tests.rs index f96e0d4..293bbe8 100644 --- a/async/src/tests.rs +++ b/async/src/tests.rs @@ -177,8 +177,8 @@ fn drop_close_prod() { let stage_clone = stage.clone(); let t0 = std::thread::spawn(move || { execute!(async { - drop(prod); assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0); + drop(prod); }); }); let t1 = std::thread::spawn(move || { @@ -199,11 +199,11 @@ fn drop_close_cons() { let stage_clone = stage.clone(); let t0 = std::thread::spawn(move || { execute!(async { - prod.push(0).await.unwrap(); assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0); + prod.push(0).await.unwrap(); prod.wait_vacant(1).await; - assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 3); + assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 2); assert!(prod.is_closed()); }); }); @@ -212,7 +212,6 @@ fn drop_close_cons() { cons.wait_occupied(1).await; assert_eq!(stage_clone.fetch_add(1, Ordering::SeqCst), 1); drop(cons); - assert_eq!(stage_clone.fetch_add(1, Ordering::SeqCst), 2); }); }); t0.join().unwrap(); diff --git a/async/src/traits/consumer.rs b/async/src/traits/consumer.rs index 653309f..4a2d342 100644 --- a/async/src/traits/consumer.rs +++ b/async/src/traits/consumer.rs @@ -70,21 +70,28 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!(!self.done); - self.owner.register_write_waker(cx.waker()); - let closed = self.owner.is_closed(); - #[cfg(feature = "std")] - std::println!("PopFuture::poll: closed={}", closed); - match self.owner.try_pop() { - Some(item) => { - self.done = true; - Poll::Ready(Some(item)) - } - None => { - if closed { - Poll::Ready(None) - } else { - Poll::Pending + let mut waker_registered = false; + loop { + assert!(!self.done); + let closed = self.owner.is_closed(); + #[cfg(feature = "std")] + std::println!("PopFuture::poll: closed={}", closed); + match self.owner.try_pop() { + Some(item) => { + self.done = true; + break Poll::Ready(Some(item)); + } + None => { + if closed { + break Poll::Ready(None); + } else { + if waker_registered { + break Poll::Pending; + } else { + self.owner.register_write_waker(cx.waker()); + waker_registered = true; + } + } } } } @@ -115,19 +122,26 @@ where type Output = Result<(), usize>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.owner.register_write_waker(cx.waker()); - let closed = self.owner.is_closed(); - let mut slice = self.slice.take().unwrap(); - let len = self.owner.pop_slice(slice); - slice = &mut slice[len..]; - self.count += len; - if slice.is_empty() { - Poll::Ready(Ok(())) - } else if closed { - Poll::Ready(Err(self.count)) - } else { - self.slice.replace(slice); - Poll::Pending + let mut waker_registered = false; + loop { + let closed = self.owner.is_closed(); + let mut slice = self.slice.take().unwrap(); + let len = self.owner.pop_slice(slice); + slice = &mut slice[len..]; + self.count += len; + if slice.is_empty() { + break Poll::Ready(Ok(())); + } else if closed { + break Poll::Ready(Err(self.count)); + } else { + self.slice.replace(slice); + if waker_registered { + break Poll::Pending; + } else { + self.owner.register_write_waker(cx.waker()); + waker_registered = true; + } + } } } } @@ -147,13 +161,20 @@ impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!(!self.done); - self.owner.register_write_waker(cx.waker()); - let closed = self.owner.is_closed(); - if self.count <= self.owner.occupied_len() || closed { - Poll::Ready(()) - } else { - Poll::Pending + let mut waker_registered = false; + loop { + assert!(!self.done); + let closed = self.owner.is_closed(); + if self.count <= self.owner.occupied_len() || closed { + break Poll::Ready(()); + } else { + if waker_registered { + break Poll::Pending; + } else { + self.owner.register_write_waker(cx.waker()); + waker_registered = true; + } + } } } } @@ -165,15 +186,22 @@ where type Item = ::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.register_write_waker(cx.waker()); - let closed = self.is_closed(); - match self.try_pop() { - Some(item) => Poll::Ready(Some(item)), - None => { - if closed { - Poll::Ready(None) - } else { - Poll::Pending + let mut waker_registered = false; + loop { + let closed = self.is_closed(); + match self.try_pop() { + Some(item) => break Poll::Ready(Some(item)), + None => { + if closed { + break Poll::Ready(None); + } else { + if waker_registered { + break Poll::Pending; + } else { + self.register_write_waker(cx.waker()); + waker_registered = true; + } + } } } } @@ -186,13 +214,20 @@ where R::Target: AsyncRingBuffer, { fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - self.register_write_waker(cx.waker()); - let closed = self.is_closed(); - let len = self.pop_slice(buf); - if len != 0 || closed { - Poll::Ready(Ok(len)) - } else { - Poll::Pending + let mut waker_registered = false; + loop { + let closed = self.is_closed(); + let len = self.pop_slice(buf); + if len != 0 || closed { + break Poll::Ready(Ok(len)); + } else { + if waker_registered { + break Poll::Pending; + } else { + self.register_write_waker(cx.waker()); + waker_registered = true; + } + } } } } diff --git a/async/src/traits/producer.rs b/async/src/traits/producer.rs index ea3a8e8..3364a44 100644 --- a/async/src/traits/producer.rs +++ b/async/src/traits/producer.rs @@ -87,17 +87,24 @@ impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> { type Output = Result<(), A::Item>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.owner.register_read_waker(cx.waker()); - let item = self.item.take().unwrap(); - if self.owner.is_closed() { - Poll::Ready(Err(item)) - } else { - match self.owner.try_push(item) { - Err(item) => { - self.item.replace(item); - Poll::Pending + let mut waker_registered = false; + loop { + let item = self.item.take().unwrap(); + if self.owner.is_closed() { + break Poll::Ready(Err(item)); + } else { + match self.owner.try_push(item) { + Err(item) => { + self.item.replace(item); + if waker_registered { + break Poll::Pending; + } else { + self.owner.register_read_waker(cx.waker()); + waker_registered = true; + } + } + Ok(()) => break Poll::Ready(Ok(())), } - Ok(()) => Poll::Ready(Ok(())), } } } @@ -127,19 +134,26 @@ where type Output = Result<(), usize>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.owner.register_read_waker(cx.waker()); - let mut slice = self.slice.take().unwrap(); - if self.owner.is_closed() { - Poll::Ready(Err(self.count)) - } else { - let len = self.owner.push_slice(slice); - slice = &slice[len..]; - self.count += len; - if slice.is_empty() { - Poll::Ready(Ok(())) + let mut waker_registered = false; + loop { + let mut slice = self.slice.take().unwrap(); + if self.owner.is_closed() { + break Poll::Ready(Err(self.count)); } else { - self.slice.replace(slice); - Poll::Pending + let len = self.owner.push_slice(slice); + slice = &slice[len..]; + self.count += len; + if slice.is_empty() { + break Poll::Ready(Ok(())); + } else { + self.slice.replace(slice); + if waker_registered { + break Poll::Pending; + } else { + self.owner.register_read_waker(cx.waker()); + waker_registered = true; + } + } } } } @@ -159,17 +173,24 @@ impl<'a, A: AsyncProducer, I: Iterator> Future for PushIterFutur type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.owner.register_read_waker(cx.waker()); - let mut iter = self.iter.take().unwrap(); - if self.owner.is_closed() { - Poll::Ready(false) - } else { - self.owner.push_iter(&mut iter); - if iter.peek().is_none() { - Poll::Ready(true) + let mut waker_registered = false; + loop { + let mut iter = self.iter.take().unwrap(); + if self.owner.is_closed() { + break Poll::Ready(false); } else { - self.iter.replace(iter); - Poll::Pending + self.owner.push_iter(&mut iter); + if iter.peek().is_none() { + break Poll::Ready(true); + } else { + self.iter.replace(iter); + if waker_registered { + break Poll::Pending; + } else { + self.owner.register_read_waker(cx.waker()); + waker_registered = true; + } + } } } } @@ -190,13 +211,20 @@ impl<'a, A: AsyncProducer> Future for WaitVacantFuture<'a, A> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!(!self.done); - self.owner.register_read_waker(cx.waker()); - let closed = self.owner.is_closed(); - if self.count <= self.owner.vacant_len() || closed { - Poll::Ready(()) - } else { - Poll::Pending + let mut waker_registered = false; + loop { + assert!(!self.done); + let closed = self.owner.is_closed(); + if self.count <= self.owner.vacant_len() || closed { + break Poll::Ready(()); + } else { + if waker_registered { + break Poll::Pending; + } else { + self.owner.register_read_waker(cx.waker()); + waker_registered = true; + } + } } } } @@ -208,13 +236,20 @@ where type Error = (); fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.register_read_waker(cx.waker()); - if self.is_closed() { - Poll::Ready(Err(())) - } else if self.is_full() { - Poll::Pending - } else { - Poll::Ready(Ok(())) + let mut waker_registered = false; + loop { + if self.is_closed() { + break Poll::Ready(Err(())); + } else if !self.is_full() { + break Poll::Ready(Ok(())); + } else { + if waker_registered { + break Poll::Pending; + } else { + self.register_read_waker(cx.waker()); + waker_registered = true; + } + } } } fn start_send(mut self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { @@ -237,15 +272,22 @@ where R::Target: AsyncRingBuffer, { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - self.register_read_waker(cx.waker()); - if self.is_closed() { - Poll::Ready(Ok(0)) - } else { - let count = self.push_slice(buf); - if count == 0 { - Poll::Pending + let mut waker_registered = false; + loop { + if self.is_closed() { + break Poll::Ready(Ok(0)); } else { - Poll::Ready(Ok(count)) + let count = self.push_slice(buf); + if count == 0 { + if waker_registered { + break Poll::Pending; + } else { + self.register_read_waker(cx.waker()); + waker_registered = true; + } + } else { + break Poll::Ready(Ok(count)); + } } } }