Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SubsystemBuilder::detach(), add orchestrated shutdown example #83

Merged
merged 5 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/19_sequential_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ async fn nested3(subsys: SubsystemHandle, nested2_finished: SubsystemFinishedFut
}

async fn root(subsys: SubsystemHandle) -> Result<()> {
// This subsystem shuts down the nested subsystem after 5 seconds.
tracing::info!("Root started.");

tracing::info!("Starting nested subsystems ...");
Expand Down
83 changes: 83 additions & 0 deletions examples/20_orchestrated_shutdown_order.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! This example demonstrates how a parent subsystem could orchestrate
//! the shutdown order of its children manually.
//!
//! This is done by spawning the children in 'detached' mode to prevent
//! that the shutdown signal gets passed to the children.
//! Then, the parent calls `initialize_shutdown` on each child manually.

use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{FutureExt, SubsystemBuilder, SubsystemHandle, Toplevel};

async fn counter(id: &str) {
let mut i = 0;
loop {
tracing::info!("{id}: {i}");
i += 1;
sleep(Duration::from_millis(50)).await;
}
}

async fn child(name: &str, subsys: SubsystemHandle) -> Result<()> {
tracing::info!("{name} started.");
if counter(name).cancel_on_shutdown(&subsys).await.is_ok() {
tracing::info!("{name} counter finished.");
} else {
tracing::info!("{name} shutting down ...");
sleep(Duration::from_millis(200)).await;
}
subsys.on_shutdown_requested().await;
tracing::info!("{name} stopped.");
Ok(())
}

async fn parent(subsys: SubsystemHandle) -> Result<()> {
tracing::info!("Parent started.");

tracing::info!("Starting detached nested subsystems ...");
let nested1 =
subsys.start(SubsystemBuilder::new("Nested1", |s| child("Nested1", s)).detached());
let nested2 =
subsys.start(SubsystemBuilder::new("Nested2", |s| child("Nested2", s)).detached());
let nested3 =
subsys.start(SubsystemBuilder::new("Nested3", |s| child("Nested3", s)).detached());
tracing::info!("Nested subsystems started.");

// Wait for the shutdown to happen
subsys.on_shutdown_requested().await;

// Shut down children sequentially. As they are detached, they will not shutdown on their own,
// but need to be shut down manually via `initiate_shutdown`.
tracing::info!("Initiating Nested1 shutdown ...");
nested1.initiate_shutdown();
nested1.join().await?;
tracing::info!("Initiating Nested2 shutdown ...");
nested2.initiate_shutdown();
nested2.join().await?;
tracing::info!("Initiating Nested3 shutdown ...");
nested3.initiate_shutdown();
nested3.join().await?;

tracing::info!("All children finished, stopping Root ...");
sleep(Duration::from_millis(200)).await;
tracing::info!("Root stopped.");

Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
// Init logging
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("parent", parent));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
13 changes: 13 additions & 0 deletions src/subsystem/subsystem_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ where
pub(crate) subsystem: Subsys,
pub(crate) failure_action: ErrorAction,
pub(crate) panic_action: ErrorAction,
pub(crate) detached: bool,
#[allow(clippy::type_complexity)]
_phantom: PhantomData<fn() -> (Fut, ErrType, Err)>,
}
Expand All @@ -40,6 +41,7 @@ where
subsystem,
failure_action: ErrorAction::Forward,
panic_action: ErrorAction::Forward,
detached: false,
_phantom: Default::default(),
}
}
Expand All @@ -65,4 +67,15 @@ where
self.panic_action = action;
self
}

/// Detaches the subsystem from the parent, causing a shutdown request to not
/// be propagated from the parent to the child automatically.
///
/// If this option is set, the parent needs to call [`initiate_shutdown()`](crate::NestedSubsystem::initiate_shutdown)
/// on the child during shutdown, otherwise the child will not
/// react to the shutdown request. So use this option with care.
pub fn detached(mut self) -> Self {
self.detached = true;
self
}
}
8 changes: 7 additions & 1 deletion src/subsystem/subsystem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
on_failure: Atomic::new(builder.failure_action),
on_panic: Atomic::new(builder.panic_action),
},
builder.detached,
)
}

Expand All @@ -96,6 +97,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
name: Arc<str>,
subsystem: Subsys,
error_actions: ErrorActions,
detached: bool,
) -> NestedSubsystem<ErrType>
where
Subsys: 'static + FnOnce(SubsystemHandle<ErrType>) -> Fut + Send,
Expand All @@ -106,7 +108,11 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {

let (error_sender, errors) = mpsc::unbounded_channel();

let cancellation_token = self.inner.cancellation_token.child_token();
let cancellation_token = if detached {
CancellationToken::new()
} else {
self.inner.cancellation_token.child_token()
};

let error_actions = Arc::new(error_actions);

Expand Down
1 change: 1 addition & 0 deletions src/toplevel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
on_failure: Atomic::new(ErrorAction::Forward),
on_panic: Atomic::new(ErrorAction::Forward),
},
false,
);

Self {
Expand Down
45 changes: 45 additions & 0 deletions tests/integration_test_2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,48 @@ async fn subsystem_finished_works_correctly() {
.await;
assert!(result.is_ok());
}

#[tokio::test]
#[traced_test]
async fn shutdown_does_not_propagate_to_detached_subsystem() {
let (nested_started, set_nested_started) = Event::create();
let (nested_finished, set_nested_finished) = Event::create();

let detached_subsystem = |subsys: SubsystemHandle| async move {
set_nested_started();
subsys.on_shutdown_requested().await;
set_nested_finished();
BoxedResult::Ok(())
};

let subsystem = |subsys: SubsystemHandle| async move {
let nested = subsys.start(SubsystemBuilder::new("detached", detached_subsystem).detached());
sleep(Duration::from_millis(20)).await;
assert!(nested_started.get());
assert!(!nested_finished.get());

subsys.on_shutdown_requested().await;

sleep(Duration::from_millis(20)).await;
assert!(!nested_finished.get());

nested.initiate_shutdown();

sleep(Duration::from_millis(20)).await;
assert!(nested_finished.get());

BoxedResult::Ok(())
};

let toplevel = Toplevel::new(move |s| async move {
s.start(SubsystemBuilder::new("subsys", subsystem));

sleep(Duration::from_millis(100)).await;
s.request_shutdown();
});

let result = toplevel
.handle_shutdown_requests(Duration::from_millis(400))
.await;
assert!(result.is_ok());
}
Loading