From 9e6336318725bfd26ebdc6cfe009835db87d3476 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Fri, 10 Nov 2023 13:47:20 +0100 Subject: [PATCH] feat(service): add adapter for tower services --- examples/tower_server.rs | 27 ++++++++++++++++ src/lib.rs | 1 + src/server/conn/auto.rs | 68 ++++++++++++++++++++++++++++++++++++++++ src/service.rs | 62 ++++++++++++++++++++++++++++++++++++ 4 files changed, 158 insertions(+) create mode 100644 examples/tower_server.rs create mode 100644 src/service.rs diff --git a/examples/tower_server.rs b/examples/tower_server.rs new file mode 100644 index 0000000..3caf26d --- /dev/null +++ b/examples/tower_server.rs @@ -0,0 +1,27 @@ +use bytes::Bytes; +use http::{Request, Response}; +use http_body_util::Full; +use hyper::body::Incoming; +use hyper_util::{rt::TokioExecutor, server::conn::auto::Builder}; +use tokio::net::TcpListener; +use tower::BoxError; + +async fn handle(_request: Request) -> Result>, BoxError> { + Ok(Response::new(Full::from("Hello, World!"))) +} + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let tower_service = tower::service_fn(handle); + + let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap(); + loop { + let (tcp_stream, _remote_addr) = listener.accept().await.unwrap(); + if let Err(err) = Builder::new(TokioExecutor::new()) + .serve_connection_with_upgrades_tower(tcp_stream, tower_service) + .await + { + eprintln!("failed to serve connection: {err:#}"); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 85f3d3b..54b5101 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,5 +8,6 @@ pub mod client; mod common; pub mod rt; pub mod server; +pub mod service; mod error; diff --git a/src/server/conn/auto.rs b/src/server/conn/auto.rs index f10450e..63b1199 100644 --- a/src/server/conn/auto.rs +++ b/src/server/conn/auto.rs @@ -20,6 +20,7 @@ use hyper::{ use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::service::{TowerToHyperService, TowerToHyperServiceFuture}; use crate::{common::rewind::Rewind, rt::TokioIo}; type Result = std::result::Result>; @@ -87,6 +88,36 @@ impl Builder { Ok(()) } + /// Bind a connection together with a [`tower::Service`]. + pub async fn serve_connection_tower(&self, io: I, service: S) -> Result<()> + where + S: tower::Service, Response = Response> + Clone + Send, + S::Future: 'static, + S::Error: Into>, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + I: AsyncRead + AsyncWrite + Unpin + 'static, + E: Http2ConnExec>, B>, + { + let (version, io) = read_version(io).await?; + let io = TokioIo::new(io); + match version { + Version::H1 => { + self.http1 + .serve_connection(io, TowerToHyperService::new(service)) + .await? + } + Version::H2 => { + self.http2 + .serve_connection(io, TowerToHyperService::new(service)) + .await? + } + } + + Ok(()) + } + /// Bind a connection together with a [`Service`], with the ability to /// handle HTTP upgrades. This requires that the IO object implements /// `Send`. @@ -115,6 +146,43 @@ impl Builder { Ok(()) } + + /// Bind a connection together with a [`tower::Service`], with the ability to + /// handle HTTP upgrades. This requires that the IO object implements + /// `Send`. + pub async fn serve_connection_with_upgrades_tower( + &self, + io: I, + service: S, + ) -> Result<()> + where + S: tower::Service, Response = Response> + Clone + Send, + S::Future: 'static, + S::Error: Into>, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + E: Http2ConnExec>, B>, + { + let (version, io) = read_version(io).await?; + let io = TokioIo::new(io); + match version { + Version::H1 => { + self.http1 + .serve_connection(io, TowerToHyperService::new(service)) + .with_upgrades() + .await? + } + Version::H2 => { + self.http2 + .serve_connection(io, TowerToHyperService::new(service)) + .await? + } + } + + Ok(()) + } } #[derive(Copy, Clone)] enum Version { diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..5b61b3c --- /dev/null +++ b/src/service.rs @@ -0,0 +1,62 @@ +//! Service utilities. + +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower::{util::Oneshot, ServiceExt}; + +/// A tower service converted into a hyper service. +#[derive(Debug, Copy, Clone)] +pub struct TowerToHyperService { + service: S, +} + +impl TowerToHyperService { + /// Create a new `TowerToHyperService` from a tower service. + pub fn new(tower_service: S) -> Self { + Self { + service: tower_service, + } + } +} + +impl hyper::service::Service for TowerToHyperService +where + S: tower::Service + Clone, +{ + type Response = S::Response; + type Error = S::Error; + type Future = TowerToHyperServiceFuture; + + fn call(&self, req: R) -> Self::Future { + TowerToHyperServiceFuture { + future: self.service.clone().oneshot(req), + } + } +} + +pin_project! { + /// Response future for [`TowerToHyperService`]. + pub struct TowerToHyperServiceFuture + where + S: tower::Service, + { + #[pin] + future: Oneshot, + } +} + +impl Future for TowerToHyperServiceFuture +where + S: tower::Service, +{ + type Output = Result; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().future.poll(cx) + } +}