Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full rewrite #64

Merged
merged 27 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b2fc26c
Migrate from https://github.com/Finomnis/tokio-graceful-shutdown-rewrite
Finomnis Oct 12, 2023
73c92b3
Fix clippy lint; add licenses
Finomnis Oct 12, 2023
a0fead6
Update Cargo.toml; introduce MSRV
Finomnis Oct 12, 2023
3910aaa
Update README
Finomnis Oct 12, 2023
90be21d
Attempt to fix CI
Finomnis Oct 12, 2023
9104d11
Attempt to fix MSRV and docs
Finomnis Oct 12, 2023
54a0a16
Beautification
Finomnis Oct 12, 2023
768cb45
Bump MSRV to 1.72, because of mpsc::Sender's Sync impl
Finomnis Oct 12, 2023
e35490f
Replace std::mpsc with tokio::mpsc to reduce MSRV down to 1.63
Finomnis Oct 13, 2023
bc02f02
Attempt to fix doc tests
Finomnis Oct 13, 2023
e78b117
Attempt to fix ASAN error
Finomnis Oct 13, 2023
4403b14
Fix security audit
Finomnis Oct 13, 2023
1010d07
Add documentation
Finomnis Oct 13, 2023
4baaf7f
Fix doc tests
Finomnis Oct 13, 2023
8836224
Fix race condition in error collection; add another warning for dropp…
Finomnis Oct 13, 2023
1aa7772
Remove obsolete runner tests
Finomnis Oct 18, 2023
ecc1695
Add integration tests
Finomnis Oct 18, 2023
d654c2f
Add cancel_on_shutdown test
Finomnis Oct 18, 2023
838c9c4
Fix unix signal test
Finomnis Oct 18, 2023
6204777
Fix warning
Finomnis Oct 18, 2023
26ef280
Attempt to fix error_action converage
Finomnis Oct 18, 2023
9e65603
Another attempt to fix error_action coverage
Finomnis Oct 18, 2023
0c5e380
Remove nested_toplevel tests
Finomnis Oct 19, 2023
36f802a
Remove temprorary notes
Finomnis Oct 19, 2023
286d339
Re-add TODO.txt file
Finomnis Oct 19, 2023
2248ee8
Fix comment
Finomnis Oct 19, 2023
cfbc130
Fix docs
Finomnis Oct 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/audit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rs/audit-check@v1
- uses: rustsec/audit-check@v1.4.1
with:
token: ${{ secrets.GITHUB_TOKEN }}
19 changes: 18 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,23 @@ jobs:
- name: Run cargo test
run: cargo test -- --test-threads 1

msrv:
name: Minimum Supported Rust Version
runs-on: ubuntu-latest
env:
RUSTFLAGS: "-D warnings"
steps:
- name: Checkout sources
uses: actions/checkout@v3

- name: Install MSRV toolchain
uses: dtolnay/[email protected]

#- uses: Swatinem/rust-cache@v1

- name: Run cargo build
run: cargo build

lints:
name: Lints
runs-on: ubuntu-latest
Expand Down Expand Up @@ -115,7 +132,7 @@ jobs:
runs-on: ubuntu-latest
environment: production
if: github.event_name == 'release'
needs: [build, test, lints, docs, leaks]
needs: [build, test, msrv, lints, docs, leaks]
steps:
- name: Checkout sources
uses: actions/checkout@v3
Expand Down
37 changes: 17 additions & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
[package]
name = "tokio-graceful-shutdown"
authors = ["Finomnis <[email protected]>"]
version = "0.13.0"
edition = "2018"
version = "0.14.0"
edition = "2021"
rust-version = "1.63"
license = "MIT OR Apache-2.0"
readme = "README.md"
repository = "https://github.com/Finomnis/tokio-graceful-shutdown"
Expand All @@ -20,39 +21,35 @@ exclude = [
]

[dependencies]
# Error definitions
thiserror = "1.0.32"
miette = "5.3.0"
tracing = { version = "0.1.37", default-features = false }

# For async utilities
tokio = { version = "1.20.1", default-features = false, features = [
tokio = { version = "1.32.0", default-features = false, features = [
"signal",
"rt",
"macros",
"time",
] }
tokio-util = { version = "0.7.2", default-features = false }
futures = "0.3.23"
async-recursion = "1.0.0"
pin-project-lite = "0.2.9"
tokio-util = { version = "0.7.8", default-features = false }

# For 'IntoSubsystem' trait
async-trait = "0.1.57"

# For logging
log = "0.4.17"
pin-project-lite = "0.2.13"
thiserror = "1.0.49"
miette = "5.10.0"
async-trait = "0.1.73"
atomic = "0.6.0"
bytemuck = { version = "1.14.0", features = ["derive"] }

[dev-dependencies]
# Error propagation
anyhow = "1.0.61"
anyhow = "1.0.75"
eyre = "0.6.8"
miette = { version = "5.3.0", features = ["fancy"] }
miette = { version = "5.10.0", features = ["fancy"] }

# Logging
env_logger = "0.10.0"
tracing-subscriber = "0.3.17"
tracing-test = "0.2.4"

# Tokio
tokio = { version = "1.20.1", features = ["full"] }
tokio = { version = "1.32.0", features = ["full"] }

# Hyper example
hyper = { version = "0.14.20", features = ["full"] }
Expand Down
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ This subsystem can now be executed like this:
```rust
#[tokio::main]
async fn main() -> Result<()> {
Toplevel::new()
.start("Subsys1", subsys1)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1))
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
```

The `Toplevel` object is the root object of the subsystem tree.
Subsystems can then be started using the `start()` functionality of the toplevel object.
Subsystems can then be started in it using the `start()` method
of its `SubsystemHandle` object.

The `catch_signals()` method signals the `Toplevel` object to listen for SIGINT/SIGTERM/Ctrl+C and initiate a shutdown thereafter.

Expand Down
29 changes: 29 additions & 0 deletions TODO.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
- Search for all TODOs in code
- Port over documentation
- Port over tests






Done:

- Name
- Error handling

- SubsystemBuilder (or PreparedSubsystem, or something similar)!
- Allows creating subsystems:
- from FnOnce (non-restartable)
- from Fn (restartable)
- from `trait Subsystem`
- from `trait RestartableSubsystem`
- maybe gets passed directly into `start`

- Solution to the entire "restartable subsystems" problem:
- Make single subsystems awaitable! (Through the object returned from `start`)
- Restart is then trivial to implement.
- Make subsystem "Shutdown on Error/Panic"
- Start/await it in a loop in the parent subsystem

- Error generic
48 changes: 48 additions & 0 deletions Thoughts.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
need:
- function that runs on all exit paths of the subsystem
- that also has access to the locked parent, to remove itself from the list of children
- is list of children really important?
- atomic counter could work
- but what about error propagation?

- error propagation maybe not necessary.
- register closure that will be executed on error/shutdown of the child
- every subsystem can have 'shutdown triggers' attached to it

fixed facts:
- subsystems will never change their parent

Suggestions:
Subsystem have **no** reference to their parents.
They
- Use `joiner_tokens` for awaiting children
- Use `cancellation_token` for shutdown
- Simply 'drop' a subsystem to hard cancel an entire tree

Open question: How to propagate errors?
- Not at all? (Do we need error propagation?)
- Through the joiner_tokens?
If through the joiner_tokens:
- Have `none`/`some(vec)` in every joiner_token for collecting errors
- While walking up, put the error in the first available location
- When dropping the token, propagate errors up if unconsumed
- Uncaught errors simply get printed
This should provide a quite natural way of propagating/dropping errors,
and should work well with partial shutdown.

Open question: How to deal with errors?
- Every spawned subsystem can register functions that handle errors of their children
- Possibilities are:
- pass further up
- Ignore
- shutdown self and children
- Probably a mechanism that will be inside of the joiner_token

Open question: Ownership?
- Parents should own children
- But: HOW do children remove themselves from the parent once they are finished?
- IMPORTANT QUESTION this is. Might be the one stone that breaks this construct.
- The solution *might* be: The joiner_token removes them. That way, there is no recursive dependency.
- Might need a fancy data structure that allows efficient RAII based object tracking
- solved! (?)
-> Implemented in remote_drop_collection
36 changes: 19 additions & 17 deletions examples/01_normal_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,42 @@
//! If custom arguments for the subsystem coroutines are required,
//! a struct has to be used instead, as seen in other examples.

use env_logger::{Builder, Env};
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};

async fn subsys1(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started.");
tracing::info!("Subsystem1 started.");
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem1 ...");
tracing::info!("Shutting down Subsystem1 ...");
sleep(Duration::from_millis(400)).await;
log::info!("Subsystem1 stopped.");
tracing::info!("Subsystem1 stopped.");
Ok(())
}

async fn subsys2(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem2 started.");
tracing::info!("Subsystem2 started.");
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem2 ...");
tracing::info!("Shutting down Subsystem2 ...");
sleep(Duration::from_millis(500)).await;
log::info!("Subsystem2 stopped.");
tracing::info!("Subsystem2 stopped.");
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

// Create toplevel
Toplevel::new()
.start("Subsys1", subsys1)
.start("Subsys2", subsys2)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1));
s.start(SubsystemBuilder::new("Subsys2", subsys2));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
38 changes: 20 additions & 18 deletions examples/02_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,26 @@
//! custom parameters to be passed to the subsystem.
//!
//! There are two ways of using structs as subsystems, by either
//! wrapping them in an async closure, or by implementing the
//! wrapping them in a closure, or by implementing the
//! IntoSubsystem trait. Note, though, that the IntoSubsystem
//! trait requires an additional dependency, `async-trait`.

use async_trait::async_trait;
use env_logger::{Builder, Env};
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemBuilder, SubsystemHandle, Toplevel};

struct Subsystem1 {
arg: u32,
}

impl Subsystem1 {
async fn run(self, subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started. Extra argument: {}", self.arg);
tracing::info!("Subsystem1 started. Extra argument: {}", self.arg);
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem1 ...");
tracing::info!("Shutting down Subsystem1 ...");
sleep(Duration::from_millis(500)).await;
log::info!("Subsystem1 stopped.");
tracing::info!("Subsystem1 stopped.");
Ok(())
}
}
Expand All @@ -34,29 +33,32 @@ struct Subsystem2 {
#[async_trait]
impl IntoSubsystem<miette::Report> for Subsystem2 {
async fn run(self, subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem2 started. Extra argument: {}", self.arg);
tracing::info!("Subsystem2 started. Extra argument: {}", self.arg);
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem2 ...");
tracing::info!("Shutting down Subsystem2 ...");
sleep(Duration::from_millis(500)).await;
log::info!("Subsystem2 stopped.");
tracing::info!("Subsystem2 stopped.");
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

let subsys1 = Subsystem1 { arg: 42 };
let subsys2 = Subsystem2 { arg: 69 };

// Create toplevel
Toplevel::new()
.start("Subsys1", |a| subsys1.run(a))
.start("Subsys2", subsys2.into_subsystem())
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", |a| subsys1.run(a)));
s.start(SubsystemBuilder::new("Subsys2", subsys2.into_subsystem()));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
28 changes: 15 additions & 13 deletions examples/03_shutdown_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,32 @@
//! so the subsystem gets cancelled and the program returns an appropriate
//! error code.

use env_logger::{Builder, Env};
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};

async fn subsys1(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started.");
tracing::info!("Subsystem1 started.");
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem1 ...");
tracing::info!("Shutting down Subsystem1 ...");
sleep(Duration::from_millis(2000)).await;
log::info!("Subsystem1 stopped.");
tracing::info!("Subsystem1 stopped.");
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

// Create toplevel
Toplevel::new()
.start("Subsys1", subsys1)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(500))
.await
.map_err(Into::into)
// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(500))
.await
.map_err(Into::into)
}
Loading
Loading