diff --git a/crates/compression/src/stream.rs b/crates/compression/src/stream.rs index 219c4ac31..b89bf2964 100644 --- a/crates/compression/src/stream.rs +++ b/crates/compression/src/stream.rs @@ -9,7 +9,7 @@ use bytes::Bytes; use futures_util::stream::{BoxStream, Stream}; use tokio::task::{spawn_blocking, JoinHandle}; -use salvo_core::http::body::{Body, HyperBody}; +use salvo_core::http::body::{Body, BytesFrame, HyperBody}; use salvo_core::BoxedError; use super::{CompressionAlgo, CompressionLevel, Encoder}; @@ -40,6 +40,14 @@ impl EncodeStream>> { Stream::poll_next(Pin::new(&mut self.body), cx).map_err(|e| IoError::new(ErrorKind::Other, e)) } } +impl EncodeStream>> { + #[inline] + fn poll_chunk(&mut self, cx: &mut Context<'_>) -> Poll>> { + Stream::poll_next(Pin::new(&mut self.body), cx) + .map_ok(|f| f.into_data().unwrap_or_default()) + .map_err(|e| IoError::new(ErrorKind::Other, e)) + } +} impl EncodeStream { #[inline] fn poll_chunk(&mut self, cx: &mut Context<'_>) -> Poll>> { @@ -139,6 +147,7 @@ macro_rules! impl_stream { }; } impl_stream!(BoxStream<'static, Result>); +impl_stream!(BoxStream<'static, Result>); impl_stream!(HyperBody); impl_stream!(Option); impl_stream!(VecDeque); diff --git a/crates/core/src/http/body/mod.rs b/crates/core/src/http/body/mod.rs index f35e68777..dc07bc41a 100644 --- a/crates/core/src/http/body/mod.rs +++ b/crates/core/src/http/body/mod.rs @@ -1,4 +1,5 @@ //! Http body. + pub use hyper::body::{Body, Frame, SizeHint}; mod req; @@ -10,3 +11,96 @@ pub use hyper::body::Incoming as HyperBody; pub use res::ResBody; mod channel; pub use channel::{BodyReceiver, BodySender}; + +use std::ops::{Deref, DerefMut}; + +use bytes::Bytes; + +use crate::http::HeaderMap; + +/// Frame with it's DATA type is [`Bytes`]. +pub struct BytesFrame(pub Frame); +impl BytesFrame { + /// Create a DATA frame with the provided [`Bytes`]. + pub fn data(buf: impl Into) -> Self { + Self(Frame::data(buf.into())) + } + + /// Consumes self into the buf of the DATA frame. + /// + /// Returns an [`Err`] containing the original [`Frame`] when frame is not a DATA frame. + /// `Frame::is_data` can also be used to determine if the frame is a DATA frame. + pub fn into_data(self) -> Result { + self.0.into_data().map_err(Self) + } + + /// Consumes self into the buf of the trailers frame. + /// + /// Returns an [`Err`] containing the original [`Frame`] when frame is not a trailers frame. + /// `Frame::is_trailers` can also be used to determine if the frame is a trailers frame. + pub fn into_trailers(self) -> Result { + self.0.into_trailers().map_err(Self) + } +} +impl AsRef> for BytesFrame { + fn as_ref(&self) -> &Frame { + &self.0 + } +} +impl Deref for BytesFrame { + type Target = Frame; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for BytesFrame { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From for BytesFrame { + fn from(value: Bytes) -> Self { + Self::data(value) + } +} +impl From for BytesFrame { + #[inline] + fn from(value: String) -> Self { + Self::data(value) + } +} + +impl From<&'static [u8]> for BytesFrame { + fn from(value: &'static [u8]) -> Self { + Self::data(value) + } +} + +impl From<&'static str> for BytesFrame { + fn from(value: &'static str) -> Self { + Self::data(value) + } +} + +impl From> for BytesFrame { + fn from(value: Vec) -> Self { + Self::data(value) + } +} + +impl From> for BytesFrame +where + T: Into, +{ + fn from(value: Box) -> Self { + (*value).into() + } +} + +impl From for Bytes { + fn from(value: BytesFrame) -> Self { + value.0.into_data().unwrap_or_default() + } +} diff --git a/crates/core/src/http/body/req.rs b/crates/core/src/http/body/req.rs index 7e1b49901..4d6cf1f27 100644 --- a/crates/core/src/http/body/req.rs +++ b/crates/core/src/http/body/req.rs @@ -114,43 +114,46 @@ impl Stream for ReqBody { } impl From for ReqBody { - fn from(value: Bytes) -> ReqBody { + fn from(value: Bytes) -> Self { Self::Once(value) } } impl From for ReqBody { - fn from(value: Incoming) -> ReqBody { + fn from(value: Incoming) -> Self { Self::Hyper(value) } } impl From for ReqBody { #[inline] - fn from(value: String) -> ReqBody { + fn from(value: String) -> Self { Self::Once(value.into()) } } impl From<&'static [u8]> for ReqBody { - fn from(value: &'static [u8]) -> ReqBody { - Self::Once(value.into()) + fn from(value: &'static [u8]) -> Self { + Self::Once(Bytes::from_static(value)) } } impl From<&'static str> for ReqBody { - fn from(value: &'static str) -> ReqBody { - Self::Once(value.into()) + fn from(value: &'static str) -> Self { + Self::Once(Bytes::from_static(value.as_bytes())) } } impl From> for ReqBody { - fn from(value: Vec) -> ReqBody { + fn from(value: Vec) -> Self { Self::Once(value.into()) } } -impl From> for ReqBody { - fn from(value: Box<[u8]>) -> ReqBody { - Self::Once(value.into()) +impl From> for ReqBody +where + T: Into, +{ + fn from(value: Box) -> Self { + (*value).into() } } diff --git a/crates/core/src/http/body/res.rs b/crates/core/src/http/body/res.rs index 2500e4add..a0a836ac5 100644 --- a/crates/core/src/http/body/res.rs +++ b/crates/core/src/http/body/res.rs @@ -17,7 +17,7 @@ use sync_wrapper::SyncWrapper; use bytes::Bytes; use crate::error::BoxedError; -use crate::http::body::{BodyReceiver, BodySender}; +use crate::http::body::{BodyReceiver, BodySender, BytesFrame}; use crate::prelude::StatusError; /// Response body type. @@ -37,7 +37,7 @@ pub enum ResBody { /// Boxed body. Boxed(Pin + Send + Sync + 'static>>), /// Stream body. - Stream(SyncWrapper>>), + Stream(SyncWrapper>>), /// Channel body. Channel(BodyReceiver), /// Error body will be process in catcher. @@ -88,7 +88,7 @@ impl ResBody { pub fn stream(stream: S) -> Self where S: Stream> + Send + 'static, - O: Into + 'static, + O: Into + 'static, E: Into + 'static, { let mapped = stream.map_ok(Into::into).map_err(Into::into); @@ -168,7 +168,7 @@ impl Body for ResBody { .get_mut() .as_mut() .poll_next(cx) - .map_ok(Frame::data) + .map_ok(|frame| frame.0) .map_err(|e| IoError::new(ErrorKind::Other, e)), Self::Channel(rx) => { if !rx.data_rx.is_terminated() { @@ -232,48 +232,51 @@ impl Stream for ResBody { } impl From<()> for ResBody { - fn from(_value: ()) -> ResBody { + fn from(_value: ()) -> Self { Self::None } } impl From for ResBody { - fn from(value: Bytes) -> ResBody { + fn from(value: Bytes) -> Self { Self::Once(value) } } impl From for ResBody { - fn from(value: Incoming) -> ResBody { + fn from(value: Incoming) -> Self { Self::Hyper(value) } } impl From for ResBody { #[inline] - fn from(value: String) -> ResBody { + fn from(value: String) -> Self { Self::Once(value.into()) } } impl From<&'static [u8]> for ResBody { - fn from(value: &'static [u8]) -> ResBody { - Self::Once(value.into()) + fn from(value: &'static [u8]) -> Self { + Self::Once(Bytes::from_static(value)) } } impl From<&'static str> for ResBody { - fn from(value: &'static str) -> ResBody { - Self::Once(value.into()) + fn from(value: &'static str) -> Self { + Self::Once(Bytes::from_static(value.as_bytes())) } } impl From> for ResBody { - fn from(value: Vec) -> ResBody { + fn from(value: Vec) -> Self { Self::Once(value.into()) } } -impl From> for ResBody { - fn from(value: Box<[u8]>) -> ResBody { - Self::Once(value.into()) +impl From> for ResBody +where + T: Into, +{ + fn from(value: Box) -> Self { + (*value).into() } } diff --git a/crates/core/src/http/response.rs b/crates/core/src/http/response.rs index 18dcfea8f..0d7d66485 100644 --- a/crates/core/src/http/response.rs +++ b/crates/core/src/http/response.rs @@ -16,7 +16,7 @@ use crate::http::{StatusCode, StatusError}; use crate::{BoxedError, Error, Scribe}; use bytes::Bytes; -pub use crate::http::body::{BodySender, ResBody}; +pub use crate::http::body::{BodySender, BytesFrame, ResBody}; /// Represents an HTTP response #[non_exhaustive] @@ -450,7 +450,7 @@ impl Response { pub fn stream(&mut self, stream: S) where S: Stream> + Send + 'static, - O: Into + 'static, + O: Into + 'static, E: Into + 'static, { self.body = ResBody::stream(stream);