Skip to content

Commit

Permalink
feat(service): add adapter for tower services
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpdrsn committed Nov 10, 2023
1 parent 11776bd commit b4c9203
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ __internal_happy_eyeballs_tests = []
[[example]]
name = "client"
required-features = ["client", "http1", "tcp", "runtime"]

[[example]]
name = "tower_server"
required-features = ["server", "http1", "tcp", "runtime"]
27 changes: 27 additions & 0 deletions examples/tower_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use bytes::Bytes;
use http::{Request, Response};
use http_body_util::Full;
use hyper::body::Incoming;
use hyper_util::{rt::TokioExecutor, server::conn::auto::Builder};
use tokio::net::TcpListener;
use tower::BoxError;

async fn handle(_request: Request<Incoming>) -> Result<Response<Full<Bytes>>, BoxError> {
Ok(Response::new(Full::from("Hello, World!")))
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
let tower_service = tower::service_fn(handle);

let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
loop {
let (tcp_stream, _remote_addr) = listener.accept().await.unwrap();
if let Err(err) = Builder::new(TokioExecutor::new())
.serve_connection_with_upgrades_tower(tcp_stream, tower_service)
.await
{
eprintln!("failed to serve connection: {err:#}");
}
}
}
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,10 @@ pub mod client;
mod common;
pub mod rt;
pub mod server;
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
))]
pub mod service;

mod error;
75 changes: 75 additions & 0 deletions src/server/conn/auto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,40 @@ impl<E> Builder<E> {
Ok(())
}

/// Bind a connection together with a [`tower_service::Service`].
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
))]
pub async fn serve_connection_tower<I, S, B>(&self, io: I, service: S) -> Result<()>
where
S: tower_service::Service<Request<Incoming>, Response = Response<B>> + Clone + Send,
S::Future: 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + 'static,
E: Http2ConnExec<crate::service::TowerToHyperServiceFuture<S, Request<Incoming>>, B>,
{
let (version, io) = read_version(io).await?;
let io = TokioIo::new(io);
match version {
Version::H1 => {
self.http1
.serve_connection(io, crate::service::TowerToHyperService::new(service))
.await?
}
Version::H2 => {
self.http2
.serve_connection(io, crate::service::TowerToHyperService::new(service))
.await?
}
}

Ok(())
}

/// Bind a connection together with a [`Service`], with the ability to
/// handle HTTP upgrades. This requires that the IO object implements
/// `Send`.
Expand Down Expand Up @@ -115,6 +149,47 @@ impl<E> Builder<E> {

Ok(())
}

/// Bind a connection together with a [`tower_service::Service`], with the ability to
/// handle HTTP upgrades. This requires that the IO object implements
/// `Send`.
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
))]
pub async fn serve_connection_with_upgrades_tower<I, S, B>(
&self,
io: I,
service: S,
) -> Result<()>
where
S: tower_service::Service<Request<Incoming>, Response = Response<B>> + Clone + Send,
S::Future: 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
E: Http2ConnExec<crate::service::TowerToHyperServiceFuture<S, Request<Incoming>>, B>,
{
let (version, io) = read_version(io).await?;
let io = TokioIo::new(io);
match version {
Version::H1 => {
self.http1
.serve_connection(io, crate::service::TowerToHyperService::new(service))
.with_upgrades()
.await?
}
Version::H2 => {
self.http2
.serve_connection(io, crate::service::TowerToHyperService::new(service))
.await?
}
}

Ok(())
}
}
#[derive(Copy, Clone)]
enum Version {
Expand Down
62 changes: 62 additions & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//! Service utilities.

use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower::{util::Oneshot, ServiceExt};

/// A tower service converted into a hyper service.
#[derive(Debug, Copy, Clone)]
pub struct TowerToHyperService<S> {
service: S,
}

impl<S> TowerToHyperService<S> {
/// Create a new `TowerToHyperService` from a tower service.
pub fn new(tower_service: S) -> Self {
Self {
service: tower_service,
}
}
}

impl<S, R> hyper::service::Service<R> for TowerToHyperService<S>
where
S: tower_service::Service<R> + Clone,
{
type Response = S::Response;
type Error = S::Error;
type Future = TowerToHyperServiceFuture<S, R>;

fn call(&self, req: R) -> Self::Future {
TowerToHyperServiceFuture {
future: self.service.clone().oneshot(req),
}
}
}

pin_project! {
/// Response future for [`TowerToHyperService`].
pub struct TowerToHyperServiceFuture<S, R>
where
S: tower_service::Service<R>,
{
#[pin]
future: Oneshot<S, R>,
}
}

impl<S, R> Future for TowerToHyperServiceFuture<S, R>
where
S: tower_service::Service<R>,
{
type Output = Result<S::Response, S::Error>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().future.poll(cx)
}
}

0 comments on commit b4c9203

Please sign in to comment.