Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase test coverage #67

Merged
merged 14 commits into from
Oct 22, 2023
Merged
21 changes: 18 additions & 3 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@ use std::sync::Arc;

use miette::Diagnostic;
use thiserror::Error;
use tokio::sync::mpsc;

use crate::ErrTypeTraits;

/// This enum contains all the possible errors that could be returned
/// by [`handle_shutdown_requests()`](crate::Toplevel::handle_shutdown_requests).
#[derive(Error, Debug, Diagnostic)]
#[derive(Debug, Error, Diagnostic)]
pub enum GracefulShutdownError<ErrType: ErrTypeTraits = crate::BoxedError> {
/// At least one subsystem caused an error.
#[error("at least one subsystem returned an error")]
#[diagnostic(code(graceful_shutdown::failed))]
#[error("at least one subsystem returned an error")]
SubsystemsFailed(#[related] Box<[SubsystemError<ErrType>]>),
/// The shutdown did not finish within the given timeout.
#[error("shutdown timed out")]
#[diagnostic(code(graceful_shutdown::timeout))]
#[error("shutdown timed out")]
ShutdownTimeout(#[related] Box<[SubsystemError<ErrType>]>),
}

Expand Down Expand Up @@ -124,7 +125,21 @@ impl<ErrType: ErrTypeTraits> SubsystemError<ErrType> {
/// [`cancel_on_shutdown()`](crate::FutureExt::cancel_on_shutdown).
#[derive(Error, Debug, Diagnostic)]
#[error("A shutdown request caused this task to be cancelled")]
#[diagnostic(code(graceful_shutdown::future::cancelled_by_shutdown))]
pub struct CancelledByShutdown;

// This function contains code that stems from the principle
// of defensive coding - meaning, handle potential errors
// gracefully, even if they should not happen.
// Therefore it is in this special function, so we don't
// get coverage problems.
pub(crate) fn handle_dropped_error<ErrType: ErrTypeTraits>(
result: Result<(), mpsc::error::SendError<ErrType>>,
) {
if let Err(mpsc::error::SendError(e)) = result {
tracing::warn!("An error got dropped: {e:?}");
}
}

#[cfg(test)]
mod tests;
45 changes: 36 additions & 9 deletions src/errors/tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
use tracing_test::traced_test;

use crate::BoxedError;

use super::*;

fn examine_report(report: miette::Report) {
fn examine_report(
error: impl miette::Diagnostic + std::error::Error + std::fmt::Debug + Sync + Send + 'static,
) {
println!("{}", error);
println!("{:?}", error);
println!("{:?}", error.source());
println!("{}", error.code().unwrap());
// Convert to report
let report: miette::Report = error.into();
println!("{}", report);
println!("{:?}", report);
// Convert to std::error::Error
Expand All @@ -13,14 +23,21 @@ fn examine_report(report: miette::Report) {

#[test]
fn errors_can_be_converted_to_diagnostic() {
examine_report(GracefulShutdownError::ShutdownTimeout::<BoxedError>(Box::new([])).into());
examine_report(GracefulShutdownError::SubsystemsFailed::<BoxedError>(Box::new([])).into());
examine_report(SubsystemJoinError::SubsystemsFailed::<BoxedError>(Arc::new([])).into());
examine_report(SubsystemError::Panicked::<BoxedError>("".into()).into());
examine_report(
SubsystemError::Failed::<BoxedError>("".into(), SubsystemFailure("".into())).into(),
);
examine_report(CancelledByShutdown.into());
examine_report(GracefulShutdownError::ShutdownTimeout::<BoxedError>(
Box::new([]),
));
examine_report(GracefulShutdownError::SubsystemsFailed::<BoxedError>(
Box::new([]),
));
examine_report(SubsystemJoinError::SubsystemsFailed::<BoxedError>(
Arc::new([]),
));
examine_report(SubsystemError::Panicked::<BoxedError>("".into()));
examine_report(SubsystemError::Failed::<BoxedError>(
"".into(),
SubsystemFailure("".into()),
));
examine_report(CancelledByShutdown);
}

#[test]
Expand Down Expand Up @@ -61,3 +78,13 @@ fn extract_contained_error_from_convert_subsystem_failure() {
assert_eq!(msg, *failure);
assert_eq!(msg, failure.into_error());
}

#[test]
#[traced_test]
fn handle_dropped_errors() {
handle_dropped_error(Err(mpsc::error::SendError(BoxedError::from(String::from(
"ABC",
)))));

assert!(logs_contain("An error got dropped: \"ABC\""));
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@
test(no_crate_inject, attr(deny(warnings))),
test(attr(allow(dead_code)))
)]
// Allows functions to be ignored by the coverage algorithm
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;

Expand Down
18 changes: 8 additions & 10 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,10 @@ async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
Ok(Ok(())) => None,
Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))),
Err(e) => {
if e.is_panic() {
Some(SubsystemError::Panicked(name))
} else {
// Don't do anything in case of a cancellation;
// cancellations can't be forwarded (because the
// current function we are in will be cancelled
// simultaneously)
None
}
// We can assume that this is a panic, because a cancellation
// can never happen as long as we still hold `guard`.
assert!(e.is_panic());
Some(SubsystemError::Panicked(name))
}
};

Expand All @@ -95,7 +90,10 @@ async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
// It is still important that the handle does not leak out of the subsystem.
let subsystem_handle = match redirected_subsystem_handle.try_recv() {
Ok(s) => s,
Err(_) => panic!("The SubsystemHandle object must not be leaked out of the subsystem!"),
Err(_) => {
tracing::error!("The SubsystemHandle object must not be leaked out of the subsystem!");
panic!("The SubsystemHandle object must not be leaked out of the subsystem!");
}
};

// Raise potential errors
Expand Down
6 changes: 2 additions & 4 deletions src/subsystem/subsystem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;

use crate::{
errors::SubsystemError,
errors::{handle_dropped_error, SubsystemError},
runner::{AliveGuard, SubsystemRunner},
utils::{remote_drop_collection::RemotelyDroppableItems, JoinerToken},
BoxedError, ErrTypeTraits, ErrorAction, NestedSubsystem, SubsystemBuilder,
Expand Down Expand Up @@ -124,9 +124,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
match error_action {
ErrorAction::Forward => Some(e),
ErrorAction::CatchAndLocalShutdown => {
if let Err(mpsc::error::SendError(e)) = error_sender.send(e) {
tracing::warn!("An error got dropped: {e:?}");
};
handle_dropped_error(error_sender.send(e));
cancellation_token.cancel();
None
}
Expand Down
17 changes: 8 additions & 9 deletions src/toplevel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::{
errors::{GracefulShutdownError, SubsystemError},
errors::{handle_dropped_error, GracefulShutdownError, SubsystemError},
signal_handling::wait_for_signal,
subsystem::{self, ErrorActions},
BoxedError, ErrTypeTraits, ErrorAction, NestedSubsystem, SubsystemHandle,
Expand Down Expand Up @@ -74,9 +74,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
}
};

if let Err(mpsc::error::SendError(e)) = error_sender.send(e) {
tracing::warn!("An error got dropped: {e:?}");
};
handle_dropped_error(error_sender.send(e));
});

let toplevel_subsys = root_handle.start_with_abs_name(
Expand Down Expand Up @@ -181,7 +179,12 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
);

match tokio::time::timeout(shutdown_timeout, self.toplevel_subsys.join()).await {
Ok(Ok(())) => {
Ok(result) => {
// An `Err` here would indicate a programming error,
// because the toplevel subsys doesn't catch any errors;
// it only forwards them.
assert!(result.is_ok());

let errors = collect_errors();
if errors.is_empty() {
tracing::info!("Shutdown finished.");
Expand All @@ -191,10 +194,6 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
Err(GracefulShutdownError::SubsystemsFailed(errors))
}
}
Ok(Err(_)) => {
// This can't happen because the toplevel subsys doesn't catch any errors; it only forwards them.
unreachable!();
}
Err(_) => {
tracing::error!("Shutdown timed out!");
Err(GracefulShutdownError::ShutdownTimeout(collect_errors()))
Expand Down
8 changes: 4 additions & 4 deletions src/utils/remote_drop_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@
// Important: lock first, then read the offset.
let mut data = data.lock().unwrap();

if let Some(offset) = self.offset.upgrade() {
self.offset.upgrade().map(|offset| {
let offset = offset.load(Ordering::Acquire);

if let Some(last_item) = data.pop() {
data.pop().map(|last_item| {
if offset != data.len() {
// There must have been at least two items, and we are not at the end.
// So swap first before dropping.

last_item.offset.store(offset, Ordering::Release);
data[offset] = last_item;
}
}
}
});
Fixed Show fixed Hide fixed
Fixed Show fixed Hide fixed
});
Fixed Show fixed Hide fixed
Fixed Show fixed Hide fixed
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/utils/remote_drop_collection/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
use super::*;
use crate::{utils::JoinerToken, BoxedError};

#[test]
fn single_item() {
let items = RemotelyDroppableItems::new();

let (count1, _) = JoinerToken::<BoxedError>::new(|_| None);
assert_eq!(0, count1.count());

let token1 = items.insert(count1.child_token(|_| None));
assert_eq!(1, count1.count());

drop(token1);
assert_eq!(0, count1.count());
}

#[test]
fn insert_and_drop() {
let items = RemotelyDroppableItems::new();
Expand Down
44 changes: 44 additions & 0 deletions tests/integration_test_2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};
use tracing_test::traced_test;

pub mod common;

use std::{
error::Error,
sync::{Arc, Mutex},
};

/// Wrapper function to simplify lambdas
type BoxedError = Box<dyn Error + Sync + Send>;
type BoxedResult = Result<(), BoxedError>;

#[tokio::test]
#[traced_test]
async fn leak_subsystem_handle() {
let subsys_ext: Arc<Mutex<Option<SubsystemHandle>>> = Default::default();
let subsys_ext2 = Arc::clone(&subsys_ext);

let subsystem = move |subsys: SubsystemHandle| async move {
subsys.on_shutdown_requested().await;

*subsys_ext2.lock().unwrap() = Some(subsys);

BoxedResult::Ok(())
};

let toplevel = Toplevel::new(move |s| async move {
s.start(SubsystemBuilder::new("subsys", subsystem));

sleep(Duration::from_millis(100)).await;
s.request_shutdown();
});

let result = toplevel
.handle_shutdown_requests(Duration::from_millis(100))
.await;
assert!(result.is_err());
assert!(logs_contain(
"The SubsystemHandle object must not be leaked out of the subsystem!"
));
}
Loading