From 280ec309aa8efa1c1c0b151df77ecfc3e5dba89c Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sat, 29 Jul 2023 11:26:28 +0800 Subject: [PATCH 1/5] wake the other side on closing async consumer/producer --- async/src/halves.rs | 6 ++++-- async/src/rb.rs | 9 ++++++++- async/src/traits/mod.rs | 4 ++-- async/src/traits/ring_buffer.rs | 8 ++++++++ 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/async/src/halves.rs b/async/src/halves.rs index a1a5090..bce7ff9 100644 --- a/async/src/halves.rs +++ b/async/src/halves.rs @@ -149,7 +149,8 @@ where self.base.rb().is_closed() } fn close(&self) { - self.base.rb().close() + self.base.rb().close(); + self.base.rb().wake_reader(); } } impl AsyncProducer for AsyncProd @@ -181,7 +182,8 @@ where self.base.rb().is_closed() } fn close(&self) { - self.base.rb().close() + self.base.rb().close(); + self.base.rb().wake_writer(); } } impl AsyncConsumer for AsyncCons diff --git a/async/src/rb.rs b/async/src/rb.rs index 06f8eed..9d964b9 100644 --- a/async/src/rb.rs +++ b/async/src/rb.rs @@ -81,7 +81,14 @@ impl AsyncConsumer for AsyncRb { self.write.register(waker); } } -impl AsyncRingBuffer for AsyncRb {} +impl AsyncRingBuffer for AsyncRb { + fn wake_writer(&self) { + self.write.wake(); + } + fn wake_reader(&self) { + self.read.wake() + } +} impl SplitRef for AsyncRb { type RefProd<'a> = AsyncProd<&'a Self> where Self: 'a; diff --git a/async/src/traits/mod.rs b/async/src/traits/mod.rs index a3c48a6..aab30cd 100644 --- a/async/src/traits/mod.rs +++ b/async/src/traits/mod.rs @@ -1,11 +1,11 @@ pub mod consumer; pub mod observer; pub mod producer; +pub mod ring_buffer; pub use consumer::AsyncConsumer; pub use observer::AsyncObserver; pub use producer::AsyncProducer; - -pub trait AsyncRingBuffer: ringbuf::traits::RingBuffer + AsyncProducer + AsyncConsumer {} +pub use ring_buffer::AsyncRingBuffer; pub use ringbuf::traits::*; diff --git a/async/src/traits/ring_buffer.rs b/async/src/traits/ring_buffer.rs index e69de29..e1cf155 100644 --- a/async/src/traits/ring_buffer.rs +++ b/async/src/traits/ring_buffer.rs @@ -0,0 +1,8 @@ +use crate::consumer::AsyncConsumer; +use crate::producer::AsyncProducer; +use ringbuf::traits::RingBuffer; + +pub trait AsyncRingBuffer: RingBuffer + AsyncProducer + AsyncConsumer { + fn wake_writer(&self); + fn wake_reader(&self); +} From 87b91baed2651ee1972bbb6c476ad4ef359da24b Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sat, 29 Jul 2023 11:26:51 +0800 Subject: [PATCH 2/5] add a test for drop close --- async/src/tests.rs | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/async/src/tests.rs b/async/src/tests.rs index 3bc2159..cc4ce7a 100644 --- a/async/src/tests.rs +++ b/async/src/tests.rs @@ -168,3 +168,42 @@ fn wait() { }, ); } + +#[test] +fn drop_close_prod() { + let (prod, mut cons) = AsyncHeapRb::::new(1).split(); + let stage = AtomicUsize::new(0); + execute!( + async { + drop(prod); + assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0); + }, + async { + cons.wait_occupied(1).await; + assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 1); + assert!(cons.is_closed()); + }, + ); +} + +#[test] +fn drop_close_cons() { + let (mut prod, mut cons) = AsyncHeapRb::::new(1).split(); + let stage = AtomicUsize::new(0); + execute!( + async { + prod.push(0).await.unwrap(); + assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0); + + prod.wait_vacant(1).await; + assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 3); + assert!(prod.is_closed()); + }, + async { + cons.wait_occupied(1).await; + assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 1); + drop(cons); + assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 2); + }, + ); +} From de2b0a4a809ec6e58c6943decf6c23ca32d44167 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sat, 29 Jul 2023 11:27:00 +0800 Subject: [PATCH 3/5] cargo fmt --- src/transfer.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/transfer.rs b/src/transfer.rs index d284e3d..19f4da4 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -6,11 +6,7 @@ use crate::{consumer::Consumer, producer::Producer}; /// `count` is the number of items being moved, if `None` - as much as possible items will be moved. /// /// Returns number of items been moved. -pub fn transfer, P: Producer>( - src: &mut C, - dst: &mut P, - count: Option, -) -> usize { +pub fn transfer, P: Producer>(src: &mut C, dst: &mut P, count: Option) -> usize { let (src_left, src_right) = src.occupied_slices(); let (dst_left, dst_right) = dst.vacant_slices_mut(); let src_iter = src_left.iter().chain(src_right.iter()); From a7fad25c07ea436ab885a2aacf00fabb116a5990 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sat, 29 Jul 2023 12:09:47 +0800 Subject: [PATCH 4/5] fix wrong waking and renaming for clarity --- async/src/halves.rs | 4 ++-- async/src/rb.rs | 6 +++--- async/src/traits/ring_buffer.rs | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/async/src/halves.rs b/async/src/halves.rs index bce7ff9..13f820b 100644 --- a/async/src/halves.rs +++ b/async/src/halves.rs @@ -150,7 +150,7 @@ where } fn close(&self) { self.base.rb().close(); - self.base.rb().wake_reader(); + self.base.rb().wake_consumer(); } } impl AsyncProducer for AsyncProd @@ -183,7 +183,7 @@ where } fn close(&self) { self.base.rb().close(); - self.base.rb().wake_writer(); + self.base.rb().wake_producer(); } } impl AsyncConsumer for AsyncCons diff --git a/async/src/rb.rs b/async/src/rb.rs index 9d964b9..79c102b 100644 --- a/async/src/rb.rs +++ b/async/src/rb.rs @@ -82,10 +82,10 @@ impl AsyncConsumer for AsyncRb { } } impl AsyncRingBuffer for AsyncRb { - fn wake_writer(&self) { - self.write.wake(); + fn wake_consumer(&self) { + self.write.wake() } - fn wake_reader(&self) { + fn wake_producer(&self) { self.read.wake() } } diff --git a/async/src/traits/ring_buffer.rs b/async/src/traits/ring_buffer.rs index e1cf155..8718a4d 100644 --- a/async/src/traits/ring_buffer.rs +++ b/async/src/traits/ring_buffer.rs @@ -3,6 +3,6 @@ use crate::producer::AsyncProducer; use ringbuf::traits::RingBuffer; pub trait AsyncRingBuffer: RingBuffer + AsyncProducer + AsyncConsumer { - fn wake_writer(&self); - fn wake_reader(&self); + fn wake_producer(&self); + fn wake_consumer(&self); } From 4aa2f34971d11ebfa60299a828e4a390d05fa9a4 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sat, 29 Jul 2023 12:28:15 +0800 Subject: [PATCH 5/5] make executor poll the test tasks independently --- async/src/tests.rs | 45 ++++++++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/async/src/tests.rs b/async/src/tests.rs index cc4ce7a..f96e0d4 100644 --- a/async/src/tests.rs +++ b/async/src/tests.rs @@ -1,6 +1,7 @@ use crate::{async_transfer, traits::*, AsyncHeapRb}; use core::sync::atomic::{AtomicUsize, Ordering}; use futures::task::{noop_waker_ref, AtomicWaker}; +use std::sync::Arc; use std::{vec, vec::Vec}; #[test] @@ -172,38 +173,48 @@ fn wait() { #[test] fn drop_close_prod() { let (prod, mut cons) = AsyncHeapRb::::new(1).split(); - let stage = AtomicUsize::new(0); - execute!( - async { + let stage = Arc::new(AtomicUsize::new(0)); + let stage_clone = stage.clone(); + let t0 = std::thread::spawn(move || { + execute!(async { drop(prod); assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0); - }, - async { + }); + }); + let t1 = std::thread::spawn(move || { + execute!(async { cons.wait_occupied(1).await; - assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 1); + assert_eq!(stage_clone.fetch_add(1, Ordering::SeqCst), 1); assert!(cons.is_closed()); - }, - ); + }); + }); + t0.join().unwrap(); + t1.join().unwrap(); } #[test] fn drop_close_cons() { let (mut prod, mut cons) = AsyncHeapRb::::new(1).split(); - let stage = AtomicUsize::new(0); - execute!( - async { + let stage = Arc::new(AtomicUsize::new(0)); + let stage_clone = stage.clone(); + let t0 = std::thread::spawn(move || { + execute!(async { prod.push(0).await.unwrap(); assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0); prod.wait_vacant(1).await; assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 3); assert!(prod.is_closed()); - }, - async { + }); + }); + let t1 = std::thread::spawn(move || { + execute!(async { cons.wait_occupied(1).await; - assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 1); + assert_eq!(stage_clone.fetch_add(1, Ordering::SeqCst), 1); drop(cons); - assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 2); - }, - ); + assert_eq!(stage_clone.fetch_add(1, Ordering::SeqCst), 2); + }); + }); + t0.join().unwrap(); + t1.join().unwrap(); }