Skip to content

Commit

Permalink
fix missing waker deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
Congyuwang committed Jul 27, 2023
1 parent c708cfa commit e7a7480
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 105 deletions.
127 changes: 76 additions & 51 deletions async/src/traits/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,26 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> {
type Output = Option<A::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done);
let closed = self.owner.is_closed();
#[cfg(feature = "std")]
std::println!("PopFuture::poll: closed={}", closed);
match self.owner.try_pop() {
Some(item) => {
self.done = true;
Poll::Ready(Some(item))
}
None => {
if closed {
Poll::Ready(None)
} else {
self.owner.register_write_waker(cx.waker());
Poll::Pending
loop {
assert!(!self.done);
let closed = self.owner.is_closed();
#[cfg(feature = "std")]
std::println!("PopFuture::poll: closed={}", closed);
match self.owner.try_pop() {
Some(item) => {
self.done = true;
break Poll::Ready(Some(item));
}
None => {
if closed {
break Poll::Ready(None);
} else {
self.owner.register_write_waker(cx.waker());
if self.owner.is_full() {
continue;
}
break Poll::Pending;
}
}
}
}
Expand Down Expand Up @@ -115,19 +120,24 @@ where
type Output = Result<(), usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let closed = self.owner.is_closed();
let mut slice = self.slice.take().unwrap();
let len = self.owner.pop_slice(slice);
slice = &mut slice[len..];
self.count += len;
if slice.is_empty() {
Poll::Ready(Ok(()))
} else if closed {
Poll::Ready(Err(self.count))
} else {
self.slice.replace(slice);
self.owner.register_write_waker(cx.waker());
Poll::Pending
loop {
let closed = self.owner.is_closed();
let mut slice = self.slice.take().unwrap();
let len = self.owner.pop_slice(slice);
slice = &mut slice[len..];
self.count += len;
if slice.is_empty() {
break Poll::Ready(Ok(()));
} else if closed {
break Poll::Ready(Err(self.count));
} else {
self.slice.replace(slice);
self.owner.register_write_waker(cx.waker());
if self.owner.is_full() {
continue;
}
break Poll::Pending;
}
}
}
}
Expand All @@ -147,13 +157,18 @@ impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done);
let closed = self.owner.is_closed();
if self.count <= self.owner.occupied_len() || closed {
Poll::Ready(())
} else {
self.owner.register_write_waker(cx.waker());
Poll::Pending
loop {
assert!(!self.done);
let closed = self.owner.is_closed();
if self.count <= self.owner.occupied_len() || closed {
break Poll::Ready(());
} else {
self.owner.register_write_waker(cx.waker());
if self.owner.is_full() {
continue;
}
break Poll::Pending;
}
}
}
}
Expand All @@ -165,15 +180,20 @@ where
type Item = <R::Target as Observer>::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let closed = self.is_closed();
match self.try_pop() {
Some(item) => Poll::Ready(Some(item)),
None => {
if closed {
Poll::Ready(None)
} else {
self.register_write_waker(cx.waker());
Poll::Pending
loop {
let closed = self.is_closed();
match self.try_pop() {
Some(item) => break Poll::Ready(Some(item)),
None => {
if closed {
break Poll::Ready(None);
} else {
self.register_write_waker(cx.waker());
if self.is_full() {
continue;
}
break Poll::Pending;
}
}
}
}
Expand All @@ -186,13 +206,18 @@ where
R::Target: AsyncRingBuffer<Item = u8>,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let closed = self.is_closed();
let len = self.pop_slice(buf);
if len != 0 || closed {
Poll::Ready(Ok(len))
} else {
self.register_write_waker(cx.waker());
Poll::Pending
loop {
let closed = self.is_closed();
let len = self.pop_slice(buf);
if len != 0 || closed {
break Poll::Ready(Ok(len));
} else {
self.register_write_waker(cx.waker());
if self.is_full() {
continue;
}
break Poll::Pending;
}
}
}
}
138 changes: 84 additions & 54 deletions async/src/traits/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,22 @@ impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> {
type Output = Result<(), A::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let item = self.item.take().unwrap();
if self.owner.is_closed() {
Poll::Ready(Err(item))
} else {
match self.owner.try_push(item) {
Err(item) => {
self.item.replace(item);
self.owner.register_read_waker(cx.waker());
Poll::Pending
loop {
let item = self.item.take().unwrap();
if self.owner.is_closed() {
break Poll::Ready(Err(item));
} else {
match self.owner.try_push(item) {
Err(item) => {
self.item.replace(item);
self.owner.register_read_waker(cx.waker());
if self.owner.is_empty() {
continue;
}
break Poll::Pending;
}
Ok(()) => break Poll::Ready(Ok(())),
}
Ok(()) => Poll::Ready(Ok(())),
}
}
}
Expand Down Expand Up @@ -127,19 +132,24 @@ where
type Output = Result<(), usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut slice = self.slice.take().unwrap();
if self.owner.is_closed() {
Poll::Ready(Err(self.count))
} else {
let len = self.owner.push_slice(slice);
slice = &slice[len..];
self.count += len;
if slice.is_empty() {
Poll::Ready(Ok(()))
loop {
let mut slice = self.slice.take().unwrap();
if self.owner.is_closed() {
break Poll::Ready(Err(self.count));
} else {
self.slice.replace(slice);
self.owner.register_read_waker(cx.waker());
Poll::Pending
let len = self.owner.push_slice(slice);
slice = &slice[len..];
self.count += len;
if slice.is_empty() {
break Poll::Ready(Ok(()));
} else {
self.slice.replace(slice);
self.owner.register_read_waker(cx.waker());
if self.owner.is_empty() {
continue;
}
break Poll::Pending;
}
}
}
}
Expand All @@ -159,17 +169,22 @@ impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> Future for PushIterFutur
type Output = bool;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut iter = self.iter.take().unwrap();
if self.owner.is_closed() {
Poll::Ready(false)
} else {
self.owner.push_iter(&mut iter);
if iter.peek().is_none() {
Poll::Ready(true)
loop {
let mut iter = self.iter.take().unwrap();
if self.owner.is_closed() {
break Poll::Ready(false);
} else {
self.iter.replace(iter);
self.owner.register_read_waker(cx.waker());
Poll::Pending
self.owner.push_iter(&mut iter);
if iter.peek().is_none() {
break Poll::Ready(true);
} else {
self.iter.replace(iter);
self.owner.register_read_waker(cx.waker());
if self.owner.is_empty() {
continue;
}
break Poll::Pending;
}
}
}
}
Expand All @@ -190,13 +205,18 @@ impl<'a, A: AsyncProducer> Future for WaitVacantFuture<'a, A> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done);
let closed = self.owner.is_closed();
if self.count <= self.owner.vacant_len() || closed {
Poll::Ready(())
} else {
self.owner.register_read_waker(cx.waker());
Poll::Pending
loop {
assert!(!self.done);
let closed = self.owner.is_closed();
if self.count <= self.owner.vacant_len() || closed {
break Poll::Ready(());
} else {
self.owner.register_read_waker(cx.waker());
if self.owner.is_empty() {
continue;
}
break Poll::Pending;
}
}
}
}
Expand All @@ -208,13 +228,18 @@ where
type Error = ();

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.is_closed() {
Poll::Ready(Err(()))
} else if self.is_full() {
self.register_read_waker(cx.waker());
Poll::Pending
} else {
Poll::Ready(Ok(()))
loop {
if self.is_closed() {
break Poll::Ready(Err(()));
} else if self.is_full() {
self.register_read_waker(cx.waker());
if self.is_empty() {
continue;
}
break Poll::Pending;
} else {
break Poll::Ready(Ok(()));
}
}
}
fn start_send(mut self: Pin<&mut Self>, item: <R::Target as Observer>::Item) -> Result<(), Self::Error> {
Expand All @@ -237,15 +262,20 @@ where
R::Target: AsyncRingBuffer<Item = u8>,
{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
if self.is_closed() {
Poll::Ready(Ok(0))
} else {
let count = self.push_slice(buf);
if count == 0 {
self.register_read_waker(cx.waker());
Poll::Pending
loop {
if self.is_closed() {
break Poll::Ready(Ok(0));
} else {
Poll::Ready(Ok(count))
let count = self.push_slice(buf);
if count == 0 {
self.register_read_waker(cx.waker());
if self.is_empty() {
continue;
}
break Poll::Pending;
} else {
break Poll::Ready(Ok(count));
}
}
}
}
Expand Down

0 comments on commit e7a7480

Please sign in to comment.