From 748ed623e39620539a0a6890a36f4db5d277f27d Mon Sep 17 00:00:00 2001 From: TehPers Date: Wed, 14 Aug 2024 23:40:27 -0700 Subject: [PATCH] Add when_read_async, cleanup when_read --- README.md | 7 +- src/assertions.rs | 3 +- src/assertions/async_read.rs | 9 ++ src/assertions/async_read/extensions.rs | 75 +++++++++++++++ src/assertions/async_read/modifiers.rs | 3 + .../async_read/modifiers/when_read.rs | 51 ++++++++++ src/assertions/async_read/outputs.rs | 3 + .../async_read/outputs/when_read.rs | 94 +++++++++++++++++++ src/assertions/read/extensions.rs | 10 +- src/assertions/read/modifiers.rs | 4 +- .../modifiers/{as_bytes.rs => when_read.rs} | 17 ++-- src/prelude.rs | 2 +- 12 files changed, 258 insertions(+), 20 deletions(-) create mode 100644 src/assertions/async_read.rs create mode 100644 src/assertions/async_read/extensions.rs create mode 100644 src/assertions/async_read/modifiers.rs create mode 100644 src/assertions/async_read/modifiers/when_read.rs create mode 100644 src/assertions/async_read/outputs.rs create mode 100644 src/assertions/async_read/outputs/when_read.rs rename src/assertions/read/modifiers/{as_bytes.rs => when_read.rs} (73%) diff --git a/README.md b/README.md index 57259e8..f5c2db9 100644 --- a/README.md +++ b/README.md @@ -102,9 +102,10 @@ async fn get_cat_url(id: u32) -> String { ### Readers -| Modifier | Description | -| ----------- | ---------------------- | -| `when_read` | reads into byte buffer | +| Modifier | Description | Requires feature | +| ----------------- | ------------------------------------- | ---------------- | +| `when_read` | reads into byte buffer | | +| `when_read_async` | asynchronously reads into byte buffer | `futures` | ### Futures diff --git a/src/assertions.rs b/src/assertions.rs index 588d978..d0405a5 100644 --- a/src/assertions.rs +++ b/src/assertions.rs @@ -227,7 +227,8 @@ //! [`to_be_some`]: crate::prelude::OptionAssertions::to_be_some //! [`to_equal`]: crate::prelude::GeneralAssertions::to_equal -// pub mod functions; +#[cfg(feature = "futures")] +pub mod async_read; #[cfg(feature = "futures")] pub mod futures; pub mod general; diff --git a/src/assertions/async_read.rs b/src/assertions/async_read.rs new file mode 100644 index 0000000..a44aa9b --- /dev/null +++ b/src/assertions/async_read.rs @@ -0,0 +1,9 @@ +//! Modifiers for types that can be read asynchronously. + +mod extensions; +mod modifiers; +mod outputs; + +pub use extensions::*; +pub use modifiers::*; +pub use outputs::*; diff --git a/src/assertions/async_read/extensions.rs b/src/assertions/async_read/extensions.rs new file mode 100644 index 0000000..5182afa --- /dev/null +++ b/src/assertions/async_read/extensions.rs @@ -0,0 +1,75 @@ +use futures::AsyncRead; + +use crate::assertions::AssertionBuilder; + +use super::WhenReadAsyncModifier; + +/// Modifiers for types that implement [`futures::AsyncRead`]. +pub trait AsyncReadAssertions +where + T: AsyncRead, +{ + /// Asynchronously reads the subject into a buffer, then executes the + /// assertion on it. + /// + /// ``` + /// # use expecters::prelude::*; + /// use futures::io::Cursor; + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// expect!( + /// Cursor::new("Hello, world!"), + /// when_read_async, + /// as_utf8, + /// to_equal("Hello, world!"), + /// ) + /// .await; + /// # } + /// ``` + /// + /// The assertion fails if reading the subject fails: + /// + /// ```should_panic + /// # use expecters::prelude::*; + /// use std::{ + /// pin::Pin, + /// task::{Context, Poll}, + /// }; + /// + /// use futures::io::{Error, ErrorKind, AsyncRead}; + /// + /// struct MyReader; + /// + /// impl AsyncRead for MyReader { + /// fn poll_read( + /// self: Pin<&mut Self>, + /// _cx: &mut Context, + /// _buf: &mut [u8], + /// ) -> Poll> { + /// Poll::Ready(Err(Error::new(ErrorKind::Other, "always fail"))) + /// } + /// } + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// expect!( + /// MyReader, + /// when_read_async, + /// count, + /// to_be_greater_than_or_equal_to(0), + /// ) + /// .await; + /// # } + /// ``` + fn when_read_async(self) -> AssertionBuilder, WhenReadAsyncModifier>; +} + +impl AsyncReadAssertions for AssertionBuilder +where + T: AsyncRead, +{ + #[inline] + fn when_read_async(self) -> AssertionBuilder, WhenReadAsyncModifier> { + AssertionBuilder::modify(self, WhenReadAsyncModifier::new) + } +} diff --git a/src/assertions/async_read/modifiers.rs b/src/assertions/async_read/modifiers.rs new file mode 100644 index 0000000..2c644e7 --- /dev/null +++ b/src/assertions/async_read/modifiers.rs @@ -0,0 +1,3 @@ +mod when_read; + +pub use when_read::*; diff --git a/src/assertions/async_read/modifiers/when_read.rs b/src/assertions/async_read/modifiers/when_read.rs new file mode 100644 index 0000000..98484b1 --- /dev/null +++ b/src/assertions/async_read/modifiers/when_read.rs @@ -0,0 +1,51 @@ +use futures::AsyncRead; + +use crate::assertions::{ + async_read::WhenReadAsyncFuture, general::IntoInitializableOutput, Assertion, AssertionContext, + AssertionContextBuilder, AssertionModifier, +}; + +/// Reads a subject into a buffer asynchronously. +#[derive(Clone, Debug)] +pub struct WhenReadAsyncModifier { + prev: M, +} + +impl WhenReadAsyncModifier { + #[inline] + pub(crate) fn new(prev: M) -> Self { + Self { prev } + } +} + +impl AssertionModifier for WhenReadAsyncModifier +where + M: AssertionModifier>, +{ + type Output = M::Output; + + #[inline] + fn apply(self, cx: AssertionContextBuilder, next: A) -> Self::Output { + self.prev.apply(cx, WhenReadAsyncAssertion { next }) + } +} + +/// Reads the subject into a buffer asynchronously and executes the inner +/// assertion on it. +#[derive(Clone, Debug)] +pub struct WhenReadAsyncAssertion { + next: A, +} + +impl Assertion for WhenReadAsyncAssertion +where + A: Assertion, Output: IntoInitializableOutput>, + T: AsyncRead, +{ + type Output = WhenReadAsyncFuture; + + #[inline] + fn execute(self, cx: AssertionContext, subject: T) -> Self::Output { + WhenReadAsyncFuture::new(cx, subject, self.next) + } +} diff --git a/src/assertions/async_read/outputs.rs b/src/assertions/async_read/outputs.rs new file mode 100644 index 0000000..2c644e7 --- /dev/null +++ b/src/assertions/async_read/outputs.rs @@ -0,0 +1,3 @@ +mod when_read; + +pub use when_read::*; diff --git a/src/assertions/async_read/outputs/when_read.rs b/src/assertions/async_read/outputs/when_read.rs new file mode 100644 index 0000000..7b9aa93 --- /dev/null +++ b/src/assertions/async_read/outputs/when_read.rs @@ -0,0 +1,94 @@ +use std::{ + future::Future, + pin::Pin, + task::{ready, Context, Poll}, +}; + +use futures::AsyncRead; +use pin_project_lite::pin_project; + +use crate::assertions::{general::IntoInitializableOutput, Assertion, AssertionContext}; + +pin_project! { + /// Asynchronously reads a subject and executes an assertion on it. + #[derive(Clone, Debug)] + pub struct WhenReadAsyncFuture { + #[pin] + subject: T, + buffer: Vec, + result: Vec, + next: Option<(AssertionContext, A)> + } +} + +impl WhenReadAsyncFuture { + #[inline] + pub(crate) fn new(cx: AssertionContext, subject: T, next: A) -> Self { + WhenReadAsyncFuture { + subject, + buffer: vec![0; 32], + result: Vec::new(), + next: Some((cx, next)), + } + } +} + +impl Future for WhenReadAsyncFuture +where + T: AsyncRead, + A: Assertion, Output: IntoInitializableOutput>, +{ + type Output = ::Initialized; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut projected = self.project(); + + // Read the subject + loop { + let result = ready!(projected.subject.as_mut().poll_read(cx, projected.buffer)); + match result { + Ok(0) => break, + Ok(n) => { + projected.result.extend(&projected.buffer[..n]); + + // Check if we can grow the buffer for the next read + if n == projected.buffer.len() { + projected.buffer.reserve(32); + projected.buffer.resize(projected.buffer.capacity(), 0); + } + } + Err(error) => { + let (mut cx, _) = projected.next.take().expect("poll after ready"); + cx.annotate("error", error); + return Poll::Ready(cx.fail("failed to read")); + } + }; + } + + let (mut cx, next) = projected.next.take().expect("poll after ready"); + cx.annotate("read bytes", projected.result.len()); + Poll::Ready( + next.execute(cx, std::mem::take(projected.result)) + .into_initialized(), + ) + } +} + +#[cfg(test)] +mod tests { + use futures::io::Cursor; + + use crate::prelude::*; + + #[tokio::test] + async fn long_data() { + let subject = "Hello, world! ".repeat(100); + expect!( + Cursor::new(subject.clone()), + when_read_async, + as_utf8, + to_equal(subject), + ) + .await; + } +} diff --git a/src/assertions/read/extensions.rs b/src/assertions/read/extensions.rs index b7f7ae0..bb8a477 100644 --- a/src/assertions/read/extensions.rs +++ b/src/assertions/read/extensions.rs @@ -2,7 +2,7 @@ use std::io::Read; use crate::assertions::AssertionBuilder; -use super::WhenReadAsBytesModifier; +use super::WhenReadModifier; /// Modifiers for types that implement [`Read`]. pub trait ReadExtensions @@ -31,7 +31,7 @@ where /// struct MyReader; /// /// impl Read for MyReader { - /// fn read(&mut self, _: &mut [u8]) -> std::io::Result { + /// fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { /// Err(Error::new(ErrorKind::Other, "always fail")) /// } /// } @@ -43,7 +43,7 @@ where /// to_be_greater_than_or_equal_to(0), /// ); /// ``` - fn when_read(self) -> AssertionBuilder, WhenReadAsBytesModifier>; + fn when_read(self) -> AssertionBuilder, WhenReadModifier>; } impl ReadExtensions for AssertionBuilder @@ -51,7 +51,7 @@ where T: Read, { #[inline] - fn when_read(self) -> AssertionBuilder, WhenReadAsBytesModifier> { - AssertionBuilder::modify(self, WhenReadAsBytesModifier::new) + fn when_read(self) -> AssertionBuilder, WhenReadModifier> { + AssertionBuilder::modify(self, WhenReadModifier::new) } } diff --git a/src/assertions/read/modifiers.rs b/src/assertions/read/modifiers.rs index d1aff2f..2c644e7 100644 --- a/src/assertions/read/modifiers.rs +++ b/src/assertions/read/modifiers.rs @@ -1,3 +1,3 @@ -mod as_bytes; +mod when_read; -pub use as_bytes::*; +pub use when_read::*; diff --git a/src/assertions/read/modifiers/as_bytes.rs b/src/assertions/read/modifiers/when_read.rs similarity index 73% rename from src/assertions/read/modifiers/as_bytes.rs rename to src/assertions/read/modifiers/when_read.rs index 91fb092..d04b7da 100644 --- a/src/assertions/read/modifiers/as_bytes.rs +++ b/src/assertions/read/modifiers/when_read.rs @@ -7,35 +7,35 @@ use crate::assertions::{ /// Reads a subject into a buffer. #[derive(Clone, Debug)] -pub struct WhenReadAsBytesModifier { +pub struct WhenReadModifier { prev: M, } -impl WhenReadAsBytesModifier { +impl WhenReadModifier { #[inline] pub(crate) fn new(prev: M) -> Self { - WhenReadAsBytesModifier { prev } + WhenReadModifier { prev } } } -impl AssertionModifier for WhenReadAsBytesModifier +impl AssertionModifier for WhenReadModifier where - M: AssertionModifier>, + M: AssertionModifier>, { type Output = M::Output; fn apply(self, cx: AssertionContextBuilder, next: A) -> Self::Output { - self.prev.apply(cx, WhenReadAsBytesAssertion { next }) + self.prev.apply(cx, WhenReadAssertion { next }) } } /// Reads the subject into a buffer and executes the inner assertion on it. #[derive(Clone, Debug)] -pub struct WhenReadAsBytesAssertion { +pub struct WhenReadAssertion { next: A, } -impl Assertion for WhenReadAsBytesAssertion +impl Assertion for WhenReadAssertion where T: Read, A: Assertion, Output: IntoInitializableOutput>, @@ -51,6 +51,7 @@ where } }; + cx.annotate("read bytes", bytes.len()); self.next.execute(cx, bytes).into_initialized() } } diff --git a/src/prelude.rs b/src/prelude.rs index 1a14fdb..90f8687 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -24,4 +24,4 @@ pub use crate::{ }; #[cfg(feature = "futures")] -pub use crate::assertions::futures::FutureAssertions; +pub use crate::assertions::{async_read::AsyncReadAssertions, futures::FutureAssertions};