diff --git a/examples/src/h2c/client.rs b/examples/src/h2c/client.rs index fc833575a..240a874cb 100644 --- a/examples/src/h2c/client.rs +++ b/examples/src/h2c/client.rs @@ -39,14 +39,14 @@ mod h2c { client::legacy::{connect::HttpConnector, Client}, rt::TokioExecutor, }; - use tonic::body::BoxBody; + use tonic::body::Body; use tower::Service; pub struct H2cChannel { - pub client: Client, + pub client: Client, } - impl Service> for H2cChannel { + impl Service> for H2cChannel { type Response = http::Response; type Error = hyper::Error; type Future = @@ -56,7 +56,7 @@ mod h2c { Poll::Ready(Ok(())) } - fn call(&mut self, request: http::Request) -> Self::Future { + fn call(&mut self, request: http::Request) -> Self::Future { let client = self.client.clone(); Box::pin(async move { @@ -65,7 +65,7 @@ mod h2c { let h2c_req = hyper::Request::builder() .uri(origin) .header(http::header::UPGRADE, "h2c") - .body(BoxBody::default()) + .body(Body::default()) .unwrap(); let res = client.request(h2c_req).await.unwrap(); diff --git a/examples/src/h2c/server.rs b/examples/src/h2c/server.rs index 1b959821e..0dc3cb289 100644 --- a/examples/src/h2c/server.rs +++ b/examples/src/h2c/server.rs @@ -69,7 +69,7 @@ mod h2c { use http::{Request, Response}; use hyper::body::Incoming; use hyper_util::{rt::TokioExecutor, service::TowerToHyperService}; - use tonic::body::BoxBody; + use tonic::body::Body; use tower::{Service, ServiceExt}; #[derive(Clone)] @@ -81,12 +81,12 @@ mod h2c { impl Service> for H2c where - S: Service, Response = Response> + Clone + Send + 'static, + S: Service, Response = Response> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Sync + Send + 'static, S::Response: Send + 'static, { - type Response = hyper::Response; + type Response = hyper::Response; type Error = hyper::Error; type Future = Pin> + Send>>; @@ -99,11 +99,11 @@ mod h2c { } fn call(&mut self, req: hyper::Request) -> Self::Future { - let mut req = req.map(tonic::body::boxed); + let mut req = req.map(Body::new); let svc = self .s .clone() - .map_request(|req: Request<_>| req.map(tonic::body::boxed)); + .map_request(|req: Request<_>| req.map(Body::new)); Box::pin(async move { tokio::spawn(async move { let upgraded_io = hyper::upgrade::on(&mut req).await.unwrap(); @@ -114,7 +114,7 @@ mod h2c { .unwrap(); }); - let mut res = hyper::Response::new(BoxBody::default()); + let mut res = hyper::Response::new(Body::default()); *res.status_mut() = http::StatusCode::SWITCHING_PROTOCOLS; res.headers_mut().insert( hyper::header::UPGRADE, diff --git a/examples/src/tls_rustls/server.rs b/examples/src/tls_rustls/server.rs index 7cc464572..afdaec40d 100644 --- a/examples/src/tls_rustls/server.rs +++ b/examples/src/tls_rustls/server.rs @@ -14,7 +14,7 @@ use tokio_rustls::{ rustls::{pki_types::CertificateDer, ServerConfig}, TlsAcceptor, }; -use tonic::{body::boxed, service::Routes, Request, Response, Status}; +use tonic::{body::Body, service::Routes, Request, Response, Status}; use tower::ServiceExt; use tower_http::ServiceBuilderExt; @@ -79,7 +79,9 @@ async fn main() -> Result<(), Box> { http.serve_connection( TokioIo::new(conn), - TowerToHyperService::new(svc.map_request(|req: http::Request<_>| req.map(boxed))), + TowerToHyperService::new( + svc.map_request(|req: http::Request<_>| req.map(Body::new)), + ), ) .await .unwrap(); diff --git a/examples/src/tower/client.rs b/examples/src/tower/client.rs index b268a6ebb..86a539aa6 100644 --- a/examples/src/tower/client.rs +++ b/examples/src/tower/client.rs @@ -43,7 +43,7 @@ mod service { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; - use tonic::body::BoxBody; + use tonic::body::Body; use tonic::transport::Channel; use tower::Service; @@ -57,8 +57,8 @@ mod service { } } - impl Service> for AuthSvc { - type Response = Response; + impl Service> for AuthSvc { + type Response = Response; type Error = Box; #[allow(clippy::type_complexity)] type Future = Pin> + Send>>; @@ -67,7 +67,7 @@ mod service { self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services let clone = self.inner.clone(); let mut inner = std::mem::replace(&mut self.inner, clone); diff --git a/interop/src/server.rs b/interop/src/server.rs index 70f86fa6e..199a21b88 100644 --- a/interop/src/server.rs +++ b/interop/src/server.rs @@ -1,14 +1,14 @@ use crate::pb::{self, *}; use async_stream::try_stream; use http::header::{HeaderName, HeaderValue}; -use http_body::Body; +use http_body::Body as HttpBody; use std::future::Future; use std::pin::Pin; use std::result::Result as StdResult; use std::task::{ready, Context, Poll}; use std::time::Duration; use tokio_stream::StreamExt; -use tonic::{body::BoxBody, server::NamedService, Code, Request, Response, Status}; +use tonic::{body::Body, server::NamedService, Code, Request, Response, Status}; use tower::Service; pub use pb::test_service_server::TestServiceServer; @@ -180,9 +180,9 @@ impl EchoHeadersSvc { } } -impl Service> for EchoHeadersSvc +impl Service> for EchoHeadersSvc where - S: Service, Response = http::Response> + Send, + S: Service, Response = http::Response> + Send, S::Future: Send + 'static, { type Response = S::Response; @@ -193,7 +193,7 @@ where Ok(()).into() } - fn call(&mut self, req: http::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { let echo_header = req.headers().get("x-grpc-test-echo-initial").cloned(); let echo_trailer = req @@ -212,7 +212,7 @@ where .insert("x-grpc-test-echo-initial", echo_header); Ok(res .map(|b| MergeTrailers::new(b, echo_trailer)) - .map(BoxBody::new)) + .map(Body::new)) } else { Ok(res) } @@ -231,7 +231,7 @@ impl MergeTrailers { } } -impl Body for MergeTrailers { +impl HttpBody for MergeTrailers { type Data = B::Data; type Error = B::Error; diff --git a/tests/compression/src/util.rs b/tests/compression/src/util.rs index d7e250ce4..8c761ddf5 100644 --- a/tests/compression/src/util.rs +++ b/tests/compression/src/util.rs @@ -1,6 +1,6 @@ use super::*; use bytes::{Buf, Bytes}; -use http_body::{Body, Frame}; +use http_body::{Body as HttpBody, Frame}; use http_body_util::BodyExt as _; use hyper_util::rt::TokioIo; use pin_project::pin_project; @@ -13,7 +13,7 @@ use std::{ task::{ready, Context, Poll}, }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tonic::body::BoxBody; +use tonic::body::Body; use tonic::codec::CompressionEncoding; use tonic::transport::{server::Connected, Channel}; use tower_http::map_request_body::MapRequestBodyLayer; @@ -42,9 +42,9 @@ pub struct CountBytesBody { pub counter: Arc, } -impl Body for CountBytesBody +impl HttpBody for CountBytesBody where - B: Body, + B: HttpBody, { type Data = B::Data; type Error = B::Error; @@ -95,7 +95,7 @@ impl ChannelBody { } } -impl Body for ChannelBody +impl HttpBody for ChannelBody where T: Buf, { @@ -114,8 +114,8 @@ where #[allow(dead_code)] pub fn measure_request_body_size_layer( bytes_sent_counter: Arc, -) -> MapRequestBodyLayer BoxBody + Clone> { - MapRequestBodyLayer::new(move |mut body: BoxBody| { +) -> MapRequestBodyLayer Body + Clone> { + MapRequestBodyLayer::new(move |mut body: Body| { let (tx, new_body) = ChannelBody::new(); let bytes_sent_counter = bytes_sent_counter.clone(); @@ -128,7 +128,7 @@ pub fn measure_request_body_size_layer( } }); - new_body.boxed_unsync() + Body::new(new_body) }) } @@ -157,7 +157,7 @@ impl AssertRightEncoding { Self { encoding } } - pub fn call(self, req: http::Request) -> http::Request { + pub fn call(self, req: http::Request) -> http::Request { let expected = match self.encoding { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", diff --git a/tests/integration_tests/tests/extensions.rs b/tests/integration_tests/tests/extensions.rs index 231477358..1ac90bb35 100644 --- a/tests/integration_tests/tests/extensions.rs +++ b/tests/integration_tests/tests/extensions.rs @@ -8,7 +8,7 @@ use std::{ }; use tokio::sync::oneshot; use tonic::{ - body::BoxBody, + body::Body, server::NamedService, transport::{Endpoint, Server}, Request, Response, Status, @@ -112,9 +112,9 @@ struct InterceptedService { inner: S, } -impl Service> for InterceptedService +impl Service> for InterceptedService where - S: Service, Response = http::Response> + S: Service, Response = http::Response> + NamedService + Clone + Send @@ -129,7 +129,7 @@ where self.inner.poll_ready(cx) } - fn call(&mut self, mut req: http::Request) -> Self::Future { + fn call(&mut self, mut req: http::Request) -> Self::Future { let clone = self.inner.clone(); let mut inner = std::mem::replace(&mut self.inner, clone); diff --git a/tests/integration_tests/tests/origin.rs b/tests/integration_tests/tests/origin.rs index e41287245..6c71c54b4 100644 --- a/tests/integration_tests/tests/origin.rs +++ b/tests/integration_tests/tests/origin.rs @@ -76,9 +76,9 @@ struct OriginService { inner: S, } -impl Service> for OriginService +impl Service> for OriginService where - T: Service>, + T: Service>, T::Future: Send + 'static, T::Error: Into>, { @@ -90,7 +90,7 @@ where self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { assert_eq!(req.uri().host(), Some("docs.rs")); let fut = self.inner.call(req); diff --git a/tests/web/tests/grpc_web.rs b/tests/web/tests/grpc_web.rs index 1c00773a3..c0c3c3fdd 100644 --- a/tests/web/tests/grpc_web.rs +++ b/tests/web/tests/grpc_web.rs @@ -11,7 +11,7 @@ use hyper_util::rt::TokioExecutor; use prost::Message; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; -use tonic::body::BoxBody; +use tonic::body::Body; use tonic::transport::Server; use test_web::pb::{test_server::TestServer, Input, Output}; @@ -108,7 +108,7 @@ fn encode_body() -> Bytes { buf.split_to(len + 5).freeze() } -fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request { +fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request { use header::{ACCEPT, CONTENT_TYPE, ORIGIN}; let request_uri = format!("{}/{}/{}", base_uri, "test.Test", "UnaryCall") @@ -129,7 +129,7 @@ fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request< .header(ORIGIN, "http://example.com") .header(ACCEPT, format!("application/{}", accept)) .uri(request_uri) - .body(BoxBody::new( + .body(Body::new( Full::new(bytes).map_err(|err| Status::internal(err.to_string())), )) .unwrap() diff --git a/tonic-build/src/client.rs b/tonic-build/src/client.rs index 42b9fdf47..56c847585 100644 --- a/tonic-build/src/client.rs +++ b/tonic-build/src/client.rs @@ -67,7 +67,7 @@ pub(crate) fn generate_internal( impl #service_ident where - T: tonic::client::GrpcService, + T: tonic::client::GrpcService, T::Error: Into, T::ResponseBody: Body + std::marker::Send + 'static, ::Error: Into + std::marker::Send, @@ -87,10 +87,10 @@ pub(crate) fn generate_internal( F: tonic::service::Interceptor, T::ResponseBody: Default, T: tonic::codegen::Service< - http::Request, - Response = http::Response<>::ResponseBody> + http::Request, + Response = http::Response<>::ResponseBody> >, - >>::Error: Into + std::marker::Send + std::marker::Sync, + >>::Error: Into + std::marker::Send + std::marker::Sync, { #service_ident::new(InterceptedService::new(inner, interceptor)) } diff --git a/tonic-build/src/server.rs b/tonic-build/src/server.rs index 877749fbd..e2d0aacd9 100644 --- a/tonic-build/src/server.rs +++ b/tonic-build/src/server.rs @@ -152,7 +152,7 @@ pub(crate) fn generate_internal( B: Body + std::marker::Send + 'static, B::Error: Into + std::marker::Send + 'static, { - type Response = http::Response; + type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; @@ -165,7 +165,7 @@ pub(crate) fn generate_internal( #methods _ => Box::pin(async move { - let mut response = http::Response::new(tonic::body::BoxBody::default()); + let mut response = http::Response::new(tonic::body::Body::default()); let headers = response.headers_mut(); headers.insert(tonic::Status::GRPC_STATUS, (tonic::Code::Unimplemented as i32).into()); headers.insert(http::header::CONTENT_TYPE, tonic::metadata::GRPC_CONTENT_TYPE); diff --git a/tonic-health/Cargo.toml b/tonic-health/Cargo.toml index a77048179..25b36dc63 100644 --- a/tonic-health/Cargo.toml +++ b/tonic-health/Cargo.toml @@ -43,6 +43,5 @@ allowed_external_types = [ "prost::*", "futures_core::stream::Stream", - "http_body_util::combinators::box_body::UnsyncBoxBody", "tower_service::Service", ] diff --git a/tonic-health/src/generated/grpc_health_v1.rs b/tonic-health/src/generated/grpc_health_v1.rs index ffa19c36b..67ec57c98 100644 --- a/tonic-health/src/generated/grpc_health_v1.rs +++ b/tonic-health/src/generated/grpc_health_v1.rs @@ -72,7 +72,7 @@ pub mod health_client { } impl HealthClient where - T: tonic::client::GrpcService, + T: tonic::client::GrpcService, T::Error: Into, T::ResponseBody: Body + std::marker::Send + 'static, ::Error: Into + std::marker::Send, @@ -93,13 +93,13 @@ pub mod health_client { F: tonic::service::Interceptor, T::ResponseBody: Default, T: tonic::codegen::Service< - http::Request, + http::Request, Response = http::Response< - >::ResponseBody, + >::ResponseBody, >, >, , + http::Request, >>::Error: Into + std::marker::Send + std::marker::Sync, { HealthClient::new(InterceptedService::new(inner, interceptor)) @@ -315,7 +315,7 @@ pub mod health_server { B: Body + std::marker::Send + 'static, B::Error: Into + std::marker::Send + 'static, { - type Response = http::Response; + type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; fn poll_ready( @@ -420,7 +420,7 @@ pub mod health_server { _ => { Box::pin(async move { let mut response = http::Response::new( - tonic::body::BoxBody::default(), + tonic::body::Body::default(), ); let headers = response.headers_mut(); headers diff --git a/tonic-reflection/Cargo.toml b/tonic-reflection/Cargo.toml index 910f92046..df14e971a 100644 --- a/tonic-reflection/Cargo.toml +++ b/tonic-reflection/Cargo.toml @@ -49,6 +49,5 @@ allowed_external_types = [ "prost_types::*", "futures_core::stream::Stream", - "http_body_util::combinators::box_body::UnsyncBoxBody", "tower_service::Service", ] diff --git a/tonic-reflection/src/generated/grpc_reflection_v1.rs b/tonic-reflection/src/generated/grpc_reflection_v1.rs index aca6b2692..569ee6715 100644 --- a/tonic-reflection/src/generated/grpc_reflection_v1.rs +++ b/tonic-reflection/src/generated/grpc_reflection_v1.rs @@ -161,7 +161,7 @@ pub mod server_reflection_client { } impl ServerReflectionClient where - T: tonic::client::GrpcService, + T: tonic::client::GrpcService, T::Error: Into, T::ResponseBody: Body + std::marker::Send + 'static, ::Error: Into + std::marker::Send, @@ -182,13 +182,13 @@ pub mod server_reflection_client { F: tonic::service::Interceptor, T::ResponseBody: Default, T: tonic::codegen::Service< - http::Request, + http::Request, Response = http::Response< - >::ResponseBody, + >::ResponseBody, >, >, , + http::Request, >>::Error: Into + std::marker::Send + std::marker::Sync, { ServerReflectionClient::new(InterceptedService::new(inner, interceptor)) @@ -356,7 +356,7 @@ pub mod server_reflection_server { B: Body + std::marker::Send + 'static, B::Error: Into + std::marker::Send + 'static, { - type Response = http::Response; + type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; fn poll_ready( @@ -422,7 +422,7 @@ pub mod server_reflection_server { _ => { Box::pin(async move { let mut response = http::Response::new( - tonic::body::BoxBody::default(), + tonic::body::Body::default(), ); let headers = response.headers_mut(); headers diff --git a/tonic-reflection/src/generated/grpc_reflection_v1alpha.rs b/tonic-reflection/src/generated/grpc_reflection_v1alpha.rs index b7f0395d3..685d9b0a1 100644 --- a/tonic-reflection/src/generated/grpc_reflection_v1alpha.rs +++ b/tonic-reflection/src/generated/grpc_reflection_v1alpha.rs @@ -161,7 +161,7 @@ pub mod server_reflection_client { } impl ServerReflectionClient where - T: tonic::client::GrpcService, + T: tonic::client::GrpcService, T::Error: Into, T::ResponseBody: Body + std::marker::Send + 'static, ::Error: Into + std::marker::Send, @@ -182,13 +182,13 @@ pub mod server_reflection_client { F: tonic::service::Interceptor, T::ResponseBody: Default, T: tonic::codegen::Service< - http::Request, + http::Request, Response = http::Response< - >::ResponseBody, + >::ResponseBody, >, >, , + http::Request, >>::Error: Into + std::marker::Send + std::marker::Sync, { ServerReflectionClient::new(InterceptedService::new(inner, interceptor)) @@ -356,7 +356,7 @@ pub mod server_reflection_server { B: Body + std::marker::Send + 'static, B::Error: Into + std::marker::Send + 'static, { - type Response = http::Response; + type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; fn poll_ready( @@ -422,7 +422,7 @@ pub mod server_reflection_server { _ => { Box::pin(async move { let mut response = http::Response::new( - tonic::body::BoxBody::default(), + tonic::body::Body::default(), ); let headers = response.headers_mut(); headers diff --git a/tonic-web/Cargo.toml b/tonic-web/Cargo.toml index 8a6e4dbcd..b26d93a93 100644 --- a/tonic-web/Cargo.toml +++ b/tonic-web/Cargo.toml @@ -20,7 +20,6 @@ bytes = "1" tokio-stream = "0.1" http = "1" http-body = "1" -http-body-util = "0.1" pin-project = "1" tonic = { version = "0.13.0", path = "../tonic", default-features = false } tower-service = "0.3" @@ -42,7 +41,6 @@ allowed_external_types = [ # not major released "futures_core::stream::Stream", - "http_body_util::combinators::box_body::UnsyncBoxBody", "tower_http::cors::Cors", "tower_layer::Layer", "tower_service::Service", diff --git a/tonic-web/src/lib.rs b/tonic-web/src/lib.rs index 4783a12d2..051d3d547 100644 --- a/tonic-web/src/lib.rs +++ b/tonic-web/src/lib.rs @@ -109,7 +109,7 @@ mod service; use http::header::HeaderName; use std::time::Duration; -use tonic::{body::BoxBody, server::NamedService, Status}; +use tonic::{body::Body, server::NamedService, Status}; use tower_http::cors::{AllowOrigin, CorsLayer}; use tower_layer::Layer; use tower_service::Service; @@ -136,7 +136,7 @@ const DEFAULT_ALLOW_HEADERS: [HeaderName; 4] = [ )] pub fn enable(service: S) -> CorsGrpcWeb where - S: Service, Response = http::Response>, + S: Service, Response = http::Response>, { let cors = CorsLayer::new() .allow_origin(AllowOrigin::mirror_request()) @@ -153,14 +153,14 @@ where #[derive(Debug, Clone)] pub struct CorsGrpcWeb(tower_http::cors::Cors>); -impl Service> for CorsGrpcWeb +impl Service> for CorsGrpcWeb where - S: Service, Response = http::Response>, + S: Service, Response = http::Response>, { type Response = S::Response; type Error = S::Error; type Future = - > as Service>>::Future; + > as Service>>::Future; fn poll_ready( &mut self, @@ -169,7 +169,7 @@ where self.0.poll_ready(cx) } - fn call(&mut self, req: http::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { self.0.call(req) } } diff --git a/tonic-web/src/service.rs b/tonic-web/src/service.rs index 199dc4103..4ea1881c6 100644 --- a/tonic-web/src/service.rs +++ b/tonic-web/src/service.rs @@ -3,10 +3,9 @@ use std::pin::Pin; use std::task::{ready, Context, Poll}; use http::{header, HeaderMap, HeaderValue, Method, Request, Response, StatusCode, Version}; -use http_body_util::BodyExt; use pin_project::pin_project; use tonic::metadata::GRPC_CONTENT_TYPE; -use tonic::{body::BoxBody, server::NamedService}; +use tonic::{body::Body, server::NamedService}; use tower_service::Service; use tracing::{debug, trace}; @@ -45,7 +44,7 @@ impl GrpcWebService { impl GrpcWebService where - S: Service, Response = Response>, + S: Service, Response = Response>, { fn response(&self, status: StatusCode) -> ResponseFuture { ResponseFuture { @@ -53,7 +52,7 @@ where res: Some( Response::builder() .status(status) - .body(BoxBody::default()) + .body(Body::default()) .unwrap(), ), }, @@ -61,9 +60,9 @@ where } } -impl Service> for GrpcWebService +impl Service> for GrpcWebService where - S: Service, Response = Response>, + S: Service, Response = Response>, { type Response = S::Response; type Error = S::Error; @@ -73,7 +72,7 @@ where self.inner.poll_ready(cx) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { match RequestKind::new(req.headers(), req.method(), req.version()) { // A valid grpc-web request, regardless of HTTP version. // @@ -148,15 +147,15 @@ enum Case { future: F, }, ImmediateResponse { - res: Option>, + res: Option>, }, } impl Future for ResponseFuture where - F: Future, E>>, + F: Future, E>>, { - type Output = Result, E>; + type Output = Result, E>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); @@ -194,7 +193,7 @@ impl<'a> RequestKind<'a> { // Mutating request headers to conform to a gRPC request is not really // necessary for us at this point. We could remove most of these except // maybe for inserting `header::TE`, which tonic should check? -fn coerce_request(mut req: Request, encoding: Encoding) -> Request { +fn coerce_request(mut req: Request, encoding: Encoding) -> Request { req.headers_mut().remove(header::CONTENT_LENGTH); req.headers_mut() @@ -208,13 +207,13 @@ fn coerce_request(mut req: Request, encoding: Encoding) -> Request, encoding: Encoding) -> Response { +fn coerce_response(res: Response, encoding: Encoding) -> Response { let mut res = res .map(|b| GrpcWebCall::response(b, encoding)) - .map(BoxBody::new); + .map(Body::new); res.headers_mut().insert( header::CONTENT_TYPE, @@ -238,8 +237,8 @@ mod tests { #[derive(Debug, Clone)] struct Svc; - impl tower_service::Service> for Svc { - type Response = Response; + impl tower_service::Service> for Svc { + type Response = Response; type Error = String; type Future = BoxFuture; @@ -247,8 +246,8 @@ mod tests { Poll::Ready(Ok(())) } - fn call(&mut self, _: Request) -> Self::Future { - Box::pin(async { Ok(Response::new(BoxBody::default())) }) + fn call(&mut self, _: Request) -> Self::Future { + Box::pin(async { Ok(Response::new(Body::default())) }) } } @@ -258,7 +257,7 @@ mod tests { fn enable(service: S) -> tower_http::cors::Cors> where - S: Service, Response = http::Response>, + S: Service, Response = http::Response>, { tower_layer::Stack::new( crate::GrpcWebLayer::new(), @@ -271,12 +270,12 @@ mod tests { use super::*; use tower_layer::Layer; - fn request() -> Request { + fn request() -> Request { Request::builder() .method(Method::POST) .header(CONTENT_TYPE, GRPC_WEB) .header(ORIGIN, "http://example.com") - .body(BoxBody::default()) + .body(Body::default()) .unwrap() } @@ -352,13 +351,13 @@ mod tests { mod options { use super::*; - fn request() -> Request { + fn request() -> Request { Request::builder() .method(Method::OPTIONS) .header(ORIGIN, "http://example.com") .header(ACCESS_CONTROL_REQUEST_HEADERS, "x-grpc-web") .header(ACCESS_CONTROL_REQUEST_METHOD, "POST") - .body(BoxBody::default()) + .body(Body::default()) .unwrap() } @@ -374,11 +373,11 @@ mod tests { mod grpc { use super::*; - fn request() -> Request { + fn request() -> Request { Request::builder() .version(Version::HTTP_2) .header(CONTENT_TYPE, GRPC_CONTENT_TYPE) - .body(BoxBody::default()) + .body(Body::default()) .unwrap() } @@ -398,7 +397,7 @@ mod tests { let req = Request::builder() .header(CONTENT_TYPE, GRPC_CONTENT_TYPE) - .body(BoxBody::default()) + .body(Body::default()) .unwrap(); let res = svc.call(req).await.unwrap(); @@ -426,10 +425,10 @@ mod tests { mod other { use super::*; - fn request() -> Request { + fn request() -> Request { Request::builder() .header(CONTENT_TYPE, "application/text") - .body(BoxBody::default()) + .body(Body::default()) .unwrap() } diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 186d3c03d..78b9c8a58 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -133,7 +133,6 @@ allowed_external_types = [ "axum::routing::Router", "futures_core::stream::Stream", "h2::error::Error", - "http_body_util::combinators::box_body::UnsyncBoxBody", "tower::discover::Change", "tower_service::Service", "tower_layer::Layer", diff --git a/tonic/src/body.rs b/tonic/src/body.rs index 5e5ba2c54..046c6a631 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -1,21 +1,94 @@ //! HTTP specific body utilities. -use http_body_util::BodyExt; - -/// A type erased HTTP body used for tonic services. -pub type BoxBody = http_body_util::combinators::UnsyncBoxBody; - -/// Convert a [`http_body::Body`] into a [`BoxBody`]. -pub fn boxed(body: B) -> BoxBody -where - B: http_body::Body + Send + 'static, - B::Error: Into, -{ - body.map_err(crate::Status::map_error).boxed_unsync() +use std::{pin::Pin, task::Poll}; + +use http_body_util::BodyExt as _; + +// A type erased HTTP body. +type BoxBody = http_body_util::combinators::UnsyncBoxBody; + +/// A body type used in `tonic`. +#[derive(Debug)] +pub struct Body { + kind: Kind, +} + +#[derive(Debug)] +enum Kind { + Empty, + Wrap(BoxBody), } -/// Create an empty `BoxBody` -#[deprecated(since = "0.12.4", note = "use `BoxBody::default()` instead")] -pub fn empty_body() -> BoxBody { - BoxBody::default() +impl Body { + fn from_kind(kind: Kind) -> Self { + Self { kind } + } + + /// Create a new empty `Body`. + pub const fn empty() -> Self { + Self { kind: Kind::Empty } + } + + /// Create a new `Body` from an existing `Body`. + pub fn new(body: B) -> Self + where + B: http_body::Body + Send + 'static, + B::Error: Into, + { + if body.is_end_stream() { + return Self::empty(); + } + + let mut body = Some(body); + + if let Some(body) = ::downcast_mut::>(&mut body) { + return body.take().unwrap(); + } + + if let Some(body) = ::downcast_mut::>(&mut body) { + return Self::from_kind(Kind::Wrap(body.take().unwrap())); + } + + let body = body + .unwrap() + .map_err(crate::Status::map_error) + .boxed_unsync(); + + Self::from_kind(Kind::Wrap(body)) + } +} + +impl Default for Body { + fn default() -> Self { + Self::empty() + } +} + +impl http_body::Body for Body { + type Data = bytes::Bytes; + type Error = crate::Status; + + fn poll_frame( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll, Self::Error>>> { + match &mut self.kind { + Kind::Empty => Poll::Ready(None), + Kind::Wrap(body) => Pin::new(body).poll_frame(cx), + } + } + + fn size_hint(&self) -> http_body::SizeHint { + match &self.kind { + Kind::Empty => http_body::SizeHint::with_exact(0), + Kind::Wrap(body) => body.size_hint(), + } + } + + fn is_end_stream(&self) -> bool { + match &self.kind { + Kind::Empty => true, + Kind::Wrap(body) => body.is_end_stream(), + } + } } diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index 10fa87867..2d9685f7c 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -2,7 +2,7 @@ use crate::codec::compression::{CompressionEncoding, EnabledCompressionEncodings use crate::codec::EncodeBody; use crate::metadata::GRPC_CONTENT_TYPE; use crate::{ - body::BoxBody, + body::Body, client::GrpcService, codec::{Codec, Decoder, Streaming}, request::SanitizeHeaders, @@ -12,7 +12,7 @@ use http::{ header::{HeaderValue, CONTENT_TYPE, TE}, uri::{PathAndQuery, Uri}, }; -use http_body::Body; +use http_body::Body as HttpBody; use std::{fmt, future, pin::pin}; use tokio_stream::{Stream, StreamExt}; @@ -198,7 +198,7 @@ impl Grpc { /// accept one more request. pub async fn ready(&mut self) -> Result<(), T::Error> where - T: GrpcService, + T: GrpcService, { future::poll_fn(|cx| self.inner.poll_ready(cx)).await } @@ -211,9 +211,9 @@ impl Grpc { codec: C, ) -> Result, Status> where - T: GrpcService, - T::ResponseBody: Body + Send + 'static, - ::Error: Into, + T: GrpcService, + T::ResponseBody: HttpBody + Send + 'static, + ::Error: Into, C: Codec, M1: Send + Sync + 'static, M2: Send + Sync + 'static, @@ -230,9 +230,9 @@ impl Grpc { codec: C, ) -> Result, Status> where - T: GrpcService, - T::ResponseBody: Body + Send + 'static, - ::Error: Into, + T: GrpcService, + T::ResponseBody: HttpBody + Send + 'static, + ::Error: Into, S: Stream + Send + 'static, C: Codec, M1: Send + Sync + 'static, @@ -267,9 +267,9 @@ impl Grpc { codec: C, ) -> Result>, Status> where - T: GrpcService, - T::ResponseBody: Body + Send + 'static, - ::Error: Into, + T: GrpcService, + T::ResponseBody: HttpBody + Send + 'static, + ::Error: Into, C: Codec, M1: Send + Sync + 'static, M2: Send + Sync + 'static, @@ -286,9 +286,9 @@ impl Grpc { mut codec: C, ) -> Result>, Status> where - T: GrpcService, - T::ResponseBody: Body + Send + 'static, - ::Error: Into, + T: GrpcService, + T::ResponseBody: HttpBody + Send + 'static, + ::Error: Into, S: Stream + Send + 'static, C: Codec, M1: Send + Sync + 'static, @@ -303,7 +303,7 @@ impl Grpc { self.config.max_encoding_message_size, ) }) - .map(BoxBody::new); + .map(Body::new); let request = self.config.prepare_request(request, path); @@ -326,9 +326,9 @@ impl Grpc { response: http::Response, ) -> Result>, Status> where - T: GrpcService, - T::ResponseBody: Body + Send + 'static, - ::Error: Into, + T: GrpcService, + T::ResponseBody: HttpBody + Send + 'static, + ::Error: Into, { let encoding = CompressionEncoding::from_encoding_header( response.headers(), @@ -369,11 +369,7 @@ impl Grpc { } impl GrpcConfig { - fn prepare_request( - &self, - request: Request, - path: PathAndQuery, - ) -> http::Request { + fn prepare_request(&self, request: Request, path: PathAndQuery) -> http::Request { let mut parts = self.origin.clone().into_parts(); match &parts.path_and_query { diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index ebad0249a..164bb9d5a 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -1,9 +1,9 @@ use super::compression::{decompress, CompressionEncoding, CompressionSettings}; use super::{BufferSettings, DecodeBuf, Decoder, DEFAULT_MAX_RECV_MESSAGE_SIZE, HEADER_SIZE}; -use crate::{body::BoxBody, metadata::MetadataMap, Code, Status}; +use crate::{body::Body, metadata::MetadataMap, Code, Status}; use bytes::{Buf, BufMut, BytesMut}; use http::{HeaderMap, StatusCode}; -use http_body::Body; +use http_body::Body as HttpBody; use http_body_util::BodyExt; use std::{ fmt, future, @@ -24,7 +24,7 @@ pub struct Streaming { } struct StreamingInner { - body: BoxBody, + body: Body, state: State, direction: Direction, buf: BytesMut, @@ -64,7 +64,7 @@ impl Streaming { max_message_size: Option, ) -> Self where - B: Body + Send + 'static, + B: HttpBody + Send + 'static, B::Error: Into, D: Decoder + Send + 'static, { @@ -80,7 +80,7 @@ impl Streaming { /// Create empty response. For creating responses that have no content (headers + trailers only) pub fn new_empty(decoder: D, body: B) -> Self where - B: Body + Send + 'static, + B: HttpBody + Send + 'static, B::Error: Into, D: Decoder + Send + 'static, { @@ -96,7 +96,7 @@ impl Streaming { max_message_size: Option, ) -> Self where - B: Body + Send + 'static, + B: HttpBody + Send + 'static, B::Error: Into, D: Decoder + Send + 'static, { @@ -117,7 +117,7 @@ impl Streaming { max_message_size: Option, ) -> Self where - B: Body + Send + 'static, + B: HttpBody + Send + 'static, B::Error: Into, D: Decoder + Send + 'static, { @@ -125,10 +125,12 @@ impl Streaming { Self { decoder: Box::new(decoder), inner: StreamingInner { - body: body - .map_frame(|frame| frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))) - .map_err(|err| Status::map_error(err.into())) - .boxed_unsync(), + body: Body::new( + body.map_frame(|frame| { + frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) + }) + .map_err(|err| Status::map_error(err.into())), + ), state: State::ReadHeader, direction, buf: BytesMut::with_capacity(buffer_size), diff --git a/tonic/src/codegen.rs b/tonic/src/codegen.rs index a9fed8ba7..216536705 100644 --- a/tonic/src/codegen.rs +++ b/tonic/src/codegen.rs @@ -19,6 +19,3 @@ pub use http_body::Body; pub type BoxFuture = self::Pin> + Send + 'static>>; pub type BoxStream = self::Pin> + Send + 'static>>; - -#[allow(deprecated)] -pub use crate::body::empty_body; diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index 81afd93d1..aff4ce0df 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -4,12 +4,12 @@ use crate::codec::compression::{ use crate::codec::EncodeBody; use crate::metadata::GRPC_CONTENT_TYPE; use crate::{ - body::BoxBody, + body::Body, codec::{Codec, Streaming}, server::{ClientStreamingService, ServerStreamingService, StreamingService, UnaryService}, Request, Status, }; -use http_body::Body; +use http_body::Body as HttpBody; use std::{fmt, pin::pin}; use tokio_stream::{Stream, StreamExt}; @@ -224,10 +224,10 @@ where &mut self, mut service: S, req: http::Request, - ) -> http::Response + ) -> http::Response where S: UnaryService, - B: Body + Send + 'static, + B: HttpBody + Send + 'static, B::Error: Into + Send, { let accept_encoding = CompressionEncoding::from_accept_encoding_header( @@ -267,11 +267,11 @@ where &mut self, mut service: S, req: http::Request, - ) -> http::Response + ) -> http::Response where S: ServerStreamingService, S::ResponseStream: Send + 'static, - B: Body + Send + 'static, + B: HttpBody + Send + 'static, B::Error: Into + Send, { let accept_encoding = CompressionEncoding::from_accept_encoding_header( @@ -308,10 +308,10 @@ where &mut self, mut service: S, req: http::Request, - ) -> http::Response + ) -> http::Response where S: ClientStreamingService, - B: Body + Send + 'static, + B: HttpBody + Send + 'static, B::Error: Into + Send + 'static, { let accept_encoding = CompressionEncoding::from_accept_encoding_header( @@ -341,11 +341,11 @@ where &mut self, mut service: S, req: http::Request, - ) -> http::Response + ) -> http::Response where S: StreamingService + Send, S::ResponseStream: Send + 'static, - B: Body + Send + 'static, + B: HttpBody + Send + 'static, B::Error: Into + Send, { let accept_encoding = CompressionEncoding::from_accept_encoding_header( @@ -370,7 +370,7 @@ where request: http::Request, ) -> Result, Status> where - B: Body + Send + 'static, + B: HttpBody + Send + 'static, B::Error: Into + Send, { let request_compression_encoding = self.request_encoding_if_supported(&request)?; @@ -403,7 +403,7 @@ where request: http::Request, ) -> Result>, Status> where - B: Body + Send + 'static, + B: HttpBody + Send + 'static, B::Error: Into + Send, { let encoding = self.request_encoding_if_supported(&request)?; @@ -426,7 +426,7 @@ where accept_encoding: Option, compression_override: SingleMessageCompressionOverride, max_message_size: Option, - ) -> http::Response + ) -> http::Response where B: Stream> + Send + 'static, { @@ -456,7 +456,7 @@ where max_message_size, ); - http::Response::from_parts(parts, BoxBody::new(body)) + http::Response::from_parts(parts, Body::new(body)) } fn request_encoding_if_supported( diff --git a/tonic/src/service/router.rs b/tonic/src/service/router.rs index c059e908a..a2d42e99a 100644 --- a/tonic/src/service/router.rs +++ b/tonic/src/service/router.rs @@ -1,9 +1,4 @@ -use crate::{ - body::{boxed, BoxBody}, - metadata::GRPC_CONTENT_TYPE, - server::NamedService, - Status, -}; +use crate::{body::Body, metadata::GRPC_CONTENT_TYPE, server::NamedService, Status}; use http::{HeaderValue, Request, Response}; use std::{ convert::Infallible, @@ -30,7 +25,7 @@ impl RoutesBuilder { /// Add a new service. pub fn add_service(&mut self, svc: S) -> &mut Self where - S: Service, Error = Infallible> + NamedService + Clone + Send + 'static, + S: Service, Error = Infallible> + NamedService + Clone + Send + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static, { @@ -57,7 +52,7 @@ impl Routes { /// Create a new routes with `svc` already added to it. pub fn new(svc: S) -> Self where - S: Service, Error = Infallible> + NamedService + Clone + Send + 'static, + S: Service, Error = Infallible> + NamedService + Clone + Send + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static, { @@ -72,13 +67,13 @@ impl Routes { /// Add a new service. pub fn add_service(mut self, svc: S) -> Self where - S: Service, Error = Infallible> + NamedService + Clone + Send + 'static, + S: Service, Error = Infallible> + NamedService + Clone + Send + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static, { self.router = self.router.route_service( &format!("/{}/*rest", S::NAME), - svc.map_request(|req: Request| req.map(boxed)), + svc.map_request(|req: Request| req.map(Body::new)), ); self } @@ -145,7 +140,7 @@ where B: http_body::Body + Send + 'static, B::Error: Into, { - type Response = Response; + type Response = Response; type Error = crate::BoxError; type Future = RoutesFuture; @@ -168,11 +163,11 @@ impl fmt::Debug for RoutesFuture { } impl Future for RoutesFuture { - type Output = Result, crate::BoxError>; + type Output = Result, crate::BoxError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(Pin::new(&mut self.as_mut().0).poll(cx)) { - Ok(res) => Ok(res.map(boxed)).into(), + Ok(res) => Ok(res.map(Body::new)).into(), // NOTE: This pattern is not needed from Rust 1.82. // See https://github.com/rust-lang/rust/pull/122792. #[allow(unreachable_patterns)] diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index c1bee0140..2b9a30553 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -10,7 +10,7 @@ pub use endpoint::Endpoint; pub use tls::ClientTlsConfig; use self::service::{Connection, DynamicServiceStream, Executor, SharedExec}; -use crate::body::BoxBody; +use crate::body::Body; use bytes::Bytes; use http::{ uri::{InvalidUri, Uri}, @@ -63,14 +63,14 @@ const DEFAULT_BUFFER_SIZE: usize = 1024; /// cloning the `Channel` type is cheap and encouraged. #[derive(Clone)] pub struct Channel { - svc: Buffer, BoxFuture<'static, Result, crate::BoxError>>>, + svc: Buffer, BoxFuture<'static, Result, crate::BoxError>>>, } /// A future that resolves to an HTTP response. /// /// This is returned by the `Service::call` on [`Channel`]. pub struct ResponseFuture { - inner: BufferResponseFuture, crate::BoxError>>>, + inner: BufferResponseFuture, crate::BoxError>>>, } impl Channel { @@ -203,8 +203,8 @@ impl Channel { } } -impl Service> for Channel { - type Response = http::Response; +impl Service> for Channel { + type Response = http::Response; type Error = super::Error; type Future = ResponseFuture; @@ -212,7 +212,7 @@ impl Service> for Channel { Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source) } - fn call(&mut self, request: http::Request) -> Self::Future { + fn call(&mut self, request: http::Request) -> Self::Future { let inner = Service::call(&mut self.svc, request); ResponseFuture { inner } @@ -220,7 +220,7 @@ impl Service> for Channel { } impl Future for ResponseFuture { - type Output = Result, super::Error>; + type Output = Result, super::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { Pin::new(&mut self.inner) diff --git a/tonic/src/transport/channel/service/connection.rs b/tonic/src/transport/channel/service/connection.rs index d7aa64baa..4e84ac92e 100644 --- a/tonic/src/transport/channel/service/connection.rs +++ b/tonic/src/transport/channel/service/connection.rs @@ -1,6 +1,6 @@ use super::{AddOrigin, Reconnect, SharedExec, UserAgent}; use crate::{ - body::{boxed, BoxBody}, + body::Body, transport::{channel::BoxFuture, service::GrpcTimeout, Endpoint}, }; use http::{Request, Response, Uri}; @@ -21,7 +21,7 @@ use tower::{ use tower_service::Service; pub(crate) struct Connection { - inner: BoxService, Response, crate::BoxError>, + inner: BoxService, Response, crate::BoxError>, } impl Connection { @@ -101,8 +101,8 @@ impl Connection { } } -impl Service> for Connection { - type Response = Response; +impl Service> for Connection { + type Response = Response; type Error = crate::BoxError; type Future = BoxFuture<'static, Result>; @@ -110,7 +110,7 @@ impl Service> for Connection { Service::poll_ready(&mut self.inner, cx).map_err(Into::into) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { self.inner.call(req) } } @@ -130,17 +130,17 @@ impl fmt::Debug for Connection { } struct SendRequest { - inner: hyper::client::conn::http2::SendRequest, + inner: hyper::client::conn::http2::SendRequest, } -impl From> for SendRequest { - fn from(inner: hyper::client::conn::http2::SendRequest) -> Self { +impl From> for SendRequest { + fn from(inner: hyper::client::conn::http2::SendRequest) -> Self { Self { inner } } } -impl tower::Service> for SendRequest { - type Response = Response; +impl tower::Service> for SendRequest { + type Response = Response; type Error = crate::BoxError; type Future = BoxFuture<'static, Result>; @@ -148,10 +148,10 @@ impl tower::Service> for SendRequest { self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { let fut = self.inner.send_request(req); - Box::pin(async move { fut.await.map_err(Into::into).map(|res| res.map(boxed)) }) + Box::pin(async move { fut.await.map_err(Into::into).map(|res| res.map(Body::new)) }) } } diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index c81208d0e..0c0168fd4 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -21,7 +21,6 @@ //! # #[cfg(feature = "rustls")] //! # use tonic::transport::{Channel, Certificate, ClientTlsConfig}; //! # use std::time::Duration; -//! # use tonic::body::BoxBody; //! # use tonic::client::GrpcService;; //! # use http::Request; //! # #[cfg(feature = "rustls")] @@ -49,20 +48,20 @@ //! # use std::convert::Infallible; //! # #[cfg(feature = "rustls")] //! # use tonic::transport::{Server, Identity, ServerTlsConfig}; -//! # use tonic::body::BoxBody; +//! # use tonic::body::Body; //! # use tower::Service; //! # #[cfg(feature = "rustls")] //! # async fn do_thing() -> Result<(), Box> { //! # #[derive(Clone)] //! # pub struct Svc; -//! # impl Service> for Svc { -//! # type Response = hyper::Response; +//! # impl Service> for Svc { +//! # type Response = hyper::Response; //! # type Error = Infallible; //! # type Future = std::future::Ready>; //! # fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll> { //! # Ok(()).into() //! # } -//! # fn call(&mut self, _req: hyper::Request) -> Self::Future { +//! # fn call(&mut self, _req: hyper::Request) -> Self::Future { //! # unimplemented!() //! # } //! # } diff --git a/tonic/src/transport/server/incoming.rs b/tonic/src/transport/server/incoming.rs index e0751e739..8efa92070 100644 --- a/tonic/src/transport/server/incoming.rs +++ b/tonic/src/transport/server/incoming.rs @@ -176,13 +176,13 @@ impl TcpIncoming { /// ```no_run /// # use tower_service::Service; /// # use http::{request::Request, response::Response}; - /// # use tonic::{body::BoxBody, server::NamedService, transport::{Server, server::TcpIncoming}}; + /// # use tonic::{body::Body, server::NamedService, transport::{Server, server::TcpIncoming}}; /// # use core::convert::Infallible; /// # use std::error::Error; /// # fn main() { } // Cannot have type parameters, hence instead define: /// # fn run(some_service: S) -> Result<(), Box> /// # where - /// # S: Service, Response = Response, Error = Infallible> + NamedService + Clone + Send + 'static, + /// # S: Service, Response = Response, Error = Infallible> + NamedService + Clone + Send + 'static, /// # S::Future: Send + 'static, /// # { /// // Find a free port diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index a8b23a1ff..f54dd5bdb 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -38,7 +38,7 @@ use crate::transport::Error; use self::service::{RecoverError, ServerIo}; use super::service::GrpcTimeout; -use crate::body::{boxed, BoxBody}; +use crate::body::Body; use crate::server::NamedService; use bytes::Bytes; use http::{Request, Response}; @@ -68,8 +68,7 @@ use tower::{ Service, ServiceBuilder, ServiceExt, }; -type BoxService = - tower::util::BoxCloneService, Response, crate::BoxError>; +type BoxService = tower::util::BoxCloneService, Response, crate::BoxError>; type TraceInterceptor = Arc) -> tracing::Span + Send + Sync + 'static>; const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20; @@ -399,7 +398,7 @@ impl Server { /// route around different services. pub fn add_service(&mut self, svc: S) -> Router where - S: Service, Response = Response, Error = Infallible> + S: Service, Response = Response, Error = Infallible> + NamedService + Clone + Send @@ -420,7 +419,7 @@ impl Server { /// As a result, one cannot use this to toggle between two identically named implementations. pub fn add_optional_service(&mut self, svc: Option) -> Router where - S: Service, Response = Response, Error = Infallible> + S: Service, Response = Response, Error = Infallible> + NamedService + Clone + Send @@ -536,10 +535,9 @@ impl Server { ) -> Result<(), super::Error> where L: Layer, - L::Service: - Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: + L::Service: Service, Response = Response> + Clone + Send + 'static, + <>::Service as Service>>::Future: Send + 'static, + <>::Service as Service>>::Error: Into + Send + 'static, I: Stream>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, @@ -645,7 +643,7 @@ impl Server { .map_err(super::Error::from_source)?; let hyper_io = TokioIo::new(io); - let hyper_svc = TowerToHyperService::new(req_svc.map_request(|req: Request| req.map(boxed))); + let hyper_svc = TowerToHyperService::new(req_svc.map_request(|req: Request| req.map(Body::new))); serve_connection(hyper_io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone()), max_connection_age); } @@ -738,7 +736,7 @@ impl Router { /// Add a new service to this router. pub fn add_service(mut self, svc: S) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service, Response = Response, Error = Infallible> + NamedService + Clone + Send @@ -757,7 +755,7 @@ impl Router { #[allow(clippy::type_complexity)] pub fn add_optional_service(mut self, svc: Option) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service, Response = Response, Error = Infallible> + NamedService + Clone + Send @@ -784,10 +782,9 @@ impl Router { pub async fn serve(self, addr: SocketAddr) -> Result<(), super::Error> where L: Layer + Clone, - L::Service: - Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: + L::Service: Service, Response = Response> + Clone + Send + 'static, + <>::Service as Service>>::Future: Send + 'static, + <>::Service as Service>>::Error: Into + Send, ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, @@ -816,10 +813,9 @@ impl Router { ) -> Result<(), super::Error> where L: Layer, - L::Service: - Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: + L::Service: Service, Response = Response> + Clone + Send + 'static, + <>::Service as Service>>::Future: Send + 'static, + <>::Service as Service>>::Error: Into + Send, ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, @@ -847,10 +843,10 @@ impl Router { IO::ConnectInfo: Clone + Send + Sync + 'static, IE: Into, L: Layer, - L::Service: - Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: + + L::Service: Service, Response = Response> + Clone + Send + 'static, + <>::Service as Service>>::Future: Send + 'static, + <>::Service as Service>>::Error: Into + Send, ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, @@ -884,10 +880,9 @@ impl Router { IE: Into, F: Future, L: Layer, - L::Service: - Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: + L::Service: Service, Response = Response> + Clone + Send + 'static, + <>::Service as Service>>::Future: Send + 'static, + <>::Service as Service>>::Error: Into + Send, ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, @@ -918,14 +913,14 @@ struct Svc { trace_interceptor: Option, } -impl Service> for Svc +impl Service> for Svc where - S: Service, Response = Response>, + S: Service, Response = Response>, S::Error: Into, ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { - type Response = Response; + type Response = Response; type Error = crate::BoxError; type Future = SvcFuture; @@ -933,7 +928,7 @@ where self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, mut req: Request) -> Self::Future { + fn call(&mut self, mut req: Request) -> Self::Future { let span = if let Some(trace_interceptor) = &self.trace_interceptor { let (parts, body) = req.into_parts(); let bodyless_request = Request::from_parts(parts, ()); @@ -969,14 +964,14 @@ where ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { - type Output = Result, crate::BoxError>; + type Output = Result, crate::BoxError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); let _guard = this.span.enter(); let response: Response = ready!(this.inner.poll(cx)).map_err(Into::into)?; - let response = response.map(|body| boxed(body.map_err(Into::into))); + let response = response.map(|body| Body::new(body.map_err(Into::into))); Poll::Ready(Ok(response)) } } @@ -999,7 +994,7 @@ struct MakeSvc { impl Service<&ServerIo> for MakeSvc where IO: Connected, - S: Service, Response = Response> + Clone + Send + 'static, + S: Service, Response = Response> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, ResBody: http_body::Body + Send + 'static, @@ -1029,7 +1024,7 @@ where let svc = ServiceBuilder::new() .layer(BoxCloneService::layer()) - .map_request(move |mut request: Request| { + .map_request(move |mut request: Request| { match &conn_info { tower::util::Either::Left(inner) => { request.extensions_mut().insert(inner.clone());