From b4c9203fdedc78e0f1cf6cc62a9a36d1ae972fe4 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Fri, 10 Nov 2023 13:47:20 +0100 Subject: [PATCH] feat(service): add adapter for tower services --- Cargo.toml | 4 +++ examples/tower_server.rs | 27 +++++++++++++++ src/lib.rs | 5 +++ src/server/conn/auto.rs | 75 ++++++++++++++++++++++++++++++++++++++++ src/service.rs | 62 +++++++++++++++++++++++++++++++++ 5 files changed, 173 insertions(+) create mode 100644 examples/tower_server.rs create mode 100644 src/service.rs diff --git a/Cargo.toml b/Cargo.toml index e5da4ee..f19e95e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,3 +60,7 @@ __internal_happy_eyeballs_tests = [] [[example]] name = "client" required-features = ["client", "http1", "tcp", "runtime"] + +[[example]] +name = "tower_server" +required-features = ["server", "http1", "tcp", "runtime"] diff --git a/examples/tower_server.rs b/examples/tower_server.rs new file mode 100644 index 0000000..3caf26d --- /dev/null +++ b/examples/tower_server.rs @@ -0,0 +1,27 @@ +use bytes::Bytes; +use http::{Request, Response}; +use http_body_util::Full; +use hyper::body::Incoming; +use hyper_util::{rt::TokioExecutor, server::conn::auto::Builder}; +use tokio::net::TcpListener; +use tower::BoxError; + +async fn handle(_request: Request) -> Result>, BoxError> { + Ok(Response::new(Full::from("Hello, World!"))) +} + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let tower_service = tower::service_fn(handle); + + let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap(); + loop { + let (tcp_stream, _remote_addr) = listener.accept().await.unwrap(); + if let Err(err) = Builder::new(TokioExecutor::new()) + .serve_connection_with_upgrades_tower(tcp_stream, tower_service) + .await + { + eprintln!("failed to serve connection: {err:#}"); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 85f3d3b..2344c7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,5 +8,10 @@ pub mod client; mod common; pub mod rt; pub mod server; +#[cfg(all( + any(feature = "http1", feature = "http2"), + any(feature = "server", feature = "client") +))] +pub mod service; mod error; diff --git a/src/server/conn/auto.rs b/src/server/conn/auto.rs index f10450e..0a013c2 100644 --- a/src/server/conn/auto.rs +++ b/src/server/conn/auto.rs @@ -87,6 +87,40 @@ impl Builder { Ok(()) } + /// Bind a connection together with a [`tower_service::Service`]. + #[cfg(all( + any(feature = "http1", feature = "http2"), + any(feature = "server", feature = "client") + ))] + pub async fn serve_connection_tower(&self, io: I, service: S) -> Result<()> + where + S: tower_service::Service, Response = Response> + Clone + Send, + S::Future: 'static, + S::Error: Into>, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + I: AsyncRead + AsyncWrite + Unpin + 'static, + E: Http2ConnExec>, B>, + { + let (version, io) = read_version(io).await?; + let io = TokioIo::new(io); + match version { + Version::H1 => { + self.http1 + .serve_connection(io, crate::service::TowerToHyperService::new(service)) + .await? + } + Version::H2 => { + self.http2 + .serve_connection(io, crate::service::TowerToHyperService::new(service)) + .await? + } + } + + Ok(()) + } + /// Bind a connection together with a [`Service`], with the ability to /// handle HTTP upgrades. This requires that the IO object implements /// `Send`. @@ -115,6 +149,47 @@ impl Builder { Ok(()) } + + /// Bind a connection together with a [`tower_service::Service`], with the ability to + /// handle HTTP upgrades. This requires that the IO object implements + /// `Send`. + #[cfg(all( + any(feature = "http1", feature = "http2"), + any(feature = "server", feature = "client") + ))] + pub async fn serve_connection_with_upgrades_tower( + &self, + io: I, + service: S, + ) -> Result<()> + where + S: tower_service::Service, Response = Response> + Clone + Send, + S::Future: 'static, + S::Error: Into>, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + E: Http2ConnExec>, B>, + { + let (version, io) = read_version(io).await?; + let io = TokioIo::new(io); + match version { + Version::H1 => { + self.http1 + .serve_connection(io, crate::service::TowerToHyperService::new(service)) + .with_upgrades() + .await? + } + Version::H2 => { + self.http2 + .serve_connection(io, crate::service::TowerToHyperService::new(service)) + .await? + } + } + + Ok(()) + } } #[derive(Copy, Clone)] enum Version { diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..4652a6a --- /dev/null +++ b/src/service.rs @@ -0,0 +1,62 @@ +//! Service utilities. + +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower::{util::Oneshot, ServiceExt}; + +/// A tower service converted into a hyper service. +#[derive(Debug, Copy, Clone)] +pub struct TowerToHyperService { + service: S, +} + +impl TowerToHyperService { + /// Create a new `TowerToHyperService` from a tower service. + pub fn new(tower_service: S) -> Self { + Self { + service: tower_service, + } + } +} + +impl hyper::service::Service for TowerToHyperService +where + S: tower_service::Service + Clone, +{ + type Response = S::Response; + type Error = S::Error; + type Future = TowerToHyperServiceFuture; + + fn call(&self, req: R) -> Self::Future { + TowerToHyperServiceFuture { + future: self.service.clone().oneshot(req), + } + } +} + +pin_project! { + /// Response future for [`TowerToHyperService`]. + pub struct TowerToHyperServiceFuture + where + S: tower_service::Service, + { + #[pin] + future: Oneshot, + } +} + +impl Future for TowerToHyperServiceFuture +where + S: tower_service::Service, +{ + type Output = Result; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().future.poll(cx) + } +}