diff --git a/async/src/traits/consumer.rs b/async/src/traits/consumer.rs index 286ac10..349b5ff 100644 --- a/async/src/traits/consumer.rs +++ b/async/src/traits/consumer.rs @@ -85,10 +85,9 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> { break Poll::Ready(None); } else { self.owner.register_write_waker(cx.waker()); - if self.owner.is_full() { - continue; + if self.owner.is_empty() && !self.owner.is_closed() { + break Poll::Pending; } - break Poll::Pending; } } } @@ -133,10 +132,9 @@ where } else { self.slice.replace(slice); self.owner.register_write_waker(cx.waker()); - if self.owner.is_full() { - continue; + if self.owner.is_empty() && !self.owner.is_closed() { + break Poll::Pending; } - break Poll::Pending; } } } @@ -164,10 +162,9 @@ impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> { break Poll::Ready(()); } else { self.owner.register_write_waker(cx.waker()); - if self.owner.is_full() { - continue; + if self.count > self.owner.occupied_len() && !self.owner.is_closed() { + break Poll::Pending; } - break Poll::Pending; } } } @@ -189,10 +186,9 @@ where break Poll::Ready(None); } else { self.register_write_waker(cx.waker()); - if self.is_full() { - continue; + if self.is_empty() && !self.is_closed() { + break Poll::Pending; } - break Poll::Pending; } } } @@ -213,10 +209,9 @@ where break Poll::Ready(Ok(len)); } else { self.register_write_waker(cx.waker()); - if self.is_full() { - continue; + if self.is_empty() && !self.is_closed() { + break Poll::Pending; } - break Poll::Pending; } } } diff --git a/async/src/traits/producer.rs b/async/src/traits/producer.rs index b172121..dad5d06 100644 --- a/async/src/traits/producer.rs +++ b/async/src/traits/producer.rs @@ -96,10 +96,9 @@ impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> { Err(item) => { self.item.replace(item); self.owner.register_read_waker(cx.waker()); - if self.owner.is_empty() { - continue; + if self.owner.is_full() && !self.owner.is_closed() { + break Poll::Pending; } - break Poll::Pending; } Ok(()) => break Poll::Ready(Ok(())), } @@ -145,10 +144,11 @@ where } else { self.slice.replace(slice); self.owner.register_read_waker(cx.waker()); - if self.owner.is_empty() { - continue; + if self.owner.is_full() && !self.owner.is_closed() { + // should be full here, because `push_slice_all` + // has not been able to push all of the slice. + break Poll::Pending; } - break Poll::Pending; } } } @@ -180,10 +180,11 @@ impl<'a, A: AsyncProducer, I: Iterator> Future for PushIterFutur } else { self.iter.replace(iter); self.owner.register_read_waker(cx.waker()); - if self.owner.is_empty() { - continue; + if self.owner.is_full() && !self.owner.is_closed() { + // should be full here, because `push_iter_all` + // has not been able to push all of the iterator. + break Poll::Pending; } - break Poll::Pending; } } } @@ -212,10 +213,9 @@ impl<'a, A: AsyncProducer> Future for WaitVacantFuture<'a, A> { break Poll::Ready(()); } else { self.owner.register_read_waker(cx.waker()); - if self.owner.is_empty() { - continue; + if self.count > self.owner.vacant_len() && !self.owner.is_closed() { + break Poll::Pending; } - break Poll::Pending; } } } @@ -233,10 +233,9 @@ where break Poll::Ready(Err(())); } else if self.is_full() { self.register_read_waker(cx.waker()); - if self.is_empty() { - continue; + if self.is_full() && !self.is_closed() { + break Poll::Pending; } - break Poll::Pending; } else { break Poll::Ready(Ok(())); } @@ -269,10 +268,9 @@ where let count = self.push_slice(buf); if count == 0 { self.register_read_waker(cx.waker()); - if self.is_empty() { - continue; + if self.is_full() && !self.is_closed() { + break Poll::Pending; } - break Poll::Pending; } else { break Poll::Ready(Ok(count)); }