Skip to content

Commit

Permalink
Add documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Finomnis committed Oct 13, 2023
1 parent 4403b14 commit 1010d07
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 10 deletions.
18 changes: 18 additions & 0 deletions src/error_action.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
use bytemuck::NoUninit;

/// Possible ways a subsystem can react to errors.
///
/// An error will propagate upwards in the subsystem tree until
/// it reaches a subsystem that won't forward it to its parent.
///
/// If an error reaches the [`Toplevel`](crate::Toplevel), a global shutdown will be initiated.
///
/// Also see:
/// - [`SubsystemBuilder::on_failure`](crate::SubsystemBuilder::on_failure)
/// - [`SubsystemBuilder::on_panic`](crate::SubsystemBuilder::on_panic)
/// - [`NestedSubsystem::change_failure_action`](crate::NestedSubsystem::change_failure_action)
/// - [`NestedSubsystem::change_panic_action`](crate::NestedSubsystem::change_panic_action)
///
#[derive(Clone, Copy, Debug, Eq, PartialEq, NoUninit)]
#[repr(u8)]
pub enum ErrorAction {
/// Pass the error on to the parent subsystem, but don't react to it.
Forward,
/// Store the error so it can be retrieved through
/// [`NestedSubsystem::join`](crate::NestedSubsystem::join),
/// then initiate a shutdown of the subsystem and its children.
/// Do not forward the error to the parent subsystem.
CatchAndLocalShutdown,
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
//!
#![deny(unreachable_pub)]
//#![deny(missing_docs)]
#![deny(missing_docs)]
#![doc(
issue_tracker_base_url = "https://github.com/Finomnis/tokio-graceful-shutdown/issues",
test(no_crate_inject, attr(deny(warnings))),
Expand Down
9 changes: 9 additions & 0 deletions src/subsystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ use crate::{utils::JoinerTokenRef, ErrTypeTraits, ErrorAction};
use atomic::Atomic;
use tokio_util::sync::CancellationToken;

/// A nested subsystem.
///
/// Can be used to control the subsystem or wait for it to finish.
///
/// Dropping this value does not perform any action - the subsystem
/// will be neither cancelled, shut down or detached.
///
/// For more information, look through the examples directory in
/// the source code.
pub struct NestedSubsystem<ErrType: ErrTypeTraits> {
joiner: JoinerTokenRef,
cancellation_token: CancellationToken,
Expand Down
53 changes: 53 additions & 0 deletions src/subsystem/nested_subsystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,50 @@ use crate::{errors::SubsystemJoinError, ErrTypeTraits, ErrorAction};
use super::NestedSubsystem;

impl<ErrType: ErrTypeTraits> NestedSubsystem<ErrType> {
/// Wait for the subsystem to be finished.
///
/// If its failure/panic action is set to [`ErrorAction::CatchAndLocalShutdown`],
/// this function will return the list of errors caught by the subsystem.
///
/// # Returns
///
/// A [`SubsystemJoinError`] on failure.
///
/// # Examples
///
/// ```
/// use miette::Result;
/// use tokio::time::{sleep, Duration};
/// use tokio_graceful_shutdown::SubsystemHandle;
///
/// async fn nested_subsystem(subsys: SubsystemHandle) -> Result<()> {
/// // This subsystem does nothing but wait for the shutdown to happen
/// subsys.on_shutdown_requested().await;
/// Ok(())
/// }
///
/// async fn subsystem(subsys: SubsystemHandle) -> Result<()> {
/// // This subsystem waits for one second and then performs a partial shutdown
///
/// // Spawn nested subsystem.
/// // Make sure to catch errors, so that they are properly
/// // returned at `.join()`.
/// let nested = subsys.start(
/// SubsystemBuilder::new("nested", nested_subsystem)
/// .on_failure(ErrorAction::CatchAndLocalShutdown)
/// .on_panic(ErrorAction::CatchAndLocalShutdown)
/// );
///
/// // Wait for a second
/// sleep(Duration::from_millis(1000)).await;
///
/// // Perform a partial shutdown of the nested subsystem
/// nested.initiate_shutdown();
/// nested.join().await?;
///
/// Ok(())
/// }
/// ```
pub async fn join(&self) -> Result<(), SubsystemJoinError<ErrType>> {
self.joiner.join().await;

Expand All @@ -16,16 +60,25 @@ impl<ErrType: ErrTypeTraits> NestedSubsystem<ErrType> {
}
}

/// Signals the subsystem and all of its children to shut down.
pub fn initiate_shutdown(&self) {
self.cancellation_token.cancel()
}

/// Changes the way this subsystem should react to failures,
/// meaning if it or one of its children returns an `Err` value.
///
/// For more information, see [ErrorAction].
pub fn change_failure_action(&self, action: ErrorAction) {
self.error_actions
.on_failure
.store(action, Ordering::Relaxed);
}

/// Changes the way this subsystem should react if it or one
/// of its children panic.
///
/// For more information, see [ErrorAction].
pub fn change_panic_action(&self, action: ErrorAction) {
self.error_actions.on_panic.store(action, Ordering::Relaxed);
}
Expand Down
22 changes: 22 additions & 0 deletions src/subsystem/subsystem_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::{borrow::Cow, future::Future, marker::PhantomData};

use crate::{ErrTypeTraits, ErrorAction, SubsystemHandle};

/// Configures a subsystem before it gets spawned through
/// [`SubsystemHandle::start`].
pub struct SubsystemBuilder<'a, ErrType, Err, Fut, Subsys>
where
ErrType: ErrTypeTraits,
Expand All @@ -24,6 +26,14 @@ where
Fut: 'static + Future<Output = Result<(), Err>> + Send,
Err: Into<ErrType>,
{
/// Creates a new SubsystemBuilder from a given subsystem
/// function.
///
/// # Arguments
///
/// * `name` - The name of the subsystem. Primarily to identify the
/// subsystem in error messages.
/// * `subsystem` - The subsystem function that the subsystem will execute.
pub fn new(name: impl Into<Cow<'a, str>>, subsystem: Subsys) -> Self {
Self {
name: name.into(),
Expand All @@ -34,11 +44,23 @@ where
}
}

/// Sets the way this subsystem should react to failures,
/// meaning if it or one of its children return an `Err` value.
///
/// The default is [`ErrorAction::Forward`].
///
/// For more information, see [ErrorAction].
pub fn on_failure(mut self, action: ErrorAction) -> Self {
self.failure_action = action;
self
}

/// Sets the way this subsystem should react if it or one
/// of its children panic.
///
/// The default is [`ErrorAction::Forward`].
///
/// For more information, see [ErrorAction].
pub fn on_panic(mut self, action: ErrorAction) -> Self {
self.panic_action = action;
self
Expand Down
156 changes: 147 additions & 9 deletions src/subsystem/subsystem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct Inner<ErrType: ErrTypeTraits> {
children: RemotelyDroppableItems<SubsystemRunner>,
}

// All the things needed to manage nested subsystems and wait for cancellation
/// The handle given to each subsystem through which the subsystem can interact with this crate.
pub struct SubsystemHandle<ErrType: ErrTypeTraits = BoxedError> {
inner: ManuallyDrop<Inner<ErrType>>,
// When dropped, redirect Self into this channel.
Expand All @@ -40,6 +40,38 @@ pub(crate) struct WeakSubsystemHandle<ErrType: ErrTypeTraits> {
}

impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
/// Start a nested subsystem.
///
/// Once called, the subsystem will be started immediately, similar to [`tokio::spawn`].
///
/// # Arguments
///
/// * `builder` - The [`SubsystemBuilder`] that contains all the information
/// about the subsystem that should be spawned.
///
/// # Returns
///
/// A [`NestedSubsystem`] that can be used to control or join the subsystem.
///
/// # Examples
///
/// ```
/// use miette::Result;
/// use tokio_graceful_shutdown::SubsystemHandle;
///
/// async fn nested_subsystem(subsys: SubsystemHandle) -> Result<()> {
/// subsys.on_shutdown_requested().await;
/// Ok(())
/// }
///
/// async fn my_subsystem(subsys: SubsystemHandle) -> Result<()> {
/// // start a nested subsystem
/// subsys.start(SubsystemBuilder::new("Nested", nested_subsystem));
///
/// subsys.on_shutdown_requested().await;
/// Ok(())
/// }
/// ```
pub fn start<Err, Fut, Subsys>(
&self,
builder: SubsystemBuilder<ErrType, Err, Fut, Subsys>,
Expand Down Expand Up @@ -134,6 +166,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
}
}

/// Waits until all the children of this subsystem are finished.
pub async fn wait_for_children(&mut self) {
self.inner.joiner_token.join_children().await
}
Expand All @@ -149,22 +182,127 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
receiver
}

pub fn initiate_shutdown(&self) {
self.inner.toplevel_cancellation_token.cancel();
}

pub fn initiate_local_shutdown(&self) {
self.inner.cancellation_token.cancel();
}

/// Wait for the shutdown mode to be triggered.
///
/// Once the shutdown mode is entered, all existing calls to this
/// method will be released and future calls to this method will
/// return immediately.
///
/// This is the primary method of subsystems to react to
/// the shutdown requests. Most often, it will be used in `tokio::select`
/// statements to cancel other code as soon as the shutdown is requested.
///
/// # Examples
///
/// ```
/// use miette::Result;
/// use tokio::time::{sleep, Duration};
/// use tokio_graceful_shutdown::SubsystemHandle;
///
/// async fn countdown() {
/// for i in (1..10).rev() {
/// log::info!("Countdown: {}", i);
/// sleep(Duration::from_millis(1000)).await;
/// }
/// }
///
/// async fn countdown_subsystem(subsys: SubsystemHandle) -> Result<()> {
/// log::info!("Starting countdown ...");
///
/// // This cancels the countdown as soon as shutdown
/// // mode was entered
/// tokio::select! {
/// _ = subsys.on_shutdown_requested() => {
/// log::info!("Countdown cancelled.");
/// },
/// _ = countdown() => {
/// log::info!("Countdown finished.");
/// }
/// };
///
/// Ok(())
/// }
/// ```
pub async fn on_shutdown_requested(&self) {
self.inner.cancellation_token.cancelled().await
}

/// Returns whether a shutdown should be performed now.
///
/// This method is provided for subsystems that need to query the shutdown
/// request state repeatedly.
///
/// This can be useful in scenarios where a subsystem depends on the graceful
/// shutdown of its nested coroutines before it can run final cleanup steps itself.
///
/// # Examples
///
/// ```
/// use miette::Result;
/// use tokio::time::{sleep, Duration};
/// use tokio_graceful_shutdown::SubsystemHandle;
///
/// async fn uncancellable_action(subsys: &SubsystemHandle) {
/// tokio::select! {
/// // Execute an action. A dummy `sleep` in this case.
/// _ = sleep(Duration::from_millis(1000)) => {
/// log::info!("Action finished.");
/// }
/// // Perform a shutdown if requested
/// _ = subsys.on_shutdown_requested() => {
/// log::info!("Action aborted.");
/// },
/// }
/// }
///
/// async fn my_subsystem(subsys: SubsystemHandle) -> Result<()> {
/// log::info!("Starting subsystem ...");
///
/// // We cannot do a `tokio::select` with `on_shutdown_requested`
/// // here, because a shutdown would cancel the action without giving
/// // it the chance to react first.
/// while !subsys.is_shutdown_requested() {
/// uncancellable_action(&subsys).await;
/// }
///
/// log::info!("Subsystem stopped.");
///
/// Ok(())
/// }
/// ```
pub fn is_shutdown_requested(&self) -> bool {
self.inner.cancellation_token.is_cancelled()
}

/// Triggers a shutdown of the entire subsystem tree.
///
/// # Examples
///
/// ```
/// use miette::Result;
/// use tokio::time::{sleep, Duration};
/// use tokio_graceful_shutdown::SubsystemHandle;
///
/// async fn stop_subsystem(subsys: SubsystemHandle) -> Result<()> {
/// // This subsystem wait for one second and then stops the program.
/// sleep(Duration::from_millis(1000)).await;
///
/// // Shut down the entire subsystem tree
/// subsys.initiate_shutdown();
///
/// Ok(())
/// }
/// ```
pub fn initiate_shutdown(&self) {
self.inner.toplevel_cancellation_token.cancel();
}

/// Triggers a shutdown of the current subsystem and all
/// of its children.
pub fn initiate_local_shutdown(&self) {
self.inner.cancellation_token.cancel();
}

pub(crate) fn get_cancellation_token(&self) -> &CancellationToken {
&self.inner.cancellation_token
}
Expand Down

0 comments on commit 1010d07

Please sign in to comment.