
Merge pull request #73 from Finomnis/hyper_1
Update hyper example
Finomnis authored Nov 21, 2023
2 parents 2ec614e + 69a9a2c commit cbd0791
Showing 4 changed files with 160 additions and 31 deletions.
10 changes: 7 additions & 3 deletions Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "tokio-graceful-shutdown"
authors = ["Finomnis <[email protected]>"]
version = "0.14.0"
version = "0.14.1"
edition = "2021"
rust-version = "1.63"
license = "MIT OR Apache-2.0"
@@ -29,7 +29,7 @@ tokio = { version = "1.32.0", default-features = false, features = [
"macros",
"time",
] }
tokio-util = { version = "0.7.8", default-features = false }
tokio-util = { version = "0.7.10", default-features = false }

pin-project-lite = "0.2.13"
thiserror = "1.0.49"
@@ -52,7 +52,11 @@ tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
tokio = { version = "1.32.0", features = ["full"] }

# Hyper example
hyper = { version = "0.14.20", features = ["full"] }
hyper = { version = "1.0.1", features = ["server", "http1"] }
hyper-util = { version = "0.1.1", features = ["tokio"] }
tokio-util = { version = "0.7.10", default-features = false, features = ["rt"] }
bytes = "1.5.0"
http-body-util = "0.1.0"

# Warp example
warp = "0.3.1"
121 changes: 93 additions & 28 deletions examples/hyper.rs
@@ -3,45 +3,110 @@
//!
//! This example closely follows hyper's "hello" example.
//!
//! Note that we have to wait for a long time in `handle_shutdown_requests` because
//! hyper's graceful shutdown waits for all connections to be closed naturally
//! instead of terminating them.
//! Note that while we could spawn one subsystem per connection,
//! tokio-graceful-shutdown's subsystems are quite heavy.
//! So for a large amount of dynamic tasks like this, it is
//! recommended to use CancellationToken + TaskTracker instead.
use miette::{miette, Result};
use miette::{Context, IntoDiagnostic, Result};
use tokio::time::Duration;
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::errors::CancelledByShutdown;
use tokio_graceful_shutdown::{FutureExt, SubsystemBuilder, SubsystemHandle, Toplevel};

use std::convert::Infallible;
use std::net::SocketAddr;
use std::pin::pin;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio_util::task::TaskTracker;

async fn hello(_: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::new(Body::from("Hello World!")))
// An async function that consumes a request, does nothing with it and returns a
// response.
async fn hello(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    Ok(Response::new(Full::new(Bytes::from("Hello World!"))))
}

async fn connection_handler(
    subsys: SubsystemHandle,
    listener: TcpListener,
    connection_tracker: TaskTracker,
) -> Result<()> {
    loop {
        let connection = match listener.accept().cancel_on_shutdown(&subsys).await {
            Ok(connection) => connection,
            Err(CancelledByShutdown) => break,
        };
        let (tcp, addr) = connection
            .into_diagnostic()
            .context("Error while waiting for connection")?;
        let io = TokioIo::new(tcp);

        // Spawn handler on connection tracker to give the parent subsystem
        // the chance to wait for the shutdown to finish
        connection_tracker.spawn({
            let cancellation_token = subsys.create_cancellation_token();
            async move {
                tracing::info!("Connected to {} ...", addr);

                let mut connection =
                    pin!(http1::Builder::new().serve_connection(io, service_fn(hello)));

                let result = tokio::select! {
                    e = connection.as_mut() => e,
                    _ = cancellation_token.cancelled() => {
                        // If the system shuts down, shut down the connection
                        // and continue serving, as specified in the hyper docs.
                        tracing::info!("Shutting down connection to {} ...", addr);
                        connection.as_mut().graceful_shutdown();
                        connection.await
                    },
                };

                if let Err(err) = result {
                    tracing::warn!("Error serving connection: {:?}", err);
                } else {
                    tracing::info!("Connection to {} closed.", addr);
                }
            }
        });
    }

    Ok(())
}

async fn hyper_subsystem(subsys: SubsystemHandle) -> Result<()> {
    // For every connection, we must make a `Service` to handle all
    // incoming HTTP requests on said connection.
    let make_svc = make_service_fn(|_conn| {
        // This is the `Service` that will handle the connection.
        // `service_fn` is a helper to convert a function that
        // returns a Response into a `Service`.
        async { Ok::<_, Infallible>(service_fn(hello)) }
    });

    let addr = ([127, 0, 0, 1], 12345).into();
    let server = Server::bind(&addr).serve(make_svc);
    let addr: SocketAddr = ([127, 0, 0, 1], 12345).into();

    // Bind to the port and listen for incoming TCP connections
    let listener = TcpListener::bind(addr)
        .await
        .into_diagnostic()
        .context("Unable to start tcp server")?;
    tracing::info!("Listening on http://{}", addr);

    // This is the connection between our crate and hyper.
    // Hyper already anticipated our use case and provides a very
    // convenient inverface.
    server
        .with_graceful_shutdown(subsys.on_shutdown_requested())
        .await
        .map_err(|err| miette! {err})
    // Use a tasktracker instead of spawning a subsystem for every connection,
    // as this would result in a lot of overhead.
    let connection_tracker = TaskTracker::new();

    let listener = subsys.start(SubsystemBuilder::new("Hyper Listener", {
        let connection_tracker = connection_tracker.clone();
        move |subsys| connection_handler(subsys, listener, connection_tracker)
    }));

    // Make sure no more tasks can be spawned before we close the tracker
    listener.join().await?;

    // Wait for connections to close
    connection_tracker.close();
    connection_tracker.wait().await;

    Ok(())
}

#[tokio::main]
@@ -56,7 +56,7 @@ async fn main() -> Result<()> {
s.start(SubsystemBuilder::new("Hyper", hyper_subsystem));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_secs(60))
.handle_shutdown_requests(Duration::from_secs(5))
.await
.map_err(Into::into)
}
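
The module comment above recommends CancellationToken + TaskTracker over one subsystem per connection. As a standalone illustration of that pattern (independent of hyper), here is a minimal sketch; the handle_client function, the bind address, and the placeholder sleep are made up for illustration, while the child-token / tracker / close / wait flow mirrors the updated example above.

use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

// Hypothetical per-connection handler: runs until its work finishes or
// the shutdown token fires, similar to the hyper connections above.
async fn handle_client(stream: TcpStream, cancel: CancellationToken) {
    tokio::select! {
        _ = cancel.cancelled() => { /* shutdown requested, stop serving */ }
        _ = tokio::time::sleep(Duration::from_secs(60)) => { /* placeholder for real work */ }
    }
    drop(stream);
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:12345").await?;
    let cancel = CancellationToken::new();
    let tracker = TaskTracker::new();

    // Accept a few connections; each tracked task gets a child token.
    for _ in 0..3 {
        let (stream, _) = listener.accept().await?;
        tracker.spawn(handle_client(stream, cancel.child_token()));
    }

    // Shut down: cancel all handlers, close the tracker, then wait for them to finish.
    cancel.cancel();
    tracker.close();
    tracker.wait().await;
    Ok(())
}

Note that tokio::time::sleep needs tokio's "time" feature and TaskTracker needs tokio-util's "rt" feature, both of which the dev-dependencies above enable.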
12 changes: 12 additions & 0 deletions src/subsystem/subsystem_handle.rs
@@ -304,6 +304,18 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
    pub(crate) fn get_cancellation_token(&self) -> &CancellationToken {
        &self.inner.cancellation_token
    }

    /// Creates a cancellation token that will get triggered once the
    /// subsystem shuts down.
    ///
    /// This is intended for more lightweight situations where
    /// creating full-blown subsystems would be too much overhead,
    /// like spawning connection handlers of a webserver.
    ///
    /// For more information, see the [hyper example](https://github.com/Finomnis/tokio-graceful-shutdown/blob/main/examples/hyper.rs).
    pub fn create_cancellation_token(&self) -> CancellationToken {
        self.inner.cancellation_token.child_token()
    }
}

impl<ErrType: ErrTypeTraits> Drop for SubsystemHandle<ErrType> {
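Because create_cancellation_token returns a child_token of the subsystem's own token, cancelling it manually does not propagate upwards and request a shutdown; the second test added below verifies exactly that. As a rough usage sketch (the subsystem name "Worker" and the helper task's body are hypothetical; the builder, toplevel, and shutdown calls follow the hyper example in this commit):

use miette::Result;
use tokio::time::Duration;
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};

async fn worker_subsystem(subsys: SubsystemHandle) -> Result<()> {
    // Hand a cheap cancellation token to a plain tokio task instead of
    // paying the overhead of a full subsystem per task.
    let token = subsys.create_cancellation_token();
    let task = tokio::spawn(async move {
        token.cancelled().await;
        tracing::info!("Helper task observed shutdown.");
    });

    subsys.on_shutdown_requested().await;
    // The token is a child of the subsystem's token, so it is cancelled by now.
    task.await.ok();
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    Toplevel::new(|s| async move {
        s.start(SubsystemBuilder::new("Worker", worker_subsystem));
    })
    .catch_signals()
    .handle_shutdown_requests(Duration::from_secs(5))
    .await
    .map_err(Into::into)
}
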
48 changes: 48 additions & 0 deletions tests/integration_test_2.rs
@@ -199,3 +199,51 @@ async fn shutdown_through_signal_2() {
},
);
}

#[tokio::test]
#[traced_test]
async fn cancellation_token() {
    let subsystem = |subsys: SubsystemHandle| async move {
        let cancellation_token = subsys.create_cancellation_token();

        assert!(!cancellation_token.is_cancelled());
        subsys.on_shutdown_requested().await;
        assert!(cancellation_token.is_cancelled());

        BoxedResult::Ok(())
    };

    let toplevel = Toplevel::new(move |s| async move {
        s.start(SubsystemBuilder::new("subsys", subsystem));

        sleep(Duration::from_millis(100)).await;
        s.request_shutdown();
    });

    let result = toplevel
        .handle_shutdown_requests(Duration::from_millis(400))
        .await;
    assert!(result.is_ok());
}

#[tokio::test]
#[traced_test]
async fn cancellation_token_does_not_propagate_up() {
    let subsystem = |subsys: SubsystemHandle| async move {
        let cancellation_token = subsys.create_cancellation_token();

        cancellation_token.cancel();
        assert!(!subsys.is_shutdown_requested());

        BoxedResult::Ok(())
    };

    let toplevel = Toplevel::new(move |s| async move {
        s.start(SubsystemBuilder::new("subsys", subsystem));
    });

    let result = toplevel
        .handle_shutdown_requests(Duration::from_millis(400))
        .await;
    assert!(result.is_ok());
}
