Skip to content

Commit

Permalink
Add tokio taskname registration for use in tokio-console
Browse files Browse the repository at this point in the history
This uses the `tokio-unstable` feature that has to be enabled and also
the runtime variable `RUSTFLAGS="--cfg tokio_unstable"` must be set to
enable tokio taskname registration.

See example program `tokio-console.rs` for usage.
  • Loading branch information
hirschenberger committed Oct 29, 2024
1 parent 689208e commit a43abad
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 2 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ exclude = [
"/UPCOMING_VERSION_CHANGES.txt",
]

[features]
tokio-unstable = ["tokio/tracing"]

[dependencies]
tracing = { version = "0.1.37", default-features = false }

Expand Down Expand Up @@ -67,6 +70,9 @@ headers = ">= 0.3.5" # Required to fix minimal-versions
serde_urlencoded = ">= 0.7.1" # Required to fix minimal-versions
unicode-linebreak = ">= 0.1.5" # Required to fix minimal-versions

# tokio-console
console-subscriber = "0.2.0"

# For testing unix signals
[target.'cfg(unix)'.dev-dependencies]
nix = { version = "0.29.0", default-features = false, features = ["signal"] }
Expand Down
60 changes: 60 additions & 0 deletions examples/tokio-console.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! This example demonstrates how to use the tokio-console application for tracing tokio tasks's
//! runtime behaviour. Subsystems will appear under their registration names.
//!
//! To make this work,
//!
//! * Compile `tokio-graceful-shutdown` with the `tokio-unstable` feature to register subsystem
//! task names.
//!
//! * Run this example with the environment variable:
//!
//! ```
//! RUSTFLAGS=="--cfg tokio_unstable"
//! ```
//!
//! * Run the `tokio-console` CLI application and watch your snappy low-latency tasks
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};
use tracing_subscriber::prelude::*;

async fn subsys1(subsys: SubsystemHandle) -> Result<()> {
subsys.start(SubsystemBuilder::new("Subsys2", subsys2));
tracing::info!("Subsystem1 started.");
subsys.on_shutdown_requested().await;
tracing::info!("Shutting down Subsystem1 ...");
sleep(Duration::from_millis(500)).await;
tracing::info!("Subsystem1 stopped.");
Ok(())
}

async fn subsys2(subsys: SubsystemHandle) -> Result<()> {
tracing::info!("Subsystem2 started.");
subsys.on_shutdown_requested().await;
tracing::info!("Shutting down Subsystem2 ...");
sleep(Duration::from_millis(500)).await;
tracing::info!("Subsystem2 stopped.");
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
let console_layer = console_subscriber::spawn();
// Init logging
tracing_subscriber::registry()
.with(console_layer)
.with(tracing_subscriber::fmt::layer())
.init();

// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1));
s.start(SubsystemBuilder::new("Subsys2", subsys2));
s.start(SubsystemBuilder::new("Subsys3", subsys1));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
30 changes: 28 additions & 2 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use std::{future::Future, sync::Arc};

use tokio::task::AbortHandle;

use crate::{
errors::{SubsystemError, SubsystemFailure},
ErrTypeTraits, SubsystemHandle,
Expand All @@ -32,12 +34,36 @@ impl SubsystemRunner {
Fut: 'static + Future<Output = Result<(), Err>> + Send,
Err: Into<ErrType>,
{
let future = async { run_subsystem(name, subsystem, subsystem_handle, guard).await };
let aborthandle = tokio::spawn(future).abort_handle();
let future = {
let name = name.clone();
async move { run_subsystem(name, subsystem, subsystem_handle, guard).await }
};

let aborthandle = spawn(future, name);
SubsystemRunner { aborthandle }
}
}

#[cfg(not(feature = "tokio-unstable"))]
fn spawn<F: Future + Send + 'static>(f: F, _name: Arc<str>) -> AbortHandle
where
<F as Future>::Output: Send,
{
tokio::spawn(f).abort_handle()
}

#[cfg(feature = "tokio-unstable")]
fn spawn<F: Future + Send + 'static>(f: F, name: Arc<str>) -> AbortHandle
where
<F as Future>::Output: Send,
{
tokio::task::Builder::new()

Check failure

Code scanning / clippy

failed to resolve: could not find Builder in task Error

failed to resolve: could not find Builder in task
.name(&name)
.spawn(f)
.expect("spawning a task does not fail")
.abort_handle()
}

impl Drop for SubsystemRunner {
fn drop(&mut self) {
self.aborthandle.abort()
Expand Down

0 comments on commit a43abad

Please sign in to comment.