From 8bfdd6f6331a3760cc6a95a0f78f35f470911402 Mon Sep 17 00:00:00 2001 From: Finomnis Date: Tue, 21 Nov 2023 03:30:57 +0100 Subject: [PATCH 1/4] Rework hyper example; add `SubsystemToken::create_cancellation_token()` --- Cargo.toml | 8 +- examples/hyper.rs | 121 +++++++++++++++++++++++------- src/subsystem/subsystem_handle.rs | 12 +++ 3 files changed, 111 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f3de681..ce20dfe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ tokio = { version = "1.32.0", default-features = false, features = [ "macros", "time", ] } -tokio-util = { version = "0.7.8", default-features = false } +tokio-util = { version = "0.7.10", default-features = false } pin-project-lite = "0.2.13" thiserror = "1.0.49" @@ -52,7 +52,11 @@ tracing-test = { version = "0.2.4", features = ["no-env-filter"] } tokio = { version = "1.32.0", features = ["full"] } # Hyper example -hyper = { version = "0.14.20", features = ["full"] } +hyper = { version = "1.0.1", features = ["server", "http1"] } +hyper-util = { version = "0.1.1", features = ["tokio"] } +tokio-util = { version = "0.7.10", default-features = false, features = ["rt"] } +bytes = "1.5.0" +http-body-util = "0.1.0" # Warp example warp = "0.3.1" diff --git a/examples/hyper.rs b/examples/hyper.rs index 72348d4..d6b7b5e 100644 --- a/examples/hyper.rs +++ b/examples/hyper.rs @@ -3,45 +3,110 @@ //! //! This example closely follows hyper's "hello" example. //! -//! Note that we have to wait for a long time in `handle_shutdown_requests` because -//! hyper's graceful shutdown waits for all connections to be closed naturally -//! instead of terminating them. +//! Note that while we could spawn one subsystem per connection, +//! tokio-graceful-shutdown's subsystems are quite heavy. +//! So for a large amount of dynamic tasks like this, it is +//! recommended to use CancellationToken + TaskTracker instead. -use miette::{miette, Result}; +use miette::{Context, IntoDiagnostic, Result}; use tokio::time::Duration; -use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel}; +use tokio_graceful_shutdown::errors::CancelledByShutdown; +use tokio_graceful_shutdown::{FutureExt, SubsystemBuilder, SubsystemHandle, Toplevel}; use std::convert::Infallible; +use std::net::SocketAddr; +use std::pin::pin; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server}; +use bytes::Bytes; +use http_body_util::Full; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; +use tokio_util::task::TaskTracker; -async fn hello(_: Request) -> Result, Infallible> { - Ok(Response::new(Body::from("Hello World!"))) +// An async function that consumes a request, does nothing with it and returns a +// response. +async fn hello(_: Request) -> Result>, Infallible> { + Ok(Response::new(Full::new(Bytes::from("Hello World!")))) +} + +async fn connection_handler( + subsys: SubsystemHandle, + listener: TcpListener, + connection_tracker: TaskTracker, +) -> Result<()> { + loop { + let connection = match listener.accept().cancel_on_shutdown(&subsys).await { + Ok(connection) => connection, + Err(CancelledByShutdown) => break, + }; + let (tcp, addr) = connection + .into_diagnostic() + .context("Error while waiting for connection")?; + let io = TokioIo::new(tcp); + + // Spawn handler on connection tracker to give the parent subsystem + // the chance to wait for the shutdown to finish + connection_tracker.spawn({ + let cancellation_token = subsys.create_cancellation_token(); + async move { + tracing::info!("Connected to {} ...", addr); + + let mut connection = + pin!(http1::Builder::new().serve_connection(io, service_fn(hello))); + + let result = tokio::select! { + e = connection.as_mut() => e, + _ = cancellation_token.cancelled() => { + // If the system shuts down, shut down the connection + // and continue serving, as specified in the hyper docs. + tracing::info!("Shutting down connection to {} ...", addr); + connection.as_mut().graceful_shutdown(); + connection.await + }, + }; + + if let Err(err) = result { + tracing::warn!("Error serving connection: {:?}", err); + } else { + tracing::info!("Connection to {} closed.", addr); + } + } + }); + } + + Ok(()) } async fn hyper_subsystem(subsys: SubsystemHandle) -> Result<()> { - // For every connection, we must make a `Service` to handle all - // incoming HTTP requests on said connection. - let make_svc = make_service_fn(|_conn| { - // This is the `Service` that will handle the connection. - // `service_fn` is a helper to convert a function that - // returns a Response into a `Service`. - async { Ok::<_, Infallible>(service_fn(hello)) } - }); - - let addr = ([127, 0, 0, 1], 12345).into(); - let server = Server::bind(&addr).serve(make_svc); + let addr: SocketAddr = ([127, 0, 0, 1], 12345).into(); + // Bind to the port and listen for incoming TCP connections + let listener = TcpListener::bind(addr) + .await + .into_diagnostic() + .context("Unable to start tcp server")?; tracing::info!("Listening on http://{}", addr); - // This is the connection between our crate and hyper. - // Hyper already anticipated our use case and provides a very - // convenient inverface. - server - .with_graceful_shutdown(subsys.on_shutdown_requested()) - .await - .map_err(|err| miette! {err}) + // Use a tasktracker instead of spawning a subsystem for every connection, + // as this would result in a lot of overhead. + let connection_tracker = TaskTracker::new(); + + let listener = subsys.start(SubsystemBuilder::new("Hyper Listener", { + let connection_tracker = connection_tracker.clone(); + move |subsys| connection_handler(subsys, listener, connection_tracker) + })); + + // Make sure no more tasks can be spawned before we close the tracker + listener.join().await?; + + // Wait for connections to close + connection_tracker.close(); + connection_tracker.wait().await; + + Ok(()) } #[tokio::main] @@ -56,7 +121,7 @@ async fn main() -> Result<()> { s.start(SubsystemBuilder::new("Hyper", hyper_subsystem)); }) .catch_signals() - .handle_shutdown_requests(Duration::from_secs(60)) + .handle_shutdown_requests(Duration::from_secs(5)) .await .map_err(Into::into) } diff --git a/src/subsystem/subsystem_handle.rs b/src/subsystem/subsystem_handle.rs index 0cc4c99..86b032c 100644 --- a/src/subsystem/subsystem_handle.rs +++ b/src/subsystem/subsystem_handle.rs @@ -304,6 +304,18 @@ impl SubsystemHandle { pub(crate) fn get_cancellation_token(&self) -> &CancellationToken { &self.inner.cancellation_token } + + /// Creates a cancellation token that will get triggered once the + /// subsystem shuts down. + /// + /// This is intended for more lightweight situations where + /// creating full-blown subsystems would be too much overhead, + /// like spawning connection handlers of a webserver. + /// + /// For more information, see the [hyper example](https://github.com/Finomnis/tokio-graceful-shutdown/blob/main/examples/hyper.rs). + pub fn create_cancellation_token(&self) -> CancellationToken { + self.inner.cancellation_token.child_token() + } } impl Drop for SubsystemHandle { From 4a1245e19e452ce8eaecede1c33595bd3d142408 Mon Sep 17 00:00:00 2001 From: Finomnis Date: Tue, 21 Nov 2023 03:31:32 +0100 Subject: [PATCH 2/4] Bump version to 0.14.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ce20dfe..44d9b95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "tokio-graceful-shutdown" authors = ["Finomnis "] -version = "0.14.0" +version = "0.14.1" edition = "2021" rust-version = "1.63" license = "MIT OR Apache-2.0" From a022ecd7592b3ec4de2742c6f0e6033c18851320 Mon Sep 17 00:00:00 2001 From: Finomnis Date: Tue, 21 Nov 2023 03:41:14 +0100 Subject: [PATCH 3/4] Add test for cancellation token --- tests/integration_test_2.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/integration_test_2.rs b/tests/integration_test_2.rs index 601f000..3059a41 100644 --- a/tests/integration_test_2.rs +++ b/tests/integration_test_2.rs @@ -199,3 +199,29 @@ async fn shutdown_through_signal_2() { }, ); } + +#[tokio::test] +#[traced_test] +async fn cancellation_token() { + let subsystem = |subsys: SubsystemHandle| async move { + let cancellation_token = subsys.create_cancellation_token(); + + assert!(!cancellation_token.is_cancelled()); + subsys.on_shutdown_requested().await; + assert!(cancellation_token.is_cancelled()); + + BoxedResult::Ok(()) + }; + + let toplevel = Toplevel::new(move |s| async move { + s.start(SubsystemBuilder::new("subsys", subsystem)); + + sleep(Duration::from_millis(100)).await; + s.request_shutdown(); + }); + + let result = toplevel + .handle_shutdown_requests(Duration::from_millis(400)) + .await; + assert!(result.is_ok()); +} From 69a9a2c0fdb77deb7d0d2cd9e577e173c44bc3be Mon Sep 17 00:00:00 2001 From: Finomnis Date: Tue, 21 Nov 2023 03:44:40 +0100 Subject: [PATCH 4/4] Add another test for cancellation token --- tests/integration_test_2.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/integration_test_2.rs b/tests/integration_test_2.rs index 3059a41..4d67c98 100644 --- a/tests/integration_test_2.rs +++ b/tests/integration_test_2.rs @@ -225,3 +225,25 @@ async fn cancellation_token() { .await; assert!(result.is_ok()); } + +#[tokio::test] +#[traced_test] +async fn cancellation_token_does_not_propagate_up() { + let subsystem = |subsys: SubsystemHandle| async move { + let cancellation_token = subsys.create_cancellation_token(); + + cancellation_token.cancel(); + assert!(!subsys.is_shutdown_requested()); + + BoxedResult::Ok(()) + }; + + let toplevel = Toplevel::new(move |s| async move { + s.start(SubsystemBuilder::new("subsys", subsystem)); + }); + + let result = toplevel + .handle_shutdown_requests(Duration::from_millis(400)) + .await; + assert!(result.is_ok()); +}