diff --git a/Cargo.toml b/Cargo.toml index dbe911a..f822778 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,9 @@ exclude = [ "/UPCOMING_VERSION_CHANGES.txt", ] +[features] +tokio-unstable = ["tokio/tracing"] + [dependencies] tracing = { version = "0.1.37", default-features = false } @@ -67,6 +70,9 @@ headers = ">= 0.3.5" # Required to fix minimal-versions serde_urlencoded = ">= 0.7.1" # Required to fix minimal-versions unicode-linebreak = ">= 0.1.5" # Required to fix minimal-versions +# tokio-console +console-subscriber = "0.2.0" + # For testing unix signals [target.'cfg(unix)'.dev-dependencies] nix = { version = "0.29.0", default-features = false, features = ["signal"] } diff --git a/examples/tokio-console.rs b/examples/tokio-console.rs new file mode 100644 index 0000000..563f992 --- /dev/null +++ b/examples/tokio-console.rs @@ -0,0 +1,60 @@ +//! This example demonstrates how to use the tokio-console application for tracing tokio tasks's +//! runtime behaviour. Subsystems will appear under their registration names. +//! +//! To make this work, +//! +//! * Compile `tokio-graceful-shutdown` with the `tokio-unstable` feature to register subsystem +//! task names. +//! +//! * Run this example with the environment variable: +//! +//! ``` +//! RUSTFLAGS=="--cfg tokio_unstable" +//! ``` +//! +//! * Run the `tokio-console` CLI application and watch your snappy low-latency tasks + +use miette::Result; +use tokio::time::{sleep, Duration}; +use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel}; +use tracing_subscriber::prelude::*; + +async fn subsys1(subsys: SubsystemHandle) -> Result<()> { + subsys.start(SubsystemBuilder::new("Subsys2", subsys2)); + tracing::info!("Subsystem1 started."); + subsys.on_shutdown_requested().await; + tracing::info!("Shutting down Subsystem1 ..."); + sleep(Duration::from_millis(500)).await; + tracing::info!("Subsystem1 stopped."); + Ok(()) +} + +async fn subsys2(subsys: SubsystemHandle) -> Result<()> { + tracing::info!("Subsystem2 started."); + subsys.on_shutdown_requested().await; + tracing::info!("Shutting down Subsystem2 ..."); + sleep(Duration::from_millis(500)).await; + tracing::info!("Subsystem2 stopped."); + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<()> { + let console_layer = console_subscriber::spawn(); + // Init logging + tracing_subscriber::registry() + .with(console_layer) + .with(tracing_subscriber::fmt::layer()) + .init(); + + // Setup and execute subsystem tree + Toplevel::new(|s| async move { + s.start(SubsystemBuilder::new("Subsys1", subsys1)); + s.start(SubsystemBuilder::new("Subsys2", subsys2)); + s.start(SubsystemBuilder::new("Subsys3", subsys1)); + }) + .catch_signals() + .handle_shutdown_requests(Duration::from_millis(1000)) + .await + .map_err(Into::into) +} diff --git a/src/runner.rs b/src/runner.rs index 837465c..7b66628 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -8,6 +8,8 @@ use std::{future::Future, sync::Arc}; +use tokio::task::AbortHandle; + use crate::{ errors::{SubsystemError, SubsystemFailure}, ErrTypeTraits, SubsystemHandle, @@ -32,12 +34,36 @@ impl SubsystemRunner { Fut: 'static + Future> + Send, Err: Into, { - let future = async { run_subsystem(name, subsystem, subsystem_handle, guard).await }; - let aborthandle = tokio::spawn(future).abort_handle(); + let future = { + let name = name.clone(); + async move { run_subsystem(name, subsystem, subsystem_handle, guard).await } + }; + + let aborthandle = spawn(future, name); SubsystemRunner { aborthandle } } } +#[cfg(not(feature = "tokio-unstable"))] +fn spawn(f: F, _name: Arc) -> AbortHandle +where + ::Output: Send, +{ + tokio::spawn(f).abort_handle() +} + +#[cfg(feature = "tokio-unstable")] +fn spawn(f: F, name: Arc) -> AbortHandle +where + ::Output: Send, +{ + tokio::task::Builder::new() + .name(&name) + .spawn(f) + .expect("spawning a task does not fail") + .abort_handle() +} + impl Drop for SubsystemRunner { fn drop(&mut self) { self.aborthandle.abort()