Skip to content

Commit

Permalink
Merge pull request #32 from Congyuwang/master
Browse files Browse the repository at this point in the history
Fix an Error in Previous PR
  • Loading branch information
agerasev authored Jul 31, 2023
2 parents 87f47e8 + 3a3f6e8 commit 3115d57
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 126 deletions.
2 changes: 1 addition & 1 deletion async/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use rb::AsyncRb;
pub use traits::{consumer, producer};
pub use transfer::async_transfer;

#[cfg(all(test, feature = "std"))]
#[cfg(test)]
mod tests;

#[cfg(all(test, feature = "bench"))]
Expand Down
8 changes: 6 additions & 2 deletions async/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::{async_transfer, traits::*, AsyncHeapRb};
use alloc::vec::Vec;
use core::sync::atomic::{AtomicUsize, Ordering};
use futures::task::{noop_waker_ref, AtomicWaker};
#[cfg(feature = "std")]
use std::sync::Arc;
use std::{vec, vec::Vec};

#[test]
fn atomic_waker() {
Expand Down Expand Up @@ -59,7 +60,7 @@ fn push_pop_slice() {
},
async move {
let mut cons = cons;
let mut data = vec![0; COUNT + 1];
let mut data = [0; COUNT + 1];
let count = cons.pop_slice_all(&mut data).await.unwrap_err();
assert_eq!(count, COUNT);
assert!(data.into_iter().take(COUNT).eq(0..COUNT));
Expand Down Expand Up @@ -94,6 +95,7 @@ fn sink_stream() {
);
}

#[cfg(feature = "std")]
#[test]
fn read_write() {
use futures::{AsyncReadExt, AsyncWriteExt};
Expand Down Expand Up @@ -167,6 +169,7 @@ fn wait() {
);
}

#[cfg(feature = "std")]
#[test]
fn drop_close_prod() {
let (prod, mut cons) = AsyncHeapRb::<usize>::new(1).split();
Expand All @@ -189,6 +192,7 @@ fn drop_close_prod() {
t1.join().unwrap();
}

#[cfg(feature = "std")]
#[test]
fn drop_close_cons() {
let (mut prod, mut cons) = AsyncHeapRb::<usize>::new(1).split();
Expand Down
94 changes: 39 additions & 55 deletions async/src/traits/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,18 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> {
loop {
assert!(!self.done);
let closed = self.owner.is_closed();
#[cfg(feature = "std")]
match self.owner.try_pop() {
Some(item) => {
self.done = true;
break Poll::Ready(Some(item));
}
None => {
if closed {
break Poll::Ready(None);
} else {
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
if let Some(item) = self.owner.try_pop() {
self.done = true;
break Poll::Ready(Some(item));
}
if closed {
break Poll::Ready(None);
}
if waker_registered {
break Poll::Pending;
}
self.owner.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
Expand Down Expand Up @@ -130,17 +123,16 @@ where
self.count += len;
if slice.is_empty() {
break Poll::Ready(Ok(()));
} else if closed {
}
if closed {
break Poll::Ready(Err(self.count));
} else {
self.slice.replace(slice);
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_write_waker(cx.waker());
waker_registered = true;
}
}
self.slice.replace(slice);
if waker_registered {
break Poll::Pending;
}
self.owner.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
Expand All @@ -166,14 +158,12 @@ impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> {
let closed = self.owner.is_closed();
if self.count <= self.owner.occupied_len() || closed {
break Poll::Ready(());
} else {
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_write_waker(cx.waker());
waker_registered = true;
}
}
if waker_registered {
break Poll::Pending;
}
self.owner.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
Expand All @@ -188,21 +178,17 @@ where
let mut waker_registered = false;
loop {
let closed = self.is_closed();
match self.try_pop() {
Some(item) => break Poll::Ready(Some(item)),
None => {
if closed {
break Poll::Ready(None);
} else {
if waker_registered {
break Poll::Pending;
} else {
self.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
if let Some(item) = self.try_pop() {
break Poll::Ready(Some(item));
}
if closed {
break Poll::Ready(None);
}
if waker_registered {
break Poll::Pending;
}
self.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
Expand All @@ -219,14 +205,12 @@ where
let len = self.pop_slice(buf);
if len != 0 || closed {
break Poll::Ready(Ok(len));
} else {
if waker_registered {
break Poll::Pending;
} else {
self.register_write_waker(cx.waker());
waker_registered = true;
}
}
if waker_registered {
break Poll::Pending;
}
self.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
121 changes: 53 additions & 68 deletions async/src/traits/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,17 @@ impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> {
let item = self.item.take().unwrap();
if self.owner.is_closed() {
break Poll::Ready(Err(item));
} else {
match self.owner.try_push(item) {
Err(item) => {
self.item.replace(item);
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
Ok(()) => break Poll::Ready(Ok(())),
}
}
let push_result = self.owner.try_push(item);
if push_result.is_ok() {
break Poll::Ready(Ok(()));
}
self.item.replace(push_result.unwrap_err());
if waker_registered {
break Poll::Pending;
}
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
}
Expand Down Expand Up @@ -139,22 +136,19 @@ where
let mut slice = self.slice.take().unwrap();
if self.owner.is_closed() {
break Poll::Ready(Err(self.count));
} else {
let len = self.owner.push_slice(slice);
slice = &slice[len..];
self.count += len;
if slice.is_empty() {
break Poll::Ready(Ok(()));
} else {
self.slice.replace(slice);
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
}
let len = self.owner.push_slice(slice);
slice = &slice[len..];
self.count += len;
if slice.is_empty() {
break Poll::Ready(Ok(()));
}
self.slice.replace(slice);
if waker_registered {
break Poll::Pending;
}
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
}
Expand All @@ -178,20 +172,17 @@ impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> Future for PushIterFutur
let mut iter = self.iter.take().unwrap();
if self.owner.is_closed() {
break Poll::Ready(false);
} else {
self.owner.push_iter(&mut iter);
if iter.peek().is_none() {
break Poll::Ready(true);
} else {
self.iter.replace(iter);
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
}
self.owner.push_iter(&mut iter);
if iter.peek().is_none() {
break Poll::Ready(true);
}
self.iter.replace(iter);
if waker_registered {
break Poll::Pending;
}
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
}
Expand All @@ -217,14 +208,12 @@ impl<'a, A: AsyncProducer> Future for WaitVacantFuture<'a, A> {
let closed = self.owner.is_closed();
if self.count <= self.owner.vacant_len() || closed {
break Poll::Ready(());
} else {
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
if waker_registered {
break Poll::Pending;
}
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
}
Expand All @@ -240,16 +229,15 @@ where
loop {
if self.is_closed() {
break Poll::Ready(Err(()));
} else if !self.is_full() {
}
if !self.is_full() {
break Poll::Ready(Ok(()));
} else {
if waker_registered {
break Poll::Pending;
} else {
self.register_read_waker(cx.waker());
waker_registered = true;
}
}
if waker_registered {
break Poll::Pending;
}
self.register_read_waker(cx.waker());
waker_registered = true;
}
}
fn start_send(mut self: Pin<&mut Self>, item: <R::Target as Observer>::Item) -> Result<(), Self::Error> {
Expand All @@ -276,19 +264,16 @@ where
loop {
if self.is_closed() {
break Poll::Ready(Ok(0));
} else {
let count = self.push_slice(buf);
if count == 0 {
if waker_registered {
break Poll::Pending;
} else {
self.register_read_waker(cx.waker());
waker_registered = true;
}
} else {
break Poll::Ready(Ok(count));
}
}
let count = self.push_slice(buf);
if count > 0 {
break Poll::Ready(Ok(count));
}
if waker_registered {
break Poll::Pending;
}
self.register_read_waker(cx.waker());
waker_registered = true;
}
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Expand Down
1 change: 1 addition & 0 deletions test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ cargo check --no-default-features --features alloc && \
cargo check --no-default-features && \
cd async && \
cargo test && \
cargo test --no-default-features --features alloc && \
cargo check --no-default-features --features alloc && \
cargo check --no-default-features && \
cd ../blocking && \
Expand Down

0 comments on commit 3115d57

Please sign in to comment.