diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 60cb73f..e2b0f11 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -8,8 +8,8 @@ concurrency: group: coverage-${{ github.ref }} cancel-in-progress: true jobs: - coveralls: - name: Coveralls + coverage: + name: Codecov.io continue-on-error: true runs-on: ubuntu-latest env: @@ -25,14 +25,10 @@ jobs: uses: taiki-e/install-action@cargo-llvm-cov #- uses: Swatinem/rust-cache@v1 - name: Compute Coverage - env: - RUST_LOG: "debug" run: - cargo llvm-cov - --all-features --workspace - --lcov --output-path lcov.info - - name: Upload to Coveralls - uses: coverallsapp/github-action@master + cargo llvm-cov --all-features --workspace --ignore-filename-regex tests.rs --codecov --output-path codecov.json + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 with: - path-to-lcov: lcov.info - github-token: ${{ secrets.github_token }} + files: codecov.json + fail_ci_if_error: true diff --git a/Cargo.toml b/Cargo.toml index 07e8143..f3de681 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ miette = { version = "5.10.0", features = ["fancy"] } # Logging tracing-subscriber = "0.3.17" -tracing-test = "0.2.4" +tracing-test = { version = "0.2.4", features = ["no-env-filter"] } # Tokio tokio = { version = "1.32.0", features = ["full"] } diff --git a/README.md b/README.md index c3ff462..a1a0d33 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![License](https://img.shields.io/crates/l/tokio-graceful-shutdown)](https://github.com/Finomnis/tokio-graceful-shutdown/blob/main/LICENSE-MIT) [![Build Status](https://img.shields.io/github/actions/workflow/status/Finomnis/tokio-graceful-shutdown/ci.yml?branch=main)](https://github.com/Finomnis/tokio-graceful-shutdown/actions/workflows/ci.yml?query=branch%3Amain) [![docs.rs](https://img.shields.io/docsrs/tokio-graceful-shutdown)](https://docs.rs/tokio-graceful-shutdown) -[![Coverage Status](https://img.shields.io/coveralls/github/Finomnis/tokio-graceful-shutdown/main)](https://coveralls.io/github/Finomnis/tokio-graceful-shutdown?branch=main) +[![Coverage Status](https://img.shields.io/codecov/c/github/Finomnis/tokio-graceful-shutdown)](https://app.codecov.io/github/Finomnis/tokio-graceful-shutdown/tree/main) This crate provides utility functions to perform a graceful shutdown on tokio-rs based services. diff --git a/src/error_action.rs b/src/error_action.rs index 63a37ba..5155087 100644 --- a/src/error_action.rs +++ b/src/error_action.rs @@ -24,15 +24,3 @@ pub enum ErrorAction { /// Do not forward the error to the parent subsystem. CatchAndLocalShutdown, } - -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn derive_traits() { - let x = ErrorAction::CatchAndLocalShutdown; - #[allow(clippy::clone_on_copy)] - let y = x.clone(); - assert!(y == x); - } -} diff --git a/src/errors.rs b/src/errors.rs index 65871d7..693ec98 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -127,70 +127,4 @@ impl SubsystemError { pub struct CancelledByShutdown; #[cfg(test)] -mod tests { - use crate::BoxedError; - - use super::*; - - fn examine_report(report: miette::Report) { - println!("{}", report); - println!("{:?}", report); - // Convert to std::error::Error - let boxed_error: BoxedError = report.into(); - println!("{}", boxed_error); - println!("{:?}", boxed_error); - } - - #[test] - fn errors_can_be_converted_to_diagnostic() { - examine_report(GracefulShutdownError::ShutdownTimeout::(Box::new([])).into()); - examine_report(GracefulShutdownError::SubsystemsFailed::(Box::new([])).into()); - examine_report(SubsystemJoinError::SubsystemsFailed::(Arc::new([])).into()); - examine_report(SubsystemError::Panicked::("".into()).into()); - examine_report( - SubsystemError::Failed::("".into(), SubsystemFailure("".into())).into(), - ); - examine_report(CancelledByShutdown.into()); - } - - #[test] - fn extract_related_from_graceful_shutdown_error() { - let related = || { - Box::new([ - SubsystemError::Failed("a".into(), SubsystemFailure(String::from("A").into())), - SubsystemError::Panicked("b".into()), - ]) - }; - - let matches_related = |data: &[SubsystemError]| { - let mut iter = data.iter(); - - let elem = iter.next().unwrap(); - assert_eq!(elem.name(), "a"); - assert!(matches!(elem, SubsystemError::Failed(_, _))); - - let elem = iter.next().unwrap(); - assert_eq!(elem.name(), "b"); - assert!(matches!(elem, SubsystemError::Panicked(_))); - - assert!(iter.next().is_none()); - }; - - matches_related(GracefulShutdownError::ShutdownTimeout(related()).get_subsystem_errors()); - matches_related(GracefulShutdownError::SubsystemsFailed(related()).get_subsystem_errors()); - matches_related(&GracefulShutdownError::ShutdownTimeout(related()).into_subsystem_errors()); - matches_related( - &GracefulShutdownError::SubsystemsFailed(related()).into_subsystem_errors(), - ); - } - - #[test] - fn extract_contained_error_from_convert_subsystem_failure() { - let msg = "MyFailure".to_string(); - let failure = SubsystemFailure(msg.clone()); - - assert_eq!(&msg, failure.get_error()); - assert_eq!(msg, *failure); - assert_eq!(msg, failure.into_error()); - } -} +mod tests; diff --git a/src/errors/tests.rs b/src/errors/tests.rs new file mode 100644 index 0000000..ac86457 --- /dev/null +++ b/src/errors/tests.rs @@ -0,0 +1,63 @@ +use crate::BoxedError; + +use super::*; + +fn examine_report(report: miette::Report) { + println!("{}", report); + println!("{:?}", report); + // Convert to std::error::Error + let boxed_error: BoxedError = report.into(); + println!("{}", boxed_error); + println!("{:?}", boxed_error); +} + +#[test] +fn errors_can_be_converted_to_diagnostic() { + examine_report(GracefulShutdownError::ShutdownTimeout::(Box::new([])).into()); + examine_report(GracefulShutdownError::SubsystemsFailed::(Box::new([])).into()); + examine_report(SubsystemJoinError::SubsystemsFailed::(Arc::new([])).into()); + examine_report(SubsystemError::Panicked::("".into()).into()); + examine_report( + SubsystemError::Failed::("".into(), SubsystemFailure("".into())).into(), + ); + examine_report(CancelledByShutdown.into()); +} + +#[test] +fn extract_related_from_graceful_shutdown_error() { + let related = || { + Box::new([ + SubsystemError::Failed("a".into(), SubsystemFailure(String::from("A").into())), + SubsystemError::Panicked("b".into()), + ]) + }; + + let matches_related = |data: &[SubsystemError]| { + let mut iter = data.iter(); + + let elem = iter.next().unwrap(); + assert_eq!(elem.name(), "a"); + assert!(matches!(elem, SubsystemError::Failed(_, _))); + + let elem = iter.next().unwrap(); + assert_eq!(elem.name(), "b"); + assert!(matches!(elem, SubsystemError::Panicked(_))); + + assert!(iter.next().is_none()); + }; + + matches_related(GracefulShutdownError::ShutdownTimeout(related()).get_subsystem_errors()); + matches_related(GracefulShutdownError::SubsystemsFailed(related()).get_subsystem_errors()); + matches_related(&GracefulShutdownError::ShutdownTimeout(related()).into_subsystem_errors()); + matches_related(&GracefulShutdownError::SubsystemsFailed(related()).into_subsystem_errors()); +} + +#[test] +fn extract_contained_error_from_convert_subsystem_failure() { + let msg = "MyFailure".to_string(); + let failure = SubsystemFailure(msg.clone()); + + assert_eq!(&msg, failure.get_error()); + assert_eq!(msg, *failure); + assert_eq!(msg, failure.into_error()); +} diff --git a/src/runner/alive_guard.rs b/src/runner/alive_guard.rs index 00cfddf..0efbca5 100644 --- a/src/runner/alive_guard.rs +++ b/src/runner/alive_guard.rs @@ -59,62 +59,4 @@ impl Drop for Inner { } #[cfg(test)] -mod tests { - - use std::sync::atomic::{AtomicU32, Ordering}; - - use super::*; - - #[test] - fn finished_callback() { - let alive_guard = AliveGuard::new(); - - let counter = Arc::new(AtomicU32::new(0)); - let counter2 = Arc::clone(&counter); - - alive_guard.on_finished(move || { - counter2.fetch_add(1, Ordering::Relaxed); - }); - - drop(alive_guard); - - assert_eq!(counter.load(Ordering::Relaxed), 1); - } - - #[test] - fn cancel_callback() { - let alive_guard = AliveGuard::new(); - - let counter = Arc::new(AtomicU32::new(0)); - let counter2 = Arc::clone(&counter); - - alive_guard.on_finished(|| {}); - alive_guard.on_cancel(move || { - counter2.fetch_add(1, Ordering::Relaxed); - }); - - drop(alive_guard); - - assert_eq!(counter.load(Ordering::Relaxed), 1); - } - - #[test] - fn both_callbacks() { - let alive_guard = AliveGuard::new(); - - let counter = Arc::new(AtomicU32::new(0)); - let counter2 = Arc::clone(&counter); - let counter3 = Arc::clone(&counter); - - alive_guard.on_finished(move || { - counter2.fetch_add(1, Ordering::Relaxed); - }); - alive_guard.on_cancel(move || { - counter3.fetch_add(1, Ordering::Relaxed); - }); - - drop(alive_guard); - - assert_eq!(counter.load(Ordering::Relaxed), 2); - } -} +mod tests; diff --git a/src/runner/alive_guard/tests.rs b/src/runner/alive_guard/tests.rs new file mode 100644 index 0000000..4542951 --- /dev/null +++ b/src/runner/alive_guard/tests.rs @@ -0,0 +1,69 @@ +use std::sync::atomic::{AtomicU32, Ordering}; +use tracing_test::traced_test; + +use super::*; + +#[test] +#[traced_test] +fn finished_callback() { + let alive_guard = AliveGuard::new(); + + let counter = Arc::new(AtomicU32::new(0)); + let counter2 = Arc::clone(&counter); + + alive_guard.on_finished(move || { + counter2.fetch_add(1, Ordering::Relaxed); + }); + + drop(alive_guard); + + assert_eq!(counter.load(Ordering::Relaxed), 1); +} + +#[test] +#[traced_test] +fn cancel_callback() { + let alive_guard = AliveGuard::new(); + + let counter = Arc::new(AtomicU32::new(0)); + let counter2 = Arc::clone(&counter); + + alive_guard.on_finished(|| {}); + alive_guard.on_cancel(move || { + counter2.fetch_add(1, Ordering::Relaxed); + }); + + drop(alive_guard); + + assert_eq!(counter.load(Ordering::Relaxed), 1); +} + +#[test] +#[traced_test] +fn both_callbacks() { + let alive_guard = AliveGuard::new(); + + let counter = Arc::new(AtomicU32::new(0)); + let counter2 = Arc::clone(&counter); + let counter3 = Arc::clone(&counter); + + alive_guard.on_finished(move || { + counter2.fetch_add(1, Ordering::Relaxed); + }); + alive_guard.on_cancel(move || { + counter3.fetch_add(1, Ordering::Relaxed); + }); + + drop(alive_guard); + + assert_eq!(counter.load(Ordering::Relaxed), 2); +} + +#[test] +#[traced_test] +fn no_callback() { + let alive_guard = AliveGuard::new(); + drop(alive_guard); + + assert!(logs_contain("No `finished` callback was registered in AliveGuard! This should not happen, please report this at https://github.com/Finomnis/tokio-graceful-shutdown/issues.")); +} diff --git a/src/subsystem/error_collector.rs b/src/subsystem/error_collector.rs index 93208b0..9a16956 100644 --- a/src/subsystem/error_collector.rs +++ b/src/subsystem/error_collector.rs @@ -41,3 +41,6 @@ impl Drop for ErrorCollector { } } } + +#[cfg(test)] +mod tests; diff --git a/src/subsystem/error_collector/tests.rs b/src/subsystem/error_collector/tests.rs new file mode 100644 index 0000000..1e0e354 --- /dev/null +++ b/src/subsystem/error_collector/tests.rs @@ -0,0 +1,68 @@ +use tracing_test::traced_test; + +use super::*; + +#[test] +#[traced_test] +fn normal() { + let (sender, receiver) = mpsc::unbounded_channel(); + let mut error_collector = ErrorCollector::::new(receiver); + + sender + .send(SubsystemError::Panicked(Arc::from("ABC"))) + .unwrap(); + sender + .send(SubsystemError::Panicked(Arc::from("def"))) + .unwrap(); + + let received = error_collector.finish(); + assert_eq!( + received.iter().map(|e| e.name()).collect::>(), + vec!["ABC", "def"] + ); +} + +#[test] +#[traced_test] +fn double_finish() { + let (sender, receiver) = mpsc::unbounded_channel(); + let mut error_collector = ErrorCollector::::new(receiver); + + sender + .send(SubsystemError::Panicked(Arc::from("ABC"))) + .unwrap(); + sender + .send(SubsystemError::Panicked(Arc::from("def"))) + .unwrap(); + + let received = error_collector.finish(); + assert_eq!( + received.iter().map(|e| e.name()).collect::>(), + vec!["ABC", "def"] + ); + + let received = error_collector.finish(); + assert_eq!( + received.iter().map(|e| e.name()).collect::>(), + vec!["ABC", "def"] + ); +} + +#[test] +#[traced_test] +fn no_finish() { + let (sender, receiver) = mpsc::unbounded_channel(); + let error_collector = ErrorCollector::::new(receiver); + + sender + .send(SubsystemError::Panicked(Arc::from("ABC"))) + .unwrap(); + sender + .send(SubsystemError::Panicked(Arc::from("def"))) + .unwrap(); + + drop(error_collector); + + assert!(logs_contain("An error got dropped: Panicked(\"ABC\")")); + assert!(logs_contain("An error got dropped: Panicked(\"def\")")); +} diff --git a/src/subsystem/subsystem_handle.rs b/src/subsystem/subsystem_handle.rs index ede0fbf..52a5f2f 100644 --- a/src/subsystem/subsystem_handle.rs +++ b/src/subsystem/subsystem_handle.rs @@ -354,77 +354,4 @@ pub(crate) fn root_handle( } #[cfg(test)] -mod tests { - - use tokio::time::{sleep, timeout, Duration}; - - use super::*; - use crate::subsystem::SubsystemBuilder; - - #[tokio::test] - async fn recursive_cancellation() { - let root_handle = root_handle::(|_| {}); - - let (drop_sender, mut drop_receiver) = tokio::sync::mpsc::channel::<()>(1); - - root_handle.start(SubsystemBuilder::new("", |_| async move { - drop_sender.send(()).await.unwrap(); - std::future::pending::>().await - })); - - // Make sure we are executing the subsystem - let recv_result = timeout(Duration::from_millis(100), drop_receiver.recv()) - .await - .unwrap(); - assert!(recv_result.is_some()); - - drop(root_handle); - - // Make sure the subsystem got cancelled - let recv_result = timeout(Duration::from_millis(100), drop_receiver.recv()) - .await - .unwrap(); - assert!(recv_result.is_none()); - } - - #[tokio::test] - async fn recursive_cancellation_2() { - let root_handle = root_handle(|_| {}); - - let (drop_sender, mut drop_receiver) = tokio::sync::mpsc::channel::<()>(1); - - let subsys2 = |_| async move { - drop_sender.send(()).await.unwrap(); - std::future::pending::>().await - }; - - let subsys = |x: SubsystemHandle| async move { - x.start(SubsystemBuilder::new("", subsys2)); - - Result::<(), BoxedError>::Ok(()) - }; - - root_handle.start(SubsystemBuilder::new("", subsys)); - - // Make sure we are executing the subsystem - let recv_result = timeout(Duration::from_millis(100), drop_receiver.recv()) - .await - .unwrap(); - assert!(recv_result.is_some()); - - // Make sure the grandchild is still running - sleep(Duration::from_millis(100)).await; - assert!(matches!( - drop_receiver.try_recv(), - Err(tokio::sync::mpsc::error::TryRecvError::Empty) - )); - - drop(root_handle); - - // Make sure the subsystem got cancelled - let recv_result = timeout(Duration::from_millis(100), drop_receiver.recv()) - .await - .unwrap(); - assert!(recv_result.is_none()); - } -} +mod tests; diff --git a/src/subsystem/subsystem_handle/tests.rs b/src/subsystem/subsystem_handle/tests.rs new file mode 100644 index 0000000..35acbf4 --- /dev/null +++ b/src/subsystem/subsystem_handle/tests.rs @@ -0,0 +1,71 @@ +use tokio::time::{sleep, timeout, Duration}; + +use super::*; +use crate::subsystem::SubsystemBuilder; + +#[tokio::test] +async fn recursive_cancellation() { + let root_handle = root_handle::(|_| {}); + + let (drop_sender, mut drop_receiver) = tokio::sync::mpsc::channel::<()>(1); + + root_handle.start(SubsystemBuilder::new("", |_| async move { + drop_sender.send(()).await.unwrap(); + std::future::pending::>().await + })); + + // Make sure we are executing the subsystem + let recv_result = timeout(Duration::from_millis(100), drop_receiver.recv()) + .await + .unwrap(); + assert!(recv_result.is_some()); + + drop(root_handle); + + // Make sure the subsystem got cancelled + let recv_result = timeout(Duration::from_millis(100), drop_receiver.recv()) + .await + .unwrap(); + assert!(recv_result.is_none()); +} + +#[tokio::test] +async fn recursive_cancellation_2() { + let root_handle = root_handle(|_| {}); + + let (drop_sender, mut drop_receiver) = tokio::sync::mpsc::channel::<()>(1); + + let subsys2 = |_| async move { + drop_sender.send(()).await.unwrap(); + std::future::pending::>().await + }; + + let subsys = |x: SubsystemHandle| async move { + x.start(SubsystemBuilder::new("", subsys2)); + + Result::<(), BoxedError>::Ok(()) + }; + + root_handle.start(SubsystemBuilder::new("", subsys)); + + // Make sure we are executing the subsystem + let recv_result = timeout(Duration::from_millis(100), drop_receiver.recv()) + .await + .unwrap(); + assert!(recv_result.is_some()); + + // Make sure the grandchild is still running + sleep(Duration::from_millis(100)).await; + assert!(matches!( + drop_receiver.try_recv(), + Err(tokio::sync::mpsc::error::TryRecvError::Empty) + )); + + drop(root_handle); + + // Make sure the subsystem got cancelled + let recv_result = timeout(Duration::from_millis(100), drop_receiver.recv()) + .await + .unwrap(); + assert!(recv_result.is_none()); +} diff --git a/src/utils/joiner_token.rs b/src/utils/joiner_token.rs index e792d8e..6b672ea 100644 --- a/src/utils/joiner_token.rs +++ b/src/utils/joiner_token.rs @@ -177,236 +177,4 @@ impl Drop for JoinerToken { } #[cfg(test)] -mod tests { - use tokio::time::{sleep, timeout, Duration}; - use tracing_test::traced_test; - - use crate::BoxedError; - - use super::*; - - #[test] - #[traced_test] - fn counters() { - let (root, _) = JoinerToken::::new(|_| None); - assert_eq!(0, root.count()); - - let (child1, _) = root.child_token(|_| None); - assert_eq!(1, root.count()); - assert_eq!(0, child1.count()); - - let (child2, _) = child1.child_token(|_| None); - assert_eq!(2, root.count()); - assert_eq!(1, child1.count()); - assert_eq!(0, child2.count()); - - let (child3, _) = child1.child_token(|_| None); - assert_eq!(3, root.count()); - assert_eq!(2, child1.count()); - assert_eq!(0, child2.count()); - assert_eq!(0, child3.count()); - - drop(child1); - assert_eq!(2, root.count()); - assert_eq!(0, child2.count()); - assert_eq!(0, child3.count()); - - drop(child2); - assert_eq!(1, root.count()); - assert_eq!(0, child3.count()); - - drop(child3); - assert_eq!(0, root.count()); - } - - #[test] - #[traced_test] - fn counters_weak() { - let (root, weak_root) = JoinerToken::::new(|_| None); - assert_eq!(0, weak_root.count()); - assert!(weak_root.alive()); - - let (child1, weak_child1) = root.child_token(|_| None); - assert_eq!(1, weak_root.count()); - assert!(weak_root.alive()); - assert_eq!(0, weak_child1.count()); - assert!(weak_child1.alive()); - - let (child2, weak_child2) = child1.child_token(|_| None); - assert_eq!(2, weak_root.count()); - assert!(weak_root.alive()); - assert_eq!(1, weak_child1.count()); - assert!(weak_child1.alive()); - assert_eq!(0, weak_child2.count()); - assert!(weak_child2.alive()); - - let (child3, weak_child3) = child1.child_token(|_| None); - assert_eq!(3, weak_root.count()); - assert!(weak_root.alive()); - assert_eq!(2, weak_child1.count()); - assert!(weak_child1.alive()); - assert_eq!(0, weak_child2.count()); - assert!(weak_child2.alive()); - assert_eq!(0, weak_child3.count()); - assert!(weak_child3.alive()); - - drop(child1); - assert_eq!(2, weak_root.count()); - assert!(weak_root.alive()); - assert_eq!(2, weak_child1.count()); - assert!(!weak_child1.alive()); - assert_eq!(0, weak_child2.count()); - assert!(weak_child2.alive()); - assert_eq!(0, weak_child3.count()); - assert!(weak_child3.alive()); - - drop(child2); - assert_eq!(1, weak_root.count()); - assert!(weak_root.alive()); - assert_eq!(1, weak_child1.count()); - assert!(!weak_child1.alive()); - assert_eq!(0, weak_child2.count()); - assert!(!weak_child2.alive()); - assert_eq!(0, weak_child3.count()); - assert!(weak_child3.alive()); - - drop(child3); - assert_eq!(0, weak_root.count()); - assert!(weak_root.alive()); - assert_eq!(0, weak_child1.count()); - assert!(!weak_child1.alive()); - assert_eq!(0, weak_child2.count()); - assert!(!weak_child2.alive()); - assert_eq!(0, weak_child3.count()); - assert!(!weak_child3.alive()); - - drop(root); - assert_eq!(0, weak_root.count()); - assert!(!weak_root.alive()); - assert_eq!(0, weak_child1.count()); - assert!(!weak_child1.alive()); - assert_eq!(0, weak_child2.count()); - assert!(!weak_child2.alive()); - assert_eq!(0, weak_child3.count()); - assert!(!weak_child3.alive()); - } - - #[tokio::test] - #[traced_test] - async fn join() { - let (superroot, _) = JoinerToken::::new(|_| None); - - let (mut root, _) = superroot.child_token(|_| None); - - let (child1, _) = root.child_token(|_| None); - let (child2, _) = child1.child_token(|_| None); - let (child3, _) = child1.child_token(|_| None); - - let (set_finished, mut finished) = tokio::sync::oneshot::channel(); - tokio::join!( - async { - timeout(Duration::from_millis(500), root.join_children()) - .await - .unwrap(); - set_finished.send(root.count()).unwrap(); - }, - async { - sleep(Duration::from_millis(50)).await; - assert!(finished.try_recv().is_err()); - - drop(child1); - sleep(Duration::from_millis(50)).await; - assert!(finished.try_recv().is_err()); - - drop(child2); - sleep(Duration::from_millis(50)).await; - assert!(finished.try_recv().is_err()); - - drop(child3); - sleep(Duration::from_millis(50)).await; - let count = timeout(Duration::from_millis(50), finished) - .await - .unwrap() - .unwrap(); - assert_eq!(count, 0); - } - ); - } - - #[tokio::test] - #[traced_test] - async fn join_through_ref() { - let (root, joiner) = JoinerToken::::new(|_| None); - - let (child1, _) = root.child_token(|_| None); - let (child2, _) = child1.child_token(|_| None); - - let (set_finished, mut finished) = tokio::sync::oneshot::channel(); - tokio::join!( - async { - timeout(Duration::from_millis(500), joiner.join()) - .await - .unwrap(); - set_finished.send(()).unwrap(); - }, - async { - sleep(Duration::from_millis(50)).await; - assert!(finished.try_recv().is_err()); - - drop(child1); - sleep(Duration::from_millis(50)).await; - assert!(finished.try_recv().is_err()); - - drop(root); - sleep(Duration::from_millis(50)).await; - assert!(finished.try_recv().is_err()); - - drop(child2); - sleep(Duration::from_millis(50)).await; - timeout(Duration::from_millis(50), finished) - .await - .unwrap() - .unwrap(); - } - ); - } - - #[test] - fn debug_print() { - let (root, _) = JoinerToken::::new(|_| None); - assert_eq!(format!("{:?}", root), "JoinerToken(children = 0)"); - - let (child1, _) = root.child_token(|_| None); - assert_eq!(format!("{:?}", root), "JoinerToken(children = 1)"); - - let (_child2, _) = child1.child_token(|_| None); - assert_eq!(format!("{:?}", root), "JoinerToken(children = 2)"); - } - - #[test] - fn debug_print_ref() { - let (root, root_ref) = JoinerToken::::new(|_| None); - assert_eq!( - format!("{:?}", root_ref), - "JoinerTokenRef(alive = true, children = 0)" - ); - - let (child1, _) = root.child_token(|_| None); - assert_eq!( - format!("{:?}", root_ref), - "JoinerTokenRef(alive = true, children = 1)" - ); - - drop(root); - assert_eq!( - format!("{:?}", root_ref), - "JoinerTokenRef(alive = false, children = 1)" - ); - - drop(child1); - assert_eq!( - format!("{:?}", root_ref), - "JoinerTokenRef(alive = false, children = 0)" - ); - } -} +mod tests; diff --git a/src/utils/joiner_token/tests.rs b/src/utils/joiner_token/tests.rs new file mode 100644 index 0000000..03ba7c9 --- /dev/null +++ b/src/utils/joiner_token/tests.rs @@ -0,0 +1,231 @@ +use tokio::time::{sleep, timeout, Duration}; +use tracing_test::traced_test; + +use crate::BoxedError; + +use super::*; + +#[test] +#[traced_test] +fn counters() { + let (root, _) = JoinerToken::::new(|_| None); + assert_eq!(0, root.count()); + + let (child1, _) = root.child_token(|_| None); + assert_eq!(1, root.count()); + assert_eq!(0, child1.count()); + + let (child2, _) = child1.child_token(|_| None); + assert_eq!(2, root.count()); + assert_eq!(1, child1.count()); + assert_eq!(0, child2.count()); + + let (child3, _) = child1.child_token(|_| None); + assert_eq!(3, root.count()); + assert_eq!(2, child1.count()); + assert_eq!(0, child2.count()); + assert_eq!(0, child3.count()); + + drop(child1); + assert_eq!(2, root.count()); + assert_eq!(0, child2.count()); + assert_eq!(0, child3.count()); + + drop(child2); + assert_eq!(1, root.count()); + assert_eq!(0, child3.count()); + + drop(child3); + assert_eq!(0, root.count()); +} + +#[test] +#[traced_test] +fn counters_weak() { + let (root, weak_root) = JoinerToken::::new(|_| None); + assert_eq!(0, weak_root.count()); + assert!(weak_root.alive()); + + let (child1, weak_child1) = root.child_token(|_| None); + assert_eq!(1, weak_root.count()); + assert!(weak_root.alive()); + assert_eq!(0, weak_child1.count()); + assert!(weak_child1.alive()); + + let (child2, weak_child2) = child1.child_token(|_| None); + assert_eq!(2, weak_root.count()); + assert!(weak_root.alive()); + assert_eq!(1, weak_child1.count()); + assert!(weak_child1.alive()); + assert_eq!(0, weak_child2.count()); + assert!(weak_child2.alive()); + + let (child3, weak_child3) = child1.child_token(|_| None); + assert_eq!(3, weak_root.count()); + assert!(weak_root.alive()); + assert_eq!(2, weak_child1.count()); + assert!(weak_child1.alive()); + assert_eq!(0, weak_child2.count()); + assert!(weak_child2.alive()); + assert_eq!(0, weak_child3.count()); + assert!(weak_child3.alive()); + + drop(child1); + assert_eq!(2, weak_root.count()); + assert!(weak_root.alive()); + assert_eq!(2, weak_child1.count()); + assert!(!weak_child1.alive()); + assert_eq!(0, weak_child2.count()); + assert!(weak_child2.alive()); + assert_eq!(0, weak_child3.count()); + assert!(weak_child3.alive()); + + drop(child2); + assert_eq!(1, weak_root.count()); + assert!(weak_root.alive()); + assert_eq!(1, weak_child1.count()); + assert!(!weak_child1.alive()); + assert_eq!(0, weak_child2.count()); + assert!(!weak_child2.alive()); + assert_eq!(0, weak_child3.count()); + assert!(weak_child3.alive()); + + drop(child3); + assert_eq!(0, weak_root.count()); + assert!(weak_root.alive()); + assert_eq!(0, weak_child1.count()); + assert!(!weak_child1.alive()); + assert_eq!(0, weak_child2.count()); + assert!(!weak_child2.alive()); + assert_eq!(0, weak_child3.count()); + assert!(!weak_child3.alive()); + + drop(root); + assert_eq!(0, weak_root.count()); + assert!(!weak_root.alive()); + assert_eq!(0, weak_child1.count()); + assert!(!weak_child1.alive()); + assert_eq!(0, weak_child2.count()); + assert!(!weak_child2.alive()); + assert_eq!(0, weak_child3.count()); + assert!(!weak_child3.alive()); +} + +#[tokio::test] +#[traced_test] +async fn join() { + let (superroot, _) = JoinerToken::::new(|_| None); + + let (mut root, _) = superroot.child_token(|_| None); + + let (child1, _) = root.child_token(|_| None); + let (child2, _) = child1.child_token(|_| None); + let (child3, _) = child1.child_token(|_| None); + + let (set_finished, mut finished) = tokio::sync::oneshot::channel(); + tokio::join!( + async { + timeout(Duration::from_millis(500), root.join_children()) + .await + .unwrap(); + set_finished.send(root.count()).unwrap(); + }, + async { + sleep(Duration::from_millis(50)).await; + assert!(finished.try_recv().is_err()); + + drop(child1); + sleep(Duration::from_millis(50)).await; + assert!(finished.try_recv().is_err()); + + drop(child2); + sleep(Duration::from_millis(50)).await; + assert!(finished.try_recv().is_err()); + + drop(child3); + sleep(Duration::from_millis(50)).await; + let count = timeout(Duration::from_millis(50), finished) + .await + .unwrap() + .unwrap(); + assert_eq!(count, 0); + } + ); +} + +#[tokio::test] +#[traced_test] +async fn join_through_ref() { + let (root, joiner) = JoinerToken::::new(|_| None); + + let (child1, _) = root.child_token(|_| None); + let (child2, _) = child1.child_token(|_| None); + + let (set_finished, mut finished) = tokio::sync::oneshot::channel(); + tokio::join!( + async { + timeout(Duration::from_millis(500), joiner.join()) + .await + .unwrap(); + set_finished.send(()).unwrap(); + }, + async { + sleep(Duration::from_millis(50)).await; + assert!(finished.try_recv().is_err()); + + drop(child1); + sleep(Duration::from_millis(50)).await; + assert!(finished.try_recv().is_err()); + + drop(root); + sleep(Duration::from_millis(50)).await; + assert!(finished.try_recv().is_err()); + + drop(child2); + sleep(Duration::from_millis(50)).await; + timeout(Duration::from_millis(50), finished) + .await + .unwrap() + .unwrap(); + } + ); +} + +#[test] +fn debug_print() { + let (root, _) = JoinerToken::::new(|_| None); + assert_eq!(format!("{:?}", root), "JoinerToken(children = 0)"); + + let (child1, _) = root.child_token(|_| None); + assert_eq!(format!("{:?}", root), "JoinerToken(children = 1)"); + + let (_child2, _) = child1.child_token(|_| None); + assert_eq!(format!("{:?}", root), "JoinerToken(children = 2)"); +} + +#[test] +fn debug_print_ref() { + let (root, root_ref) = JoinerToken::::new(|_| None); + assert_eq!( + format!("{:?}", root_ref), + "JoinerTokenRef(alive = true, children = 0)" + ); + + let (child1, _) = root.child_token(|_| None); + assert_eq!( + format!("{:?}", root_ref), + "JoinerTokenRef(alive = true, children = 1)" + ); + + drop(root); + assert_eq!( + format!("{:?}", root_ref), + "JoinerTokenRef(alive = false, children = 1)" + ); + + drop(child1); + assert_eq!( + format!("{:?}", root_ref), + "JoinerTokenRef(alive = false, children = 0)" + ); +} diff --git a/src/utils/remote_drop_collection.rs b/src/utils/remote_drop_collection.rs index 56cfc5a..ad88612 100644 --- a/src/utils/remote_drop_collection.rs +++ b/src/utils/remote_drop_collection.rs @@ -80,78 +80,4 @@ impl Drop for RemoteDrop { } #[cfg(test)] -mod tests { - - use super::*; - use crate::{utils::JoinerToken, BoxedError}; - - #[test] - fn insert_and_drop() { - let items = RemotelyDroppableItems::new(); - - let (count1, _) = JoinerToken::::new(|_| None); - let (count2, _) = JoinerToken::::new(|_| None); - - assert_eq!(0, count1.count()); - assert_eq!(0, count2.count()); - - let _token1 = items.insert(count1.child_token(|_| None)); - assert_eq!(1, count1.count()); - assert_eq!(0, count2.count()); - - let _token2 = items.insert(count2.child_token(|_| None)); - assert_eq!(1, count1.count()); - assert_eq!(1, count2.count()); - - drop(items); - assert_eq!(0, count1.count()); - assert_eq!(0, count2.count()); - } - - #[test] - fn drop_token() { - let items = RemotelyDroppableItems::new(); - - let (count1, _) = JoinerToken::::new(|_| None); - let (count2, _) = JoinerToken::::new(|_| None); - let (count3, _) = JoinerToken::::new(|_| None); - let (count4, _) = JoinerToken::::new(|_| None); - - let token1 = items.insert(count1.child_token(|_| None)); - let token2 = items.insert(count2.child_token(|_| None)); - let token3 = items.insert(count3.child_token(|_| None)); - let token4 = items.insert(count4.child_token(|_| None)); - assert_eq!(1, count1.count()); - assert_eq!(1, count2.count()); - assert_eq!(1, count3.count()); - assert_eq!(1, count4.count()); - - // Last item - drop(token4); - assert_eq!(1, count1.count()); - assert_eq!(1, count2.count()); - assert_eq!(1, count3.count()); - assert_eq!(0, count4.count()); - - // Middle item - drop(token2); - assert_eq!(1, count1.count()); - assert_eq!(0, count2.count()); - assert_eq!(1, count3.count()); - assert_eq!(0, count4.count()); - - // First item - drop(token1); - assert_eq!(0, count1.count()); - assert_eq!(0, count2.count()); - assert_eq!(1, count3.count()); - assert_eq!(0, count4.count()); - - // Only item - drop(token3); - assert_eq!(0, count1.count()); - assert_eq!(0, count2.count()); - assert_eq!(0, count3.count()); - assert_eq!(0, count4.count()); - } -} +mod tests; diff --git a/src/utils/remote_drop_collection/tests.rs b/src/utils/remote_drop_collection/tests.rs new file mode 100644 index 0000000..45a194b --- /dev/null +++ b/src/utils/remote_drop_collection/tests.rs @@ -0,0 +1,72 @@ +use super::*; +use crate::{utils::JoinerToken, BoxedError}; + +#[test] +fn insert_and_drop() { + let items = RemotelyDroppableItems::new(); + + let (count1, _) = JoinerToken::::new(|_| None); + let (count2, _) = JoinerToken::::new(|_| None); + + assert_eq!(0, count1.count()); + assert_eq!(0, count2.count()); + + let _token1 = items.insert(count1.child_token(|_| None)); + assert_eq!(1, count1.count()); + assert_eq!(0, count2.count()); + + let _token2 = items.insert(count2.child_token(|_| None)); + assert_eq!(1, count1.count()); + assert_eq!(1, count2.count()); + + drop(items); + assert_eq!(0, count1.count()); + assert_eq!(0, count2.count()); +} + +#[test] +fn drop_token() { + let items = RemotelyDroppableItems::new(); + + let (count1, _) = JoinerToken::::new(|_| None); + let (count2, _) = JoinerToken::::new(|_| None); + let (count3, _) = JoinerToken::::new(|_| None); + let (count4, _) = JoinerToken::::new(|_| None); + + let token1 = items.insert(count1.child_token(|_| None)); + let token2 = items.insert(count2.child_token(|_| None)); + let token3 = items.insert(count3.child_token(|_| None)); + let token4 = items.insert(count4.child_token(|_| None)); + assert_eq!(1, count1.count()); + assert_eq!(1, count2.count()); + assert_eq!(1, count3.count()); + assert_eq!(1, count4.count()); + + // Last item + drop(token4); + assert_eq!(1, count1.count()); + assert_eq!(1, count2.count()); + assert_eq!(1, count3.count()); + assert_eq!(0, count4.count()); + + // Middle item + drop(token2); + assert_eq!(1, count1.count()); + assert_eq!(0, count2.count()); + assert_eq!(1, count3.count()); + assert_eq!(0, count4.count()); + + // First item + drop(token1); + assert_eq!(0, count1.count()); + assert_eq!(0, count2.count()); + assert_eq!(1, count3.count()); + assert_eq!(0, count4.count()); + + // Only item + drop(token3); + assert_eq!(0, count1.count()); + assert_eq!(0, count2.count()); + assert_eq!(0, count3.count()); + assert_eq!(0, count4.count()); +}