Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance: register atomic waker lazily in AsyncRingBuf #28

Merged
merged 7 commits into from
Jul 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions async/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ fn drop_close_prod() {
let stage_clone = stage.clone();
let t0 = std::thread::spawn(move || {
execute!(async {
drop(prod);
assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0);
drop(prod);
});
});
let t1 = std::thread::spawn(move || {
Expand All @@ -199,11 +199,11 @@ fn drop_close_cons() {
let stage_clone = stage.clone();
let t0 = std::thread::spawn(move || {
execute!(async {
prod.push(0).await.unwrap();
assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0);
prod.push(0).await.unwrap();

prod.wait_vacant(1).await;
assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 3);
assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 2);
assert!(prod.is_closed());
});
});
Expand All @@ -212,7 +212,6 @@ fn drop_close_cons() {
cons.wait_occupied(1).await;
assert_eq!(stage_clone.fetch_add(1, Ordering::SeqCst), 1);
drop(cons);
assert_eq!(stage_clone.fetch_add(1, Ordering::SeqCst), 2);
});
});
t0.join().unwrap();
Expand Down
137 changes: 86 additions & 51 deletions async/src/traits/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,28 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> {
type Output = Option<A::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done);
self.owner.register_write_waker(cx.waker());
let closed = self.owner.is_closed();
#[cfg(feature = "std")]
std::println!("PopFuture::poll: closed={}", closed);
match self.owner.try_pop() {
Some(item) => {
self.done = true;
Poll::Ready(Some(item))
}
None => {
if closed {
Poll::Ready(None)
} else {
Poll::Pending
let mut waker_registered = false;
loop {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice solution, checking ring buffer in a loop really eliminates the need to register waker in advance.

assert!(!self.done);
let closed = self.owner.is_closed();
#[cfg(feature = "std")]
std::println!("PopFuture::poll: closed={}", closed);
match self.owner.try_pop() {
Some(item) => {
self.done = true;
break Poll::Ready(Some(item));
}
None => {
if closed {
break Poll::Ready(None);
} else {
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
}
}
Expand Down Expand Up @@ -115,19 +122,26 @@ where
type Output = Result<(), usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.owner.register_write_waker(cx.waker());
let closed = self.owner.is_closed();
let mut slice = self.slice.take().unwrap();
let len = self.owner.pop_slice(slice);
slice = &mut slice[len..];
self.count += len;
if slice.is_empty() {
Poll::Ready(Ok(()))
} else if closed {
Poll::Ready(Err(self.count))
} else {
self.slice.replace(slice);
Poll::Pending
let mut waker_registered = false;
loop {
let closed = self.owner.is_closed();
let mut slice = self.slice.take().unwrap();
let len = self.owner.pop_slice(slice);
slice = &mut slice[len..];
self.count += len;
if slice.is_empty() {
break Poll::Ready(Ok(()));
} else if closed {
break Poll::Ready(Err(self.count));
} else {
self.slice.replace(slice);
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
}
}
Expand All @@ -147,13 +161,20 @@ impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done);
self.owner.register_write_waker(cx.waker());
let closed = self.owner.is_closed();
if self.count <= self.owner.occupied_len() || closed {
Poll::Ready(())
} else {
Poll::Pending
let mut waker_registered = false;
loop {
assert!(!self.done);
let closed = self.owner.is_closed();
if self.count <= self.owner.occupied_len() || closed {
break Poll::Ready(());
} else {
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
}
}
Expand All @@ -165,15 +186,22 @@ where
type Item = <R::Target as Observer>::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.register_write_waker(cx.waker());
let closed = self.is_closed();
match self.try_pop() {
Some(item) => Poll::Ready(Some(item)),
None => {
if closed {
Poll::Ready(None)
} else {
Poll::Pending
let mut waker_registered = false;
loop {
let closed = self.is_closed();
match self.try_pop() {
Some(item) => break Poll::Ready(Some(item)),
None => {
if closed {
break Poll::Ready(None);
} else {
if waker_registered {
break Poll::Pending;
} else {
self.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
}
}
Expand All @@ -186,13 +214,20 @@ where
R::Target: AsyncRingBuffer<Item = u8>,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.register_write_waker(cx.waker());
let closed = self.is_closed();
let len = self.pop_slice(buf);
if len != 0 || closed {
Poll::Ready(Ok(len))
} else {
Poll::Pending
let mut waker_registered = false;
loop {
let closed = self.is_closed();
let len = self.pop_slice(buf);
if len != 0 || closed {
break Poll::Ready(Ok(len));
} else {
if waker_registered {
break Poll::Pending;
} else {
self.register_write_waker(cx.waker());
waker_registered = true;
}
}
}
}
}
150 changes: 96 additions & 54 deletions async/src/traits/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,24 @@ impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> {
type Output = Result<(), A::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.owner.register_read_waker(cx.waker());
let item = self.item.take().unwrap();
if self.owner.is_closed() {
Poll::Ready(Err(item))
} else {
match self.owner.try_push(item) {
Err(item) => {
self.item.replace(item);
Poll::Pending
let mut waker_registered = false;
loop {
let item = self.item.take().unwrap();
if self.owner.is_closed() {
break Poll::Ready(Err(item));
} else {
match self.owner.try_push(item) {
Err(item) => {
self.item.replace(item);
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
Ok(()) => break Poll::Ready(Ok(())),
}
Ok(()) => Poll::Ready(Ok(())),
}
}
}
Expand Down Expand Up @@ -127,19 +134,26 @@ where
type Output = Result<(), usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.owner.register_read_waker(cx.waker());
let mut slice = self.slice.take().unwrap();
if self.owner.is_closed() {
Poll::Ready(Err(self.count))
} else {
let len = self.owner.push_slice(slice);
slice = &slice[len..];
self.count += len;
if slice.is_empty() {
Poll::Ready(Ok(()))
let mut waker_registered = false;
loop {
let mut slice = self.slice.take().unwrap();
if self.owner.is_closed() {
break Poll::Ready(Err(self.count));
} else {
self.slice.replace(slice);
Poll::Pending
let len = self.owner.push_slice(slice);
slice = &slice[len..];
self.count += len;
if slice.is_empty() {
break Poll::Ready(Ok(()));
} else {
self.slice.replace(slice);
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
}
}
}
Expand All @@ -159,17 +173,24 @@ impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> Future for PushIterFutur
type Output = bool;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.owner.register_read_waker(cx.waker());
let mut iter = self.iter.take().unwrap();
if self.owner.is_closed() {
Poll::Ready(false)
} else {
self.owner.push_iter(&mut iter);
if iter.peek().is_none() {
Poll::Ready(true)
let mut waker_registered = false;
loop {
let mut iter = self.iter.take().unwrap();
if self.owner.is_closed() {
break Poll::Ready(false);
} else {
self.iter.replace(iter);
Poll::Pending
self.owner.push_iter(&mut iter);
if iter.peek().is_none() {
break Poll::Ready(true);
} else {
self.iter.replace(iter);
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
}
}
}
Expand All @@ -190,13 +211,20 @@ impl<'a, A: AsyncProducer> Future for WaitVacantFuture<'a, A> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done);
self.owner.register_read_waker(cx.waker());
let closed = self.owner.is_closed();
if self.count <= self.owner.vacant_len() || closed {
Poll::Ready(())
} else {
Poll::Pending
let mut waker_registered = false;
loop {
assert!(!self.done);
let closed = self.owner.is_closed();
if self.count <= self.owner.vacant_len() || closed {
break Poll::Ready(());
} else {
if waker_registered {
break Poll::Pending;
} else {
self.owner.register_read_waker(cx.waker());
waker_registered = true;
}
}
}
}
}
Expand All @@ -208,13 +236,20 @@ where
type Error = ();

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.register_read_waker(cx.waker());
if self.is_closed() {
Poll::Ready(Err(()))
} else if self.is_full() {
Poll::Pending
} else {
Poll::Ready(Ok(()))
let mut waker_registered = false;
loop {
if self.is_closed() {
break Poll::Ready(Err(()));
} else if !self.is_full() {
break Poll::Ready(Ok(()));
} else {
if waker_registered {
break Poll::Pending;
} else {
self.register_read_waker(cx.waker());
waker_registered = true;
}
}
}
}
fn start_send(mut self: Pin<&mut Self>, item: <R::Target as Observer>::Item) -> Result<(), Self::Error> {
Expand All @@ -237,15 +272,22 @@ where
R::Target: AsyncRingBuffer<Item = u8>,
{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
self.register_read_waker(cx.waker());
if self.is_closed() {
Poll::Ready(Ok(0))
} else {
let count = self.push_slice(buf);
if count == 0 {
Poll::Pending
let mut waker_registered = false;
loop {
if self.is_closed() {
break Poll::Ready(Ok(0));
} else {
Poll::Ready(Ok(count))
let count = self.push_slice(buf);
if count == 0 {
if waker_registered {
break Poll::Pending;
} else {
self.register_read_waker(cx.waker());
waker_registered = true;
}
} else {
break Poll::Ready(Ok(count));
}
}
}
}
Expand Down
Loading