diff --git a/src/error_action.rs b/src/error_action.rs index 2d9406c..5155087 100644 --- a/src/error_action.rs +++ b/src/error_action.rs @@ -1,8 +1,26 @@ use bytemuck::NoUninit; +/// Possible ways a subsystem can react to errors. +/// +/// An error will propagate upwards in the subsystem tree until +/// it reaches a subsystem that won't forward it to its parent. +/// +/// If an error reaches the [`Toplevel`](crate::Toplevel), a global shutdown will be initiated. +/// +/// Also see: +/// - [`SubsystemBuilder::on_failure`](crate::SubsystemBuilder::on_failure) +/// - [`SubsystemBuilder::on_panic`](crate::SubsystemBuilder::on_panic) +/// - [`NestedSubsystem::change_failure_action`](crate::NestedSubsystem::change_failure_action) +/// - [`NestedSubsystem::change_panic_action`](crate::NestedSubsystem::change_panic_action) +/// #[derive(Clone, Copy, Debug, Eq, PartialEq, NoUninit)] #[repr(u8)] pub enum ErrorAction { + /// Pass the error on to the parent subsystem, but don't react to it. Forward, + /// Store the error so it can be retrieved through + /// [`NestedSubsystem::join`](crate::NestedSubsystem::join), + /// then initiate a shutdown of the subsystem and its children. + /// Do not forward the error to the parent subsystem. CatchAndLocalShutdown, } diff --git a/src/lib.rs b/src/lib.rs index c86acc8..cca453b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,7 +85,7 @@ //! #![deny(unreachable_pub)] -//#![deny(missing_docs)] +#![deny(missing_docs)] #![doc( issue_tracker_base_url = "https://github.com/Finomnis/tokio-graceful-shutdown/issues", test(no_crate_inject, attr(deny(warnings))), diff --git a/src/subsystem/mod.rs b/src/subsystem/mod.rs index b9a6665..7073c1e 100644 --- a/src/subsystem/mod.rs +++ b/src/subsystem/mod.rs @@ -15,6 +15,15 @@ use crate::{utils::JoinerTokenRef, ErrTypeTraits, ErrorAction}; use atomic::Atomic; use tokio_util::sync::CancellationToken; +/// A nested subsystem. +/// +/// Can be used to control the subsystem or wait for it to finish. +/// +/// Dropping this value does not perform any action - the subsystem +/// will be neither cancelled, shut down or detached. +/// +/// For more information, look through the examples directory in +/// the source code. pub struct NestedSubsystem { joiner: JoinerTokenRef, cancellation_token: CancellationToken, diff --git a/src/subsystem/nested_subsystem.rs b/src/subsystem/nested_subsystem.rs index b9d453c..6347ac9 100644 --- a/src/subsystem/nested_subsystem.rs +++ b/src/subsystem/nested_subsystem.rs @@ -5,6 +5,50 @@ use crate::{errors::SubsystemJoinError, ErrTypeTraits, ErrorAction}; use super::NestedSubsystem; impl NestedSubsystem { + /// Wait for the subsystem to be finished. + /// + /// If its failure/panic action is set to [`ErrorAction::CatchAndLocalShutdown`], + /// this function will return the list of errors caught by the subsystem. + /// + /// # Returns + /// + /// A [`SubsystemJoinError`] on failure. + /// + /// # Examples + /// + /// ``` + /// use miette::Result; + /// use tokio::time::{sleep, Duration}; + /// use tokio_graceful_shutdown::SubsystemHandle; + /// + /// async fn nested_subsystem(subsys: SubsystemHandle) -> Result<()> { + /// // This subsystem does nothing but wait for the shutdown to happen + /// subsys.on_shutdown_requested().await; + /// Ok(()) + /// } + /// + /// async fn subsystem(subsys: SubsystemHandle) -> Result<()> { + /// // This subsystem waits for one second and then performs a partial shutdown + /// + /// // Spawn nested subsystem. + /// // Make sure to catch errors, so that they are properly + /// // returned at `.join()`. + /// let nested = subsys.start( + /// SubsystemBuilder::new("nested", nested_subsystem) + /// .on_failure(ErrorAction::CatchAndLocalShutdown) + /// .on_panic(ErrorAction::CatchAndLocalShutdown) + /// ); + /// + /// // Wait for a second + /// sleep(Duration::from_millis(1000)).await; + /// + /// // Perform a partial shutdown of the nested subsystem + /// nested.initiate_shutdown(); + /// nested.join().await?; + /// + /// Ok(()) + /// } + /// ``` pub async fn join(&self) -> Result<(), SubsystemJoinError> { self.joiner.join().await; @@ -16,16 +60,25 @@ impl NestedSubsystem { } } + /// Signals the subsystem and all of its children to shut down. pub fn initiate_shutdown(&self) { self.cancellation_token.cancel() } + /// Changes the way this subsystem should react to failures, + /// meaning if it or one of its children returns an `Err` value. + /// + /// For more information, see [ErrorAction]. pub fn change_failure_action(&self, action: ErrorAction) { self.error_actions .on_failure .store(action, Ordering::Relaxed); } + /// Changes the way this subsystem should react if it or one + /// of its children panic. + /// + /// For more information, see [ErrorAction]. pub fn change_panic_action(&self, action: ErrorAction) { self.error_actions.on_panic.store(action, Ordering::Relaxed); } diff --git a/src/subsystem/subsystem_builder.rs b/src/subsystem/subsystem_builder.rs index 65331ca..c1fbca9 100644 --- a/src/subsystem/subsystem_builder.rs +++ b/src/subsystem/subsystem_builder.rs @@ -2,6 +2,8 @@ use std::{borrow::Cow, future::Future, marker::PhantomData}; use crate::{ErrTypeTraits, ErrorAction, SubsystemHandle}; +/// Configures a subsystem before it gets spawned through +/// [`SubsystemHandle::start`]. pub struct SubsystemBuilder<'a, ErrType, Err, Fut, Subsys> where ErrType: ErrTypeTraits, @@ -24,6 +26,14 @@ where Fut: 'static + Future> + Send, Err: Into, { + /// Creates a new SubsystemBuilder from a given subsystem + /// function. + /// + /// # Arguments + /// + /// * `name` - The name of the subsystem. Primarily to identify the + /// subsystem in error messages. + /// * `subsystem` - The subsystem function that the subsystem will execute. pub fn new(name: impl Into>, subsystem: Subsys) -> Self { Self { name: name.into(), @@ -34,11 +44,23 @@ where } } + /// Sets the way this subsystem should react to failures, + /// meaning if it or one of its children return an `Err` value. + /// + /// The default is [`ErrorAction::Forward`]. + /// + /// For more information, see [ErrorAction]. pub fn on_failure(mut self, action: ErrorAction) -> Self { self.failure_action = action; self } + /// Sets the way this subsystem should react if it or one + /// of its children panic. + /// + /// The default is [`ErrorAction::Forward`]. + /// + /// For more information, see [ErrorAction]. pub fn on_panic(mut self, action: ErrorAction) -> Self { self.panic_action = action; self diff --git a/src/subsystem/subsystem_handle.rs b/src/subsystem/subsystem_handle.rs index f4b5d26..603e1a6 100644 --- a/src/subsystem/subsystem_handle.rs +++ b/src/subsystem/subsystem_handle.rs @@ -25,7 +25,7 @@ struct Inner { children: RemotelyDroppableItems, } -// All the things needed to manage nested subsystems and wait for cancellation +/// The handle given to each subsystem through which the subsystem can interact with this crate. pub struct SubsystemHandle { inner: ManuallyDrop>, // When dropped, redirect Self into this channel. @@ -40,6 +40,38 @@ pub(crate) struct WeakSubsystemHandle { } impl SubsystemHandle { + /// Start a nested subsystem. + /// + /// Once called, the subsystem will be started immediately, similar to [`tokio::spawn`]. + /// + /// # Arguments + /// + /// * `builder` - The [`SubsystemBuilder`] that contains all the information + /// about the subsystem that should be spawned. + /// + /// # Returns + /// + /// A [`NestedSubsystem`] that can be used to control or join the subsystem. + /// + /// # Examples + /// + /// ``` + /// use miette::Result; + /// use tokio_graceful_shutdown::SubsystemHandle; + /// + /// async fn nested_subsystem(subsys: SubsystemHandle) -> Result<()> { + /// subsys.on_shutdown_requested().await; + /// Ok(()) + /// } + /// + /// async fn my_subsystem(subsys: SubsystemHandle) -> Result<()> { + /// // start a nested subsystem + /// subsys.start(SubsystemBuilder::new("Nested", nested_subsystem)); + /// + /// subsys.on_shutdown_requested().await; + /// Ok(()) + /// } + /// ``` pub fn start( &self, builder: SubsystemBuilder, @@ -134,6 +166,7 @@ impl SubsystemHandle { } } + /// Waits until all the children of this subsystem are finished. pub async fn wait_for_children(&mut self) { self.inner.joiner_token.join_children().await } @@ -149,22 +182,127 @@ impl SubsystemHandle { receiver } - pub fn initiate_shutdown(&self) { - self.inner.toplevel_cancellation_token.cancel(); - } - - pub fn initiate_local_shutdown(&self) { - self.inner.cancellation_token.cancel(); - } - + /// Wait for the shutdown mode to be triggered. + /// + /// Once the shutdown mode is entered, all existing calls to this + /// method will be released and future calls to this method will + /// return immediately. + /// + /// This is the primary method of subsystems to react to + /// the shutdown requests. Most often, it will be used in `tokio::select` + /// statements to cancel other code as soon as the shutdown is requested. + /// + /// # Examples + /// + /// ``` + /// use miette::Result; + /// use tokio::time::{sleep, Duration}; + /// use tokio_graceful_shutdown::SubsystemHandle; + /// + /// async fn countdown() { + /// for i in (1..10).rev() { + /// log::info!("Countdown: {}", i); + /// sleep(Duration::from_millis(1000)).await; + /// } + /// } + /// + /// async fn countdown_subsystem(subsys: SubsystemHandle) -> Result<()> { + /// log::info!("Starting countdown ..."); + /// + /// // This cancels the countdown as soon as shutdown + /// // mode was entered + /// tokio::select! { + /// _ = subsys.on_shutdown_requested() => { + /// log::info!("Countdown cancelled."); + /// }, + /// _ = countdown() => { + /// log::info!("Countdown finished."); + /// } + /// }; + /// + /// Ok(()) + /// } + /// ``` pub async fn on_shutdown_requested(&self) { self.inner.cancellation_token.cancelled().await } + /// Returns whether a shutdown should be performed now. + /// + /// This method is provided for subsystems that need to query the shutdown + /// request state repeatedly. + /// + /// This can be useful in scenarios where a subsystem depends on the graceful + /// shutdown of its nested coroutines before it can run final cleanup steps itself. + /// + /// # Examples + /// + /// ``` + /// use miette::Result; + /// use tokio::time::{sleep, Duration}; + /// use tokio_graceful_shutdown::SubsystemHandle; + /// + /// async fn uncancellable_action(subsys: &SubsystemHandle) { + /// tokio::select! { + /// // Execute an action. A dummy `sleep` in this case. + /// _ = sleep(Duration::from_millis(1000)) => { + /// log::info!("Action finished."); + /// } + /// // Perform a shutdown if requested + /// _ = subsys.on_shutdown_requested() => { + /// log::info!("Action aborted."); + /// }, + /// } + /// } + /// + /// async fn my_subsystem(subsys: SubsystemHandle) -> Result<()> { + /// log::info!("Starting subsystem ..."); + /// + /// // We cannot do a `tokio::select` with `on_shutdown_requested` + /// // here, because a shutdown would cancel the action without giving + /// // it the chance to react first. + /// while !subsys.is_shutdown_requested() { + /// uncancellable_action(&subsys).await; + /// } + /// + /// log::info!("Subsystem stopped."); + /// + /// Ok(()) + /// } + /// ``` pub fn is_shutdown_requested(&self) -> bool { self.inner.cancellation_token.is_cancelled() } + /// Triggers a shutdown of the entire subsystem tree. + /// + /// # Examples + /// + /// ``` + /// use miette::Result; + /// use tokio::time::{sleep, Duration}; + /// use tokio_graceful_shutdown::SubsystemHandle; + /// + /// async fn stop_subsystem(subsys: SubsystemHandle) -> Result<()> { + /// // This subsystem wait for one second and then stops the program. + /// sleep(Duration::from_millis(1000)).await; + /// + /// // Shut down the entire subsystem tree + /// subsys.initiate_shutdown(); + /// + /// Ok(()) + /// } + /// ``` + pub fn initiate_shutdown(&self) { + self.inner.toplevel_cancellation_token.cancel(); + } + + /// Triggers a shutdown of the current subsystem and all + /// of its children. + pub fn initiate_local_shutdown(&self) { + self.inner.cancellation_token.cancel(); + } + pub(crate) fn get_cancellation_token(&self) -> &CancellationToken { &self.inner.cancellation_token }