Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase test coverage #67

Merged
merged 14 commits into from
Oct 22, 2023
Merged
34 changes: 31 additions & 3 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@ use std::sync::Arc;

use miette::Diagnostic;
use thiserror::Error;
use tokio::sync::mpsc;

use crate::ErrTypeTraits;

/// This enum contains all the possible errors that could be returned
/// by [`handle_shutdown_requests()`](crate::Toplevel::handle_shutdown_requests).
#[derive(Error, Debug, Diagnostic)]
#[derive(Debug, Error, Diagnostic)]
pub enum GracefulShutdownError<ErrType: ErrTypeTraits = crate::BoxedError> {
/// At least one subsystem caused an error.
#[error("at least one subsystem returned an error")]
#[diagnostic(code(graceful_shutdown::failed))]
#[error("at least one subsystem returned an error")]
SubsystemsFailed(#[related] Box<[SubsystemError<ErrType>]>),
/// The shutdown did not finish within the given timeout.
#[error("shutdown timed out")]
#[diagnostic(code(graceful_shutdown::timeout))]
#[error("shutdown timed out")]
ShutdownTimeout(#[related] Box<[SubsystemError<ErrType>]>),
}

Expand Down Expand Up @@ -124,7 +125,34 @@ impl<ErrType: ErrTypeTraits> SubsystemError<ErrType> {
/// [`cancel_on_shutdown()`](crate::FutureExt::cancel_on_shutdown).
#[derive(Error, Debug, Diagnostic)]
#[error("A shutdown request caused this task to be cancelled")]
#[diagnostic(code(graceful_shutdown::future::cancelled_by_shutdown))]
pub struct CancelledByShutdown;

// This function contains code that stems from the principle
// of defensive coding - meaning, handle potential errors
// gracefully, even if they should not happen.
// Therefore it is in this special function, so we don't
// get coverage problems.
pub(crate) fn handle_dropped_error<ErrType: ErrTypeTraits>(
result: Result<(), mpsc::error::SendError<ErrType>>,
) {
if let Err(mpsc::error::SendError(e)) = result {
tracing::warn!("An error got dropped: {e:?}");
}
}

// This function contains code that stems from the principle
// of defensive coding - meaning, handle potential errors
// gracefully, even if they should not happen.
// Therefore it is in this special function, so we don't
// get coverage problems.
pub(crate) fn handle_unhandled_stopreason<ErrType: ErrTypeTraits>(
maybe_stop_reason: Option<SubsystemError<ErrType>>,
) {
if let Some(stop_reason) = maybe_stop_reason {
tracing::warn!("Unhandled stop reason: {:?}", stop_reason);
}
}

#[cfg(test)]
mod tests;
55 changes: 46 additions & 9 deletions src/errors/tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
use tracing_test::traced_test;

use crate::BoxedError;

use super::*;

fn examine_report(report: miette::Report) {
fn examine_report(
error: impl miette::Diagnostic + std::error::Error + std::fmt::Debug + Sync + Send + 'static,
) {
println!("{}", error);
println!("{:?}", error);
println!("{:?}", error.source());
println!("{}", error.code().unwrap());
// Convert to report
let report: miette::Report = error.into();
println!("{}", report);
println!("{:?}", report);
// Convert to std::error::Error
Expand All @@ -13,14 +23,21 @@ fn examine_report(report: miette::Report) {

#[test]
fn errors_can_be_converted_to_diagnostic() {
examine_report(GracefulShutdownError::ShutdownTimeout::<BoxedError>(Box::new([])).into());
examine_report(GracefulShutdownError::SubsystemsFailed::<BoxedError>(Box::new([])).into());
examine_report(SubsystemJoinError::SubsystemsFailed::<BoxedError>(Arc::new([])).into());
examine_report(SubsystemError::Panicked::<BoxedError>("".into()).into());
examine_report(
SubsystemError::Failed::<BoxedError>("".into(), SubsystemFailure("".into())).into(),
);
examine_report(CancelledByShutdown.into());
examine_report(GracefulShutdownError::ShutdownTimeout::<BoxedError>(
Box::new([]),
));
examine_report(GracefulShutdownError::SubsystemsFailed::<BoxedError>(
Box::new([]),
));
examine_report(SubsystemJoinError::SubsystemsFailed::<BoxedError>(
Arc::new([]),
));
examine_report(SubsystemError::Panicked::<BoxedError>("".into()));
examine_report(SubsystemError::Failed::<BoxedError>(
"".into(),
SubsystemFailure("".into()),
));
examine_report(CancelledByShutdown);
}

#[test]
Expand Down Expand Up @@ -61,3 +78,23 @@ fn extract_contained_error_from_convert_subsystem_failure() {
assert_eq!(msg, *failure);
assert_eq!(msg, failure.into_error());
}

#[test]
#[traced_test]
fn handle_dropped_errors() {
handle_dropped_error(Err(mpsc::error::SendError(BoxedError::from(String::from(
"ABC",
)))));

assert!(logs_contain("An error got dropped: \"ABC\""));
}

#[test]
#[traced_test]
fn handle_unhandled_stopreasons() {
handle_unhandled_stopreason(Some(SubsystemError::<BoxedError>::Panicked(Arc::from(
"def",
))));

assert!(logs_contain("Unhandled stop reason: Panicked(\"def\")"));
}
18 changes: 8 additions & 10 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,10 @@ async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
Ok(Ok(())) => None,
Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))),
Err(e) => {
if e.is_panic() {
Some(SubsystemError::Panicked(name))
} else {
// Don't do anything in case of a cancellation;
// cancellations can't be forwarded (because the
// current function we are in will be cancelled
// simultaneously)
None
}
// We can assume that this is a panic, because a cancellation
// can never happen as long as we still hold `guard`.
assert!(e.is_panic());
Some(SubsystemError::Panicked(name))
}
};

Expand All @@ -95,7 +90,10 @@ async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
// It is still important that the handle does not leak out of the subsystem.
let subsystem_handle = match redirected_subsystem_handle.try_recv() {
Ok(s) => s,
Err(_) => panic!("The SubsystemHandle object must not be leaked out of the subsystem!"),
Err(_) => {
tracing::error!("The SubsystemHandle object must not be leaked out of the subsystem!");
panic!("The SubsystemHandle object must not be leaked out of the subsystem!");
}
};

// Raise potential errors
Expand Down
8 changes: 3 additions & 5 deletions src/subsystem/subsystem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;

use crate::{
errors::SubsystemError,
errors::{handle_dropped_error, SubsystemError},
runner::{AliveGuard, SubsystemRunner},
utils::{remote_drop_collection::RemotelyDroppableItems, JoinerToken},
BoxedError, ErrTypeTraits, ErrorAction, NestedSubsystem, SubsystemBuilder,
Expand Down Expand Up @@ -124,9 +124,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
match error_action {
ErrorAction::Forward => Some(e),
ErrorAction::CatchAndLocalShutdown => {
if let Err(mpsc::error::SendError(e)) = error_sender.send(e) {
tracing::warn!("An error got dropped: {e:?}");
};
handle_dropped_error(error_sender.send(e));
cancellation_token.cancel();
None
}
Expand Down Expand Up @@ -167,7 +165,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
}

/// Waits until all the children of this subsystem are finished.
pub async fn wait_for_children(&mut self) {
pub async fn wait_for_children(&self) {
self.inner.joiner_token.join_children().await
}

Expand Down
17 changes: 8 additions & 9 deletions src/toplevel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::{
errors::{GracefulShutdownError, SubsystemError},
errors::{handle_dropped_error, GracefulShutdownError, SubsystemError},
signal_handling::wait_for_signal,
subsystem::{self, ErrorActions},
BoxedError, ErrTypeTraits, ErrorAction, NestedSubsystem, SubsystemHandle,
Expand Down Expand Up @@ -74,9 +74,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
}
};

if let Err(mpsc::error::SendError(e)) = error_sender.send(e) {
tracing::warn!("An error got dropped: {e:?}");
};
handle_dropped_error(error_sender.send(e));
});

let toplevel_subsys = root_handle.start_with_abs_name(
Expand Down Expand Up @@ -181,7 +179,12 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
);

match tokio::time::timeout(shutdown_timeout, self.toplevel_subsys.join()).await {
Ok(Ok(())) => {
Ok(result) => {
// An `Err` here would indicate a programming error,
// because the toplevel subsys doesn't catch any errors;
// it only forwards them.
assert!(result.is_ok());

let errors = collect_errors();
if errors.is_empty() {
tracing::info!("Shutdown finished.");
Expand All @@ -191,10 +194,6 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
Err(GracefulShutdownError::SubsystemsFailed(errors))
}
}
Ok(Err(_)) => {
// This can't happen because the toplevel subsys doesn't catch any errors; it only forwards them.
unreachable!();
}
Err(_) => {
tracing::error!("Shutdown timed out!");
Err(GracefulShutdownError::ShutdownTimeout(collect_errors()))
Expand Down
13 changes: 6 additions & 7 deletions src/utils/joiner_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{fmt::Debug, sync::Arc};

use tokio::sync::watch;

use crate::{errors::SubsystemError, ErrTypeTraits};
use crate::{
errors::{handle_unhandled_stopreason, SubsystemError},
ErrTypeTraits,
};

struct Inner<ErrType: ErrTypeTraits> {
counter: watch::Sender<(bool, u32)>,
Expand Down Expand Up @@ -67,9 +70,7 @@ impl<ErrType: ErrTypeTraits> JoinerToken<ErrType> {
(Self { inner }, weak_ref)
}

// Requires `mut` access to prevent children from being spawned
// while waiting
pub(crate) async fn join_children(&mut self) {
pub(crate) async fn join_children(&self) {
let mut subscriber = self.inner.counter.subscribe();

// Ignore errors; if the channel got closed, that definitely means
Expand Down Expand Up @@ -126,9 +127,7 @@ impl<ErrType: ErrTypeTraits> JoinerToken<ErrType> {
maybe_parent = parent.parent.as_ref();
}

if let Some(stop_reason) = maybe_stop_reason {
tracing::warn!("Unhandled stop reason: {:?}", stop_reason);
}
handle_unhandled_stopreason(maybe_stop_reason);
}

pub(crate) fn downgrade(self) -> JoinerTokenRef {
Expand Down
2 changes: 1 addition & 1 deletion src/utils/joiner_token/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ fn counters_weak() {
async fn join() {
let (superroot, _) = JoinerToken::<BoxedError>::new(|_| None);

let (mut root, _) = superroot.child_token(|_| None);
let (root, _) = superroot.child_token(|_| None);

let (child1, _) = root.child_token(|_| None);
let (child2, _) = child1.child_token(|_| None);
Expand Down
24 changes: 14 additions & 10 deletions src/utils/remote_drop_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,22 @@ impl<T> Drop for RemoteDrop<T> {
// Important: lock first, then read the offset.
let mut data = data.lock().unwrap();

if let Some(offset) = self.offset.upgrade() {
let offset = offset.load(Ordering::Acquire);
let offset = self
.offset
.upgrade()
.expect("Trying to delete non-existent item! Please report this.")
.load(Ordering::Acquire);

if let Some(last_item) = data.pop() {
if offset != data.len() {
// There must have been at least two items, and we are not at the end.
// So swap first before dropping.
let last_item = data
.pop()
.expect("Trying to delete non-existent item! Please report this.");

last_item.offset.store(offset, Ordering::Release);
data[offset] = last_item;
}
}
if offset != data.len() {
// There must have been at least two items, and we are not at the end.
// So swap first before dropping.

last_item.offset.store(offset, Ordering::Release);
data[offset] = last_item;
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/utils/remote_drop_collection/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
use super::*;
use crate::{utils::JoinerToken, BoxedError};

#[test]
fn single_item() {
let items = RemotelyDroppableItems::new();

let (count1, _) = JoinerToken::<BoxedError>::new(|_| None);
assert_eq!(0, count1.count());

let token1 = items.insert(count1.child_token(|_| None));
assert_eq!(1, count1.count());

drop(token1);
assert_eq!(0, count1.count());
}

#[test]
fn insert_and_drop() {
let items = RemotelyDroppableItems::new();
Expand Down
Loading