Skip to content

Commit

Permalink
Add BytesFrame type to wrap Frame<Bytes> (#447)
Browse files Browse the repository at this point in the history
* Add `BytesFrame` type to wrap `Frame<Bytes>`

* Format Rust code using rustfmt

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
depth-liu and github-actions[bot] authored Oct 9, 2023
1 parent 2e8d18c commit 5a95482
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 30 deletions.
11 changes: 10 additions & 1 deletion crates/compression/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bytes::Bytes;
use futures_util::stream::{BoxStream, Stream};
use tokio::task::{spawn_blocking, JoinHandle};

use salvo_core::http::body::{Body, HyperBody};
use salvo_core::http::body::{Body, BytesFrame, HyperBody};
use salvo_core::BoxedError;

use super::{CompressionAlgo, CompressionLevel, Encoder};
Expand Down Expand Up @@ -40,6 +40,14 @@ impl EncodeStream<BoxStream<'static, Result<Bytes, BoxedError>>> {
Stream::poll_next(Pin::new(&mut self.body), cx).map_err(|e| IoError::new(ErrorKind::Other, e))
}
}
impl EncodeStream<BoxStream<'static, Result<BytesFrame, BoxedError>>> {
#[inline]
fn poll_chunk(&mut self, cx: &mut Context<'_>) -> Poll<Option<IoResult<Bytes>>> {
Stream::poll_next(Pin::new(&mut self.body), cx)
.map_ok(|f| f.into_data().unwrap_or_default())
.map_err(|e| IoError::new(ErrorKind::Other, e))
}
}
impl EncodeStream<HyperBody> {
#[inline]
fn poll_chunk(&mut self, cx: &mut Context<'_>) -> Poll<Option<IoResult<Bytes>>> {
Expand Down Expand Up @@ -139,6 +147,7 @@ macro_rules! impl_stream {
};
}
impl_stream!(BoxStream<'static, Result<Bytes, BoxedError>>);
impl_stream!(BoxStream<'static, Result<BytesFrame, BoxedError>>);
impl_stream!(HyperBody);
impl_stream!(Option<Bytes>);
impl_stream!(VecDeque<Bytes>);
94 changes: 94 additions & 0 deletions crates/core/src/http/body/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Http body.
pub use hyper::body::{Body, Frame, SizeHint};

mod req;
Expand All @@ -10,3 +11,96 @@ pub use hyper::body::Incoming as HyperBody;
pub use res::ResBody;
mod channel;
pub use channel::{BodyReceiver, BodySender};

use std::ops::{Deref, DerefMut};

use bytes::Bytes;

use crate::http::HeaderMap;

/// Frame with it's DATA type is [`Bytes`].
pub struct BytesFrame(pub Frame<Bytes>);
impl BytesFrame {
/// Create a DATA frame with the provided [`Bytes`].
pub fn data(buf: impl Into<Bytes>) -> Self {
Self(Frame::data(buf.into()))
}

/// Consumes self into the buf of the DATA frame.
///
/// Returns an [`Err`] containing the original [`Frame`] when frame is not a DATA frame.
/// `Frame::is_data` can also be used to determine if the frame is a DATA frame.
pub fn into_data(self) -> Result<Bytes, Self> {
self.0.into_data().map_err(Self)
}

/// Consumes self into the buf of the trailers frame.
///
/// Returns an [`Err`] containing the original [`Frame`] when frame is not a trailers frame.
/// `Frame::is_trailers` can also be used to determine if the frame is a trailers frame.
pub fn into_trailers(self) -> Result<HeaderMap, Self> {
self.0.into_trailers().map_err(Self)
}
}
impl AsRef<Frame<Bytes>> for BytesFrame {
fn as_ref(&self) -> &Frame<Bytes> {
&self.0
}
}
impl Deref for BytesFrame {
type Target = Frame<Bytes>;

fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for BytesFrame {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl From<Bytes> for BytesFrame {
fn from(value: Bytes) -> Self {
Self::data(value)
}
}
impl From<String> for BytesFrame {
#[inline]
fn from(value: String) -> Self {
Self::data(value)
}
}

impl From<&'static [u8]> for BytesFrame {
fn from(value: &'static [u8]) -> Self {
Self::data(value)
}
}

impl From<&'static str> for BytesFrame {
fn from(value: &'static str) -> Self {
Self::data(value)
}
}

impl From<Vec<u8>> for BytesFrame {
fn from(value: Vec<u8>) -> Self {
Self::data(value)
}
}

impl<T> From<Box<T>> for BytesFrame
where
T: Into<BytesFrame>,
{
fn from(value: Box<T>) -> Self {
(*value).into()
}
}

impl From<BytesFrame> for Bytes {
fn from(value: BytesFrame) -> Self {
value.0.into_data().unwrap_or_default()
}
}
25 changes: 14 additions & 11 deletions crates/core/src/http/body/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,43 +114,46 @@ impl Stream for ReqBody {
}

impl From<Bytes> for ReqBody {
fn from(value: Bytes) -> ReqBody {
fn from(value: Bytes) -> Self {
Self::Once(value)
}
}
impl From<Incoming> for ReqBody {
fn from(value: Incoming) -> ReqBody {
fn from(value: Incoming) -> Self {
Self::Hyper(value)
}
}
impl From<String> for ReqBody {
#[inline]
fn from(value: String) -> ReqBody {
fn from(value: String) -> Self {
Self::Once(value.into())
}
}

impl From<&'static [u8]> for ReqBody {
fn from(value: &'static [u8]) -> ReqBody {
Self::Once(value.into())
fn from(value: &'static [u8]) -> Self {
Self::Once(Bytes::from_static(value))
}
}

impl From<&'static str> for ReqBody {
fn from(value: &'static str) -> ReqBody {
Self::Once(value.into())
fn from(value: &'static str) -> Self {
Self::Once(Bytes::from_static(value.as_bytes()))
}
}

impl From<Vec<u8>> for ReqBody {
fn from(value: Vec<u8>) -> ReqBody {
fn from(value: Vec<u8>) -> Self {
Self::Once(value.into())
}
}

impl From<Box<[u8]>> for ReqBody {
fn from(value: Box<[u8]>) -> ReqBody {
Self::Once(value.into())
impl<T> From<Box<T>> for ReqBody
where
T: Into<ReqBody>,
{
fn from(value: Box<T>) -> Self {
(*value).into()
}
}

Expand Down
35 changes: 19 additions & 16 deletions crates/core/src/http/body/res.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use sync_wrapper::SyncWrapper;
use bytes::Bytes;

use crate::error::BoxedError;
use crate::http::body::{BodyReceiver, BodySender};
use crate::http::body::{BodyReceiver, BodySender, BytesFrame};
use crate::prelude::StatusError;

/// Response body type.
Expand All @@ -37,7 +37,7 @@ pub enum ResBody {
/// Boxed body.
Boxed(Pin<Box<dyn Body<Data = Bytes, Error = BoxedError> + Send + Sync + 'static>>),
/// Stream body.
Stream(SyncWrapper<BoxStream<'static, Result<Bytes, BoxedError>>>),
Stream(SyncWrapper<BoxStream<'static, Result<BytesFrame, BoxedError>>>),
/// Channel body.
Channel(BodyReceiver),
/// Error body will be process in catcher.
Expand Down Expand Up @@ -88,7 +88,7 @@ impl ResBody {
pub fn stream<S, O, E>(stream: S) -> Self
where
S: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
O: Into<BytesFrame> + 'static,
E: Into<BoxedError> + 'static,
{
let mapped = stream.map_ok(Into::into).map_err(Into::into);
Expand Down Expand Up @@ -168,7 +168,7 @@ impl Body for ResBody {
.get_mut()
.as_mut()
.poll_next(cx)
.map_ok(Frame::data)
.map_ok(|frame| frame.0)
.map_err(|e| IoError::new(ErrorKind::Other, e)),
Self::Channel(rx) => {
if !rx.data_rx.is_terminated() {
Expand Down Expand Up @@ -232,48 +232,51 @@ impl Stream for ResBody {
}

impl From<()> for ResBody {
fn from(_value: ()) -> ResBody {
fn from(_value: ()) -> Self {
Self::None
}
}
impl From<Bytes> for ResBody {
fn from(value: Bytes) -> ResBody {
fn from(value: Bytes) -> Self {
Self::Once(value)
}
}
impl From<Incoming> for ResBody {
fn from(value: Incoming) -> ResBody {
fn from(value: Incoming) -> Self {
Self::Hyper(value)
}
}
impl From<String> for ResBody {
#[inline]
fn from(value: String) -> ResBody {
fn from(value: String) -> Self {
Self::Once(value.into())
}
}

impl From<&'static [u8]> for ResBody {
fn from(value: &'static [u8]) -> ResBody {
Self::Once(value.into())
fn from(value: &'static [u8]) -> Self {
Self::Once(Bytes::from_static(value))
}
}

impl From<&'static str> for ResBody {
fn from(value: &'static str) -> ResBody {
Self::Once(value.into())
fn from(value: &'static str) -> Self {
Self::Once(Bytes::from_static(value.as_bytes()))
}
}

impl From<Vec<u8>> for ResBody {
fn from(value: Vec<u8>) -> ResBody {
fn from(value: Vec<u8>) -> Self {
Self::Once(value.into())
}
}

impl From<Box<[u8]>> for ResBody {
fn from(value: Box<[u8]>) -> ResBody {
Self::Once(value.into())
impl<T> From<Box<T>> for ResBody
where
T: Into<ResBody>,
{
fn from(value: Box<T>) -> Self {
(*value).into()
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/http/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::http::{StatusCode, StatusError};
use crate::{BoxedError, Error, Scribe};
use bytes::Bytes;

pub use crate::http::body::{BodySender, ResBody};
pub use crate::http::body::{BodySender, BytesFrame, ResBody};

/// Represents an HTTP response
#[non_exhaustive]
Expand Down Expand Up @@ -450,7 +450,7 @@ impl Response {
pub fn stream<S, O, E>(&mut self, stream: S)
where
S: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
O: Into<BytesFrame> + 'static,
E: Into<BoxedError> + 'static,
{
self.body = ResBody::stream(stream);
Expand Down

0 comments on commit 5a95482

Please sign in to comment.