Skip to content

Commit

Permalink
feat: Add Body type
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Oct 19, 2024
1 parent 758d4f9 commit b48c876
Show file tree
Hide file tree
Showing 31 changed files with 302 additions and 249 deletions.
10 changes: 5 additions & 5 deletions examples/src/h2c/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ mod h2c {
client::legacy::{connect::HttpConnector, Client},
rt::TokioExecutor,
};
use tonic::body::BoxBody;
use tonic::body::Body;
use tower::Service;

pub struct H2cChannel {
pub client: Client<HttpConnector, BoxBody>,
pub client: Client<HttpConnector, Body>,
}

impl Service<http::Request<BoxBody>> for H2cChannel {
impl Service<http::Request<Body>> for H2cChannel {
type Response = http::Response<Incoming>;
type Error = hyper::Error;
type Future =
Expand All @@ -56,7 +56,7 @@ mod h2c {
Poll::Ready(Ok(()))
}

fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
fn call(&mut self, request: http::Request<Body>) -> Self::Future {
let client = self.client.clone();

Box::pin(async move {
Expand All @@ -65,7 +65,7 @@ mod h2c {
let h2c_req = hyper::Request::builder()
.uri(origin)
.header(http::header::UPGRADE, "h2c")
.body(BoxBody::default())
.body(Body::default())
.unwrap();

let res = client.request(h2c_req).await.unwrap();
Expand Down
12 changes: 6 additions & 6 deletions examples/src/h2c/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ mod h2c {
use http::{Request, Response};
use hyper::body::Incoming;
use hyper_util::{rt::TokioExecutor, service::TowerToHyperService};
use tonic::body::BoxBody;
use tonic::body::Body;
use tower::{Service, ServiceExt};

#[derive(Clone)]
Expand All @@ -81,12 +81,12 @@ mod h2c {

impl<S> Service<Request<Incoming>> for H2c<S>
where
S: Service<Request<BoxBody>, Response = Response<BoxBody>> + Clone + Send + 'static,
S: Service<Request<Body>, Response = Response<Body>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Sync + Send + 'static,
S::Response: Send + 'static,
{
type Response = hyper::Response<BoxBody>;
type Response = hyper::Response<Body>;
type Error = hyper::Error;
type Future =
Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
Expand All @@ -99,11 +99,11 @@ mod h2c {
}

fn call(&mut self, req: hyper::Request<Incoming>) -> Self::Future {
let mut req = req.map(tonic::body::boxed);
let mut req = req.map(Body::new);
let svc = self
.s
.clone()
.map_request(|req: Request<_>| req.map(tonic::body::boxed));
.map_request(|req: Request<_>| req.map(Body::new));
Box::pin(async move {
tokio::spawn(async move {
let upgraded_io = hyper::upgrade::on(&mut req).await.unwrap();
Expand All @@ -114,7 +114,7 @@ mod h2c {
.unwrap();
});

let mut res = hyper::Response::new(BoxBody::default());
let mut res = hyper::Response::new(Body::default());
*res.status_mut() = http::StatusCode::SWITCHING_PROTOCOLS;
res.headers_mut().insert(
hyper::header::UPGRADE,
Expand Down
6 changes: 4 additions & 2 deletions examples/src/tls_rustls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio_rustls::{
rustls::{pki_types::CertificateDer, ServerConfig},
TlsAcceptor,
};
use tonic::{body::boxed, service::Routes, Request, Response, Status};
use tonic::{body::Body, service::Routes, Request, Response, Status};
use tower::ServiceExt;
use tower_http::ServiceBuilderExt;

Expand Down Expand Up @@ -79,7 +79,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

http.serve_connection(
TokioIo::new(conn),
TowerToHyperService::new(svc.map_request(|req: http::Request<_>| req.map(boxed))),
TowerToHyperService::new(
svc.map_request(|req: http::Request<_>| req.map(Body::new)),
),
)
.await
.unwrap();
Expand Down
8 changes: 4 additions & 4 deletions examples/src/tower/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ mod service {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tonic::body::BoxBody;
use tonic::body::Body;
use tonic::transport::Channel;
use tower::Service;

Expand All @@ -57,8 +57,8 @@ mod service {
}
}

impl Service<Request<BoxBody>> for AuthSvc {
type Response = Response<BoxBody>;
impl Service<Request<Body>> for AuthSvc {
type Response = Response<Body>;
type Error = Box<dyn std::error::Error + Send + Sync>;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
Expand All @@ -67,7 +67,7 @@ mod service {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
fn call(&mut self, req: Request<Body>) -> Self::Future {
// This is necessary because tonic internally uses `tower::buffer::Buffer`.
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary
Expand Down
14 changes: 7 additions & 7 deletions interop/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::pb::{self, *};
use async_stream::try_stream;
use http::header::{HeaderName, HeaderValue};
use http_body::Body;
use http_body::Body as HttpBody;
use std::future::Future;
use std::pin::Pin;
use std::result::Result as StdResult;
use std::task::{ready, Context, Poll};
use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::{body::BoxBody, server::NamedService, Code, Request, Response, Status};
use tonic::{body::Body, server::NamedService, Code, Request, Response, Status};
use tower::Service;

pub use pb::test_service_server::TestServiceServer;
Expand Down Expand Up @@ -180,9 +180,9 @@ impl<S> EchoHeadersSvc<S> {
}
}

impl<S> Service<http::Request<BoxBody>> for EchoHeadersSvc<S>
impl<S> Service<http::Request<Body>> for EchoHeadersSvc<S>
where
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>> + Send,
S: Service<http::Request<Body>, Response = http::Response<Body>> + Send,
S::Future: Send + 'static,
{
type Response = S::Response;
Expand All @@ -193,7 +193,7 @@ where
Ok(()).into()
}

fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
fn call(&mut self, req: http::Request<Body>) -> Self::Future {
let echo_header = req.headers().get("x-grpc-test-echo-initial").cloned();

let echo_trailer = req
Expand All @@ -212,7 +212,7 @@ where
.insert("x-grpc-test-echo-initial", echo_header);
Ok(res
.map(|b| MergeTrailers::new(b, echo_trailer))
.map(BoxBody::new))
.map(Body::new))
} else {
Ok(res)
}
Expand All @@ -231,7 +231,7 @@ impl<B> MergeTrailers<B> {
}
}

impl<B: Body + Unpin> Body for MergeTrailers<B> {
impl<B: HttpBody + Unpin> HttpBody for MergeTrailers<B> {
type Data = B::Data;
type Error = B::Error;

Expand Down
18 changes: 9 additions & 9 deletions tests/compression/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;
use bytes::{Buf, Bytes};
use http_body::{Body, Frame};
use http_body::{Body as HttpBody, Frame};
use http_body_util::BodyExt as _;
use hyper_util::rt::TokioIo;
use pin_project::pin_project;
Expand All @@ -13,7 +13,7 @@ use std::{
task::{ready, Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::body::BoxBody;
use tonic::body::Body;
use tonic::codec::CompressionEncoding;
use tonic::transport::{server::Connected, Channel};
use tower_http::map_request_body::MapRequestBodyLayer;
Expand Down Expand Up @@ -42,9 +42,9 @@ pub struct CountBytesBody<B> {
pub counter: Arc<AtomicUsize>,
}

impl<B> Body for CountBytesBody<B>
impl<B> HttpBody for CountBytesBody<B>
where
B: Body<Data = Bytes>,
B: HttpBody<Data = Bytes>,
{
type Data = B::Data;
type Error = B::Error;
Expand Down Expand Up @@ -95,7 +95,7 @@ impl<T> ChannelBody<T> {
}
}

impl<T> Body for ChannelBody<T>
impl<T> HttpBody for ChannelBody<T>
where
T: Buf,
{
Expand All @@ -114,8 +114,8 @@ where
#[allow(dead_code)]
pub fn measure_request_body_size_layer(
bytes_sent_counter: Arc<AtomicUsize>,
) -> MapRequestBodyLayer<impl Fn(BoxBody) -> BoxBody + Clone> {
MapRequestBodyLayer::new(move |mut body: BoxBody| {
) -> MapRequestBodyLayer<impl Fn(Body) -> Body + Clone> {
MapRequestBodyLayer::new(move |mut body: Body| {
let (tx, new_body) = ChannelBody::new();

let bytes_sent_counter = bytes_sent_counter.clone();
Expand All @@ -128,7 +128,7 @@ pub fn measure_request_body_size_layer(
}
});

new_body.boxed_unsync()
Body::new(new_body)
})
}

Expand Down Expand Up @@ -157,7 +157,7 @@ impl AssertRightEncoding {
Self { encoding }
}

pub fn call<B: Body>(self, req: http::Request<B>) -> http::Request<B> {
pub fn call<B: HttpBody>(self, req: http::Request<B>) -> http::Request<B> {
let expected = match self.encoding {
CompressionEncoding::Gzip => "gzip",
CompressionEncoding::Zstd => "zstd",
Expand Down
8 changes: 4 additions & 4 deletions tests/integration_tests/tests/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};
use tokio::sync::oneshot;
use tonic::{
body::BoxBody,
body::Body,
server::NamedService,
transport::{Endpoint, Server},
Request, Response, Status,
Expand Down Expand Up @@ -112,9 +112,9 @@ struct InterceptedService<S> {
inner: S,
}

impl<S> Service<http::Request<BoxBody>> for InterceptedService<S>
impl<S> Service<http::Request<Body>> for InterceptedService<S>
where
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>>
S: Service<http::Request<Body>, Response = http::Response<Body>>
+ NamedService
+ Clone
+ Send
Expand All @@ -129,7 +129,7 @@ where
self.inner.poll_ready(cx)
}

fn call(&mut self, mut req: http::Request<BoxBody>) -> Self::Future {
fn call(&mut self, mut req: http::Request<Body>) -> Self::Future {
let clone = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, clone);

Expand Down
6 changes: 3 additions & 3 deletions tests/integration_tests/tests/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ struct OriginService<S> {
inner: S,
}

impl<T> Service<Request<tonic::body::BoxBody>> for OriginService<T>
impl<T> Service<Request<tonic::body::Body>> for OriginService<T>
where
T: Service<Request<tonic::body::BoxBody>>,
T: Service<Request<tonic::body::Body>>,
T::Future: Send + 'static,
T::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
Expand All @@ -90,7 +90,7 @@ where
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, req: Request<tonic::body::BoxBody>) -> Self::Future {
fn call(&mut self, req: Request<tonic::body::Body>) -> Self::Future {
assert_eq!(req.uri().host(), Some("docs.rs"));
let fut = self.inner.call(req);

Expand Down
6 changes: 3 additions & 3 deletions tests/web/tests/grpc_web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use hyper_util::rt::TokioExecutor;
use prost::Message;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::body::BoxBody;
use tonic::body::Body;
use tonic::transport::Server;

use test_web::pb::{test_server::TestServer, Input, Output};
Expand Down Expand Up @@ -108,7 +108,7 @@ fn encode_body() -> Bytes {
buf.split_to(len + 5).freeze()
}

fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request<BoxBody> {
fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request<Body> {
use header::{ACCEPT, CONTENT_TYPE, ORIGIN};

let request_uri = format!("{}/{}/{}", base_uri, "test.Test", "UnaryCall")
Expand All @@ -129,7 +129,7 @@ fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request<
.header(ORIGIN, "http://example.com")
.header(ACCEPT, format!("application/{}", accept))
.uri(request_uri)
.body(BoxBody::new(
.body(Body::new(
Full::new(bytes).map_err(|err| Status::internal(err.to_string())),
))
.unwrap()
Expand Down
8 changes: 4 additions & 4 deletions tonic-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub(crate) fn generate_internal<T: Service>(

impl<T> #service_ident<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T: tonic::client::GrpcService<tonic::body::Body>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
Expand All @@ -87,10 +87,10 @@ pub(crate) fn generate_internal<T: Service>(
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody>
http::Request<tonic::body::Body>,
Response = http::Response<<T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody>
>,
<T as tonic::codegen::Service<http::Request<tonic::body::BoxBody>>>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
<T as tonic::codegen::Service<http::Request<tonic::body::Body>>>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
{
#service_ident::new(InterceptedService::new(inner, interceptor))
}
Expand Down
4 changes: 2 additions & 2 deletions tonic-build/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pub(crate) fn generate_internal<T: Service>(
B: Body + std::marker::Send + 'static,
B::Error: Into<StdError> + std::marker::Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Response = http::Response<tonic::body::Body>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;

Expand All @@ -165,7 +165,7 @@ pub(crate) fn generate_internal<T: Service>(
#methods

_ => Box::pin(async move {
let mut response = http::Response::new(tonic::body::BoxBody::default());
let mut response = http::Response::new(tonic::body::Body::default());
let headers = response.headers_mut();
headers.insert(tonic::Status::GRPC_STATUS, (tonic::Code::Unimplemented as i32).into());
headers.insert(http::header::CONTENT_TYPE, tonic::metadata::GRPC_CONTENT_TYPE);
Expand Down
1 change: 0 additions & 1 deletion tonic-health/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,5 @@ allowed_external_types = [
"prost::*",

"futures_core::stream::Stream",
"http_body_util::combinators::box_body::UnsyncBoxBody",
"tower_service::Service",
]
Loading

0 comments on commit b48c876

Please sign in to comment.