diff --git a/blocking/src/rb.rs b/blocking/src/rb.rs index fb38e91..b5d2a75 100644 --- a/blocking/src/rb.rs +++ b/blocking/src/rb.rs @@ -2,15 +2,15 @@ use crate::sync::StdSemaphore; use crate::{ sync::Semaphore, - traits::{BlockingConsumer, BlockingProducer}, wrap::{BlockingCons, BlockingProd}, }; #[cfg(feature = "alloc")] use alloc::sync::Arc; -use core::{mem::MaybeUninit, num::NonZeroUsize, time::Duration}; +use core::{mem::MaybeUninit, num::NonZeroUsize}; #[cfg(feature = "alloc")] use ringbuf::traits::Split; use ringbuf::{ + rb::traits::RbRef, storage::Storage, traits::{Consumer, Observer, Producer, RingBuffer, SplitRef}, SharedRb, @@ -25,8 +25,8 @@ pub struct BlockingRb { #[cfg(feature = "std")] pub struct BlockingRb { base: SharedRb, - read: X, - write: X, + pub(crate) read: X, + pub(crate) write: X, } impl BlockingRb { @@ -88,21 +88,6 @@ impl RingBuffer for BlockingRb { } } -impl BlockingProducer for BlockingRb { - type Instant = X::Instant; - fn wait_vacant(&self, count: usize, timeout: Option) -> bool { - debug_assert!(count <= self.capacity().get()); - self.read.wait(|| self.vacant_len() >= count, timeout) - } -} -impl BlockingConsumer for BlockingRb { - type Instant = X::Instant; - fn wait_occupied(&self, count: usize, timeout: Option) -> bool { - debug_assert!(count <= self.capacity().get()); - self.write.wait(|| self.occupied_len() >= count, timeout) - } -} - impl SplitRef for BlockingRb { type RefProd<'a> = BlockingProd<&'a Self> where Self: 'a; type RefCons<'a> = BlockingCons<&'a Self> where Self: 'a; @@ -121,3 +106,12 @@ impl Split for BlockingRb { (BlockingProd::new(arc.clone()), BlockingCons::new(arc)) } } + +pub trait BlockingRbRef: RbRef> { + type Storage: Storage; + type Semaphore: Semaphore; +} +impl>> BlockingRbRef for R { + type Storage = S; + type Semaphore = X; +} diff --git a/blocking/src/tests.rs b/blocking/src/tests.rs index 193e10e..592860a 100644 --- a/blocking/src/tests.rs +++ b/blocking/src/tests.rs @@ -66,14 +66,16 @@ fn slice_all() { let pjh = thread::spawn(move || { let bytes = smsg.as_bytes(); - assert_eq!(prod.push_slice_all(bytes, TIMEOUT), bytes.len()); - prod.push(0, TIMEOUT).unwrap(); + prod.set_timeout(TIMEOUT); + assert_eq!(prod.push_slice_all(bytes), bytes.len()); + prod.push(0).unwrap(); }); let cjh = thread::spawn(move || { let mut bytes = vec![0u8; smsg.as_bytes().len()]; - assert_eq!(cons.pop_slice_all(&mut bytes, TIMEOUT), bytes.len()); - assert_eq!(cons.pop_wait(TIMEOUT).unwrap(), 0); + cons.set_timeout(TIMEOUT); + assert_eq!(cons.pop_slice_all(&mut bytes), bytes.len()); + assert_eq!(cons.pop_wait().unwrap(), 0); String::from_utf8(bytes).unwrap() }); @@ -92,12 +94,14 @@ fn iter_all() { let smsg = THE_BOOK_FOREWORD; let pjh = thread::spawn(move || { + prod.set_timeout(TIMEOUT); let bytes = smsg.as_bytes(); - assert_eq!(prod.push_iter_all(bytes.iter().copied().chain(once(0)), TIMEOUT), bytes.len() + 1); + assert_eq!(prod.push_iter_all(bytes.iter().copied().chain(once(0))), bytes.len() + 1); }); let cjh = thread::spawn(move || { - let bytes = cons.pop_iter_all(TIMEOUT).take_while(|x| *x != 0).collect::>(); + cons.set_timeout(TIMEOUT); + let bytes = cons.pop_iter_all().take_while(|x| *x != 0).collect::>(); String::from_utf8(bytes).unwrap() }); diff --git a/blocking/src/traits.rs b/blocking/src/traits.rs index 665ddb7..177f3c4 100644 --- a/blocking/src/traits.rs +++ b/blocking/src/traits.rs @@ -8,8 +8,11 @@ pub trait BlockingProducer: Producer { fn wait_vacant(&self, count: usize, timeout: Option) -> bool; - fn push(&mut self, item: Self::Item, timeout: Option) -> Result<(), Self::Item> { - if self.wait_vacant(1, timeout) { + fn set_timeout(&mut self, timeout: Option); + fn timeout(&self) -> Option; + + fn push(&mut self, item: Self::Item) -> Result<(), Self::Item> { + if self.wait_vacant(1, self.timeout()) { assert!(self.try_push(item).is_ok()); Ok(()) } else { @@ -17,10 +20,10 @@ pub trait BlockingProducer: Producer { } } - fn push_iter_all>(&mut self, iter: I, timeout: Option) -> usize { + fn push_iter_all>(&mut self, iter: I) -> usize { let mut count = 0; let mut iter = iter.peekable(); - for timeout in TimeoutIterator::::new(timeout) { + for timeout in TimeoutIterator::::new(self.timeout()) { if iter.peek().is_none() { break; } @@ -31,12 +34,12 @@ pub trait BlockingProducer: Producer { count } - fn push_slice_all(&mut self, mut slice: &[Self::Item], timeout: Option) -> usize + fn push_slice_all(&mut self, mut slice: &[Self::Item]) -> usize where Self::Item: Copy, { let mut count = 0; - for timeout in TimeoutIterator::::new(timeout) { + for timeout in TimeoutIterator::::new(self.timeout()) { if slice.is_empty() { break; } @@ -55,24 +58,27 @@ pub trait BlockingConsumer: Consumer { fn wait_occupied(&self, count: usize, timeout: Option) -> bool; - fn pop_wait(&mut self, timeout: Option) -> Option { - if self.wait_occupied(1, timeout) { + fn set_timeout(&mut self, timeout: Option); + fn timeout(&self) -> Option; + + fn pop_wait(&mut self) -> Option { + if self.wait_occupied(1, self.timeout()) { Some(self.try_pop().unwrap()) } else { None } } - fn pop_iter_all(&mut self, timeout: Option) -> PopAllIter<'_, Self> { - PopAllIter::new(self, timeout) + fn pop_iter_all(&mut self) -> PopAllIter<'_, Self> { + PopAllIter::new(self, self.timeout()) } - fn pop_slice_all(&mut self, mut slice: &mut [Self::Item], timeout: Option) -> usize + fn pop_slice_all(&mut self, mut slice: &mut [Self::Item]) -> usize where Self::Item: Copy, { let mut count = 0; - for timeout in TimeoutIterator::::new(timeout) { + for timeout in TimeoutIterator::::new(self.timeout()) { if slice.is_empty() { break; } diff --git a/blocking/src/wrap.rs b/blocking/src/wrap.rs index 1df9aab..aed0eb0 100644 --- a/blocking/src/wrap.rs +++ b/blocking/src/wrap.rs @@ -1,22 +1,33 @@ -use crate::traits::{BlockingConsumer, BlockingProducer}; +use crate::{ + rb::BlockingRbRef, + sync::Semaphore, + traits::{BlockingConsumer, BlockingProducer}, +}; use core::time::Duration; use ringbuf::{ - rb::traits::{RbRef, ToRbRef}, - traits::{consumer::DelegateConsumer, observer::DelegateObserver, producer::DelegateProducer, Based}, + rb::traits::ToRbRef, + traits::{ + consumer::DelegateConsumer, + observer::{DelegateObserver, Observer}, + producer::DelegateProducer, + Based, + }, wrap::caching::Caching, Obs, }; -pub struct BlockingWrap { +pub struct BlockingWrap { base: Option>, + timeout: Option, } pub type BlockingProd = BlockingWrap; pub type BlockingCons = BlockingWrap; -impl BlockingWrap { +impl BlockingWrap { pub fn new(rb: R) -> Self { Self { base: Some(Caching::new(rb)), + timeout: None, } } @@ -24,7 +35,7 @@ impl BlockingWrap { self.base().observe() } } -impl Based for BlockingWrap { +impl Based for BlockingWrap { type Base = Caching; fn base(&self) -> &Self::Base { self.base.as_ref().unwrap() @@ -33,7 +44,7 @@ impl Based for BlockingWrap { self.base.as_mut().unwrap() } } -impl ToRbRef for BlockingWrap { +impl ToRbRef for BlockingWrap { type RbRef = R; fn rb_ref(&self) -> &Self::RbRef { self.base().rb_ref() @@ -43,30 +54,41 @@ impl ToRbRef for BlockingWrap { } } -impl DelegateObserver for BlockingProd {} -impl DelegateProducer for BlockingProd {} +impl DelegateObserver for BlockingProd {} +impl DelegateProducer for BlockingProd {} -impl BlockingProducer for BlockingProd -where - R::Target: BlockingProducer, -{ - type Instant = ::Instant; +impl BlockingProducer for BlockingProd { + type Instant = ::Instant; fn wait_vacant(&self, count: usize, timeout: Option) -> bool { - self.base().rb().wait_vacant(count, timeout) + let rb = self.rb(); + debug_assert!(count <= rb.capacity().get()); + rb.read.wait(|| rb.vacant_len() >= count, timeout) } -} -impl DelegateObserver for BlockingCons {} -impl DelegateConsumer for BlockingCons {} + fn set_timeout(&mut self, timeout: Option) { + self.timeout = timeout; + } + fn timeout(&self) -> Option { + self.timeout + } +} -impl BlockingConsumer for BlockingCons -where - R::Target: BlockingConsumer, -{ - type Instant = ::Instant; +impl DelegateObserver for BlockingCons {} +impl DelegateConsumer for BlockingCons {} +impl BlockingConsumer for BlockingCons { + type Instant = ::Instant; fn wait_occupied(&self, count: usize, timeout: Option) -> bool { - self.base().rb().wait_occupied(count, timeout) + let rb = self.rb(); + debug_assert!(count <= rb.capacity().get()); + rb.write.wait(|| rb.occupied_len() >= count, timeout) + } + + fn set_timeout(&mut self, timeout: Option) { + self.timeout = timeout; + } + fn timeout(&self) -> Option { + self.timeout } }