From 8140a3b3a064e2f599e078a252a07216eb2b9181 Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Mon, 9 Sep 2024 09:15:21 +0200 Subject: [PATCH] Add overridden duration timeout to StreamTestKit --- .../testkit/javadsl/StreamTestKit.scala | 20 +++++++++++- .../testkit/scaladsl/StreamTestKit.scala | 32 ++++++++++++++++--- .../pekko/stream/testkit/StreamSpec.scala | 2 +- 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/StreamTestKit.scala b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/StreamTestKit.scala index 5f42371bf38..02d1237a84d 100644 --- a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/StreamTestKit.scala +++ b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/StreamTestKit.scala @@ -19,6 +19,10 @@ import pekko.stream.{ Materializer, SystemMaterializer } import pekko.stream.impl.PhasedFusingActorMaterializer import pekko.stream.testkit.scaladsl +import java.time.Duration +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration + object StreamTestKit { /** @@ -29,7 +33,21 @@ object StreamTestKit { def assertAllStagesStopped(mat: Materializer): Unit = mat match { case impl: PhasedFusingActorMaterializer => - scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor) + scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor, None) + case _ => + } + + /** + * Assert that there are no stages running under a given materializer. + * Usually this assertion is run after a test-case to check that all of the + * stages have terminated successfully with an overridden duration that ignores + * `stream.testkit.all-stages-stopped-timeout`. + */ + def assertAllStagesStopped(mat: Materializer, duration: Duration): Unit = + mat match { + case impl: PhasedFusingActorMaterializer => + scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor, + Some(FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS))) case _ => } diff --git a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/scaladsl/StreamTestKit.scala b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/scaladsl/StreamTestKit.scala index 85d5d5623b8..e5797f59634 100644 --- a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/scaladsl/StreamTestKit.scala +++ b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/scaladsl/StreamTestKit.scala @@ -35,12 +35,30 @@ object StreamTestKit { * This assertion is useful to check that all of the stages have * terminated successfully. */ + def assertAllStagesStopped[T](block: => T, overrideTimeout: FiniteDuration)(implicit materializer: Materializer): T = + materializer match { + case impl: PhasedFusingActorMaterializer => + stopAllChildren(impl.system, impl.supervisor) + val result = block + assertNoChildren(impl.system, impl.supervisor, Some(overrideTimeout)) + result + case _ => block + } + + /** + * Asserts that after the given code block is ran, no stages are left over + * that were created by the given materializer with an overridden duration + * that ignores `stream.testkit.all-stages-stopped-timeout`. + * + * This assertion is useful to check that all of the stages have + * terminated successfully. + */ def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T = materializer match { case impl: PhasedFusingActorMaterializer => stopAllChildren(impl.system, impl.supervisor) val result = block - assertNoChildren(impl.system, impl.supervisor) + assertNoChildren(impl.system, impl.supervisor, None) result case _ => block } @@ -53,10 +71,16 @@ object StreamTestKit { } /** INTERNAL API */ - @InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit = { + @InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef, + overrideTimeout: Option[FiniteDuration]): Unit = { val probe = TestProbe()(sys) - val c = sys.settings.config.getConfig("pekko.stream.testkit") - val timeout = c.getDuration("all-stages-stopped-timeout", MILLISECONDS).millis + val timeout = overrideTimeout match { + case Some(value) => value + case None => + val c = sys.settings.config.getConfig("pekko.stream.testkit") + c.getDuration("all-stages-stopped-timeout", MILLISECONDS).millis + } + probe.within(timeout) { try probe.awaitAssert { supervisor.tell(StreamSupervisor.GetChildren, probe.ref) diff --git a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala index 827a3c4918c..a6fcde03840 100644 --- a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala +++ b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala @@ -73,7 +73,7 @@ abstract class StreamSpec(_system: ActorSystem) extends PekkoSpec(_system) { case impl: PhasedFusingActorMaterializer => stopAllChildren(impl.system, impl.supervisor) val result = test.apply() - assertNoChildren(impl.system, impl.supervisor) + assertNoChildren(impl.system, impl.supervisor, None) result case _ => other }