Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tokio taskname registration for use in tokio-console #89

Merged
merged 19 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ jobs:
uses: taiki-e/install-action@cross

- name: Build
run: cross build --all-features --release --target=${{ matrix.target }}
run: cross build --release --target=${{ matrix.target }}

build-examples:
name: Build Examples
runs-on: ubuntu-latest
needs: [lints, docs]
env:
RUSTFLAGS: "-D warnings"
RUSTFLAGS: "-D warnings --cfg tokio_unstable"
steps:
- name: Checkout sources
uses: actions/checkout@v4
Expand Down Expand Up @@ -165,7 +165,7 @@ jobs:
run: cargo fmt --all -- --check

- name: Run cargo clippy
run: cargo clippy --all-features --all-targets -- -D warnings
run: cargo clippy --all-targets -- -D warnings

docs:
name: Documentation
Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ jobs:
uses: actions/checkout@v4
- name: Install llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
#- uses: Swatinem/rust-cache@v1
- name: Compute Coverage
run:
cargo llvm-cov --all-features --workspace --ignore-filename-regex tests.rs --codecov --output-path codecov.json
cargo llvm-cov --workspace --ignore-filename-regex tests.rs --codecov --output-path codecov.json
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
env:
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/rust-clippy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ jobs:
- name: Run rust-clippy
run:
cargo clippy
--all-features
--all-targets
--message-format=json | clippy-sarif | tee rust-clippy-results.sarif | sarif-fmt
continue-on-error: true
Expand Down
10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ exclude = [
"/UPCOMING_VERSION_CHANGES.txt",
]

[features]
tokio-unstable = ["tokio/tracing"]

[dependencies]
tracing = { version = "0.1.37", default-features = false }

Expand Down Expand Up @@ -67,10 +70,17 @@ headers = ">= 0.3.5" # Required to fix minimal-versions
serde_urlencoded = ">= 0.7.1" # Required to fix minimal-versions
unicode-linebreak = ">= 0.1.5" # Required to fix minimal-versions

# tokio-console
console-subscriber = "0.2.0"

# For testing unix signals
[target.'cfg(unix)'.dev-dependencies]
nix = { version = "0.29.0", default-features = false, features = ["signal"] }

# Make leak sanitizer more reliable
[profile.dev]
opt-level = 1

# Define `tokio_unstable` config for linter
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
61 changes: 61 additions & 0 deletions examples/tokio_console.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//! This example demonstrates how to use the tokio-console application for tracing tokio tasks's
//! runtime behaviour. Subsystems will appear under their registration names.
//!
//! Run this example with:
//!
//! ```
//! RUSTFLAGS="--cfg tokio_unstable" cargo run --features "tokio-unstable" --example tokio_console
//! ```
//!
//! Then, open the `tokio-console` application (see https://crates.io/crates/tokio-console) to
//! follow the subsystem tasks live.
xxx
Fixed Show fixed Hide fixed
use miette::Result;
Fixed Show fixed Hide fixed
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{FutureExt, SubsystemBuilder, SubsystemHandle, Toplevel};
use tracing_subscriber::prelude::*;

async fn subsys(subsys: SubsystemHandle) -> Result<()> {
tracing::info!("Parent started.");

let mut iteration = 0;
while !subsys.is_shutdown_requested() {
subsys.start(SubsystemBuilder::new(format!("child{iteration}"), child));
iteration += 1;

sleep(Duration::from_millis(1000))
.cancel_on_shutdown(&subsys)
.await
.ok();
}

tracing::info!("Parent stopped.");
Ok(())
}

async fn child(subsys: SubsystemHandle) -> Result<()> {
sleep(Duration::from_millis(3000))
.cancel_on_shutdown(&subsys)
.await
.ok();
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
let console_layer = console_subscriber::spawn();
// Init logging
tracing_subscriber::registry()
.with(console_layer)
.with(tracing_subscriber::fmt::layer().compact())
.init();

// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("parent", subsys));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,6 @@ pub use subsystem::SubsystemBuilder;
pub use subsystem::SubsystemFinishedFuture;
pub use subsystem::SubsystemHandle;
pub use toplevel::Toplevel;

mod tokio_unstable;
use tokio_unstable::spawn;
4 changes: 2 additions & 2 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl SubsystemRunner {
Err: Into<ErrType>,
{
let future = async { run_subsystem(name, subsystem, subsystem_handle, guard).await };
let aborthandle = tokio::spawn(future).abort_handle();
let aborthandle = crate::spawn(future, "subsystem_runner").abort_handle();
SubsystemRunner { aborthandle }
}
}
Expand All @@ -57,7 +57,7 @@ async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
let mut redirected_subsystem_handle = subsystem_handle.delayed_clone();

let future = async { subsystem(subsystem_handle).await.map_err(|e| e.into()) };
let join_handle = tokio::spawn(future);
let join_handle = crate::spawn(future, &name);

// Abort on drop
guard.on_cancel({
Expand Down
21 changes: 21 additions & 0 deletions src/tokio_unstable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::future::Future;
use tokio::task::JoinHandle;

#[cfg(not(all(tokio_unstable, feature = "tokio-unstable")))]
pub(crate) fn spawn<F: Future + Send + 'static>(f: F, _name: &str) -> JoinHandle<F::Output>
where
<F as Future>::Output: Send + 'static,
{
tokio::spawn(f)
}

#[cfg(all(tokio_unstable, feature = "tokio-unstable"))]
pub(crate) fn spawn<F: Future + Send + 'static>(f: F, name: &str) -> JoinHandle<F::Output>
where
<F as Future>::Output: Send + 'static,
{
tokio::task::Builder::new()
.name(name)
.spawn(f)
.expect("a task should be spawned")
}
11 changes: 7 additions & 4 deletions src/toplevel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,13 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
pub fn catch_signals(self) -> Self {
let shutdown_token = self.root_handle.get_cancellation_token().clone();

tokio::spawn(async move {
wait_for_signal().await;
shutdown_token.cancel();
});
crate::spawn(
async move {
wait_for_signal().await;
shutdown_token.cancel();
},
"catch_signals",
);

self
}
Expand Down
Loading