From 9948552e3d5a5deb29b748c663a4b52bf15941cd Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Fri, 23 Aug 2024 16:13:29 +0200 Subject: [PATCH] Add wiretap/wiretapContext to FlowWithContext/SourceWithContext --- .../paradox/release-notes/releases-1.1.md | 7 ++- .../stream/scaladsl/FlowWithContextSpec.scala | 44 +++++++++++++++++++ .../scaladsl/SourceWithContextSpec.scala | 38 ++++++++++++++++ ...ource-flow-with-context.backwards.excludes | 2 + .../stream/scaladsl/FlowWithContext.scala | 6 +++ .../stream/scaladsl/FlowWithContextOps.scala | 16 +++++++ .../stream/scaladsl/SourceWithContext.scala | 6 +++ 7 files changed, 117 insertions(+), 2 deletions(-) create mode 100644 stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1446-wireTap-wireTapContext-source-flow-with-context.backwards.excludes diff --git a/docs/src/main/paradox/release-notes/releases-1.1.md b/docs/src/main/paradox/release-notes/releases-1.1.md index 5fdee4669f..8743d9d0b6 100644 --- a/docs/src/main/paradox/release-notes/releases-1.1.md +++ b/docs/src/main/paradox/release-notes/releases-1.1.md @@ -5,6 +5,11 @@ Release notes for Apache Pekko 1.1.0-M1. See [GitHub Milestone](https://github.com/apache/pekko/milestone/2?closed=1) for fuller list of changes. As with all milestone releases, this release is not recommended for production use - it is designed to allow users to try out the changes in a test environment. +### Additional APIs +* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422)) +* add alsoTo/alsoToContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1443](https://github.com/apache/pekko/pull/1443)) +* add wireTap/wireTapContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1446](https://github.com/apache/pekko/pull/1446)) + ### 1.0.x changes Apache Pekko 1.1.0-M1 contains all of the changes that have been released in the @ref:[1.0.x releases](releases-1.0.md) up to and including v1.0.3-M1. @@ -41,8 +46,6 @@ The Stream API has been updated to add some extra functions. * add Sink.forall operator ([PR989](https://github.com/apache/pekko/pull/989)) * add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244)) * added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269)) -* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422)) -* add alsoTo/alsoToContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1443](https://github.com/apache/pekko/pull/1443)) The Stream Testkit Java DSL has some extra functions. diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala index ed3d61ab6f..9acee47c76 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala @@ -126,6 +126,50 @@ class FlowWithContextSpec extends StreamSpec { } } + "pass through all data when using wireTap" in { + val listBuffer = new ListBuffer[String]() + Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))) + .asSourceWithContext(_.offset) + .via( + FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) => + (data.data.toLowerCase, offset) + }).wireTap(Sink.foreach(string => listBuffer.+=(string))) + ) + .toMat(TestSink.probe[(String, Long)])(Keep.right) + .run() + .request(4) + .expectNext(("a", 1L)) + .expectNext(("b", 2L)) + .expectNext(("d", 3L)) + .expectNext(("c", 4L)) + .expectComplete() + .within(10.seconds) { + listBuffer should contain atLeastOneElementOf List("a", "b", "d", "c") + } + } + + "pass through all data when using wireTapContext" in { + val listBuffer = new ListBuffer[Long]() + Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))) + .asSourceWithContext(_.offset) + .via( + FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) => + (data.data.toLowerCase, offset) + }).wireTapContext(Sink.foreach(offset => listBuffer.+=(offset))) + ) + .toMat(TestSink.probe[(String, Long)])(Keep.right) + .run() + .request(4) + .expectNext(("a", 1L)) + .expectNext(("b", 2L)) + .expectNext(("d", 3L)) + .expectNext(("c", 4L)) + .expectComplete() + .within(10.seconds) { + listBuffer should contain atLeastOneElementOf List(1L, 2L, 3L, 4L) + } + } + "keep the same order for data and context when using unsafeDataVia" in { val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4)) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala index 0c832f2beb..06650a7b77 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala @@ -115,6 +115,44 @@ class SourceWithContextSpec extends StreamSpec { } } + "pass through all data when using wireTap" in { + val listBuffer = new ListBuffer[Message]() + val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) + Source(messages) + .asSourceWithContext(_.offset) + .wireTap(Sink.foreach(message => listBuffer.+=(message))) + .toMat(TestSink.probe[(Message, Long)])(Keep.right) + .run() + .request(4) + .expectNext((Message("A", 1L), 1L)) + .expectNext((Message("B", 2L), 2L)) + .expectNext((Message("D", 3L), 3L)) + .expectNext((Message("C", 4L), 4L)) + .expectComplete() + .within(10.seconds) { + listBuffer.toVector should contain atLeastOneElementOf messages + } + } + + "pass through all data when using wireTapContext" in { + val listBuffer = new ListBuffer[Long]() + val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) + Source(messages) + .asSourceWithContext(_.offset) + .wireTapContext(Sink.foreach(offset => listBuffer.+=(offset))) + .toMat(TestSink.probe[(Message, Long)])(Keep.right) + .run() + .request(4) + .expectNext((Message("A", 1L), 1L)) + .expectNext((Message("B", 2L), 2L)) + .expectNext((Message("D", 3L), 3L)) + .expectNext((Message("C", 4L), 4L)) + .expectComplete() + .within(10.seconds) { + listBuffer.toVector should contain atLeastOneElementOf (messages.map(_.offset)) + } + } + "pass through contexts via a FlowWithContext" in { def flowWithContext[T] = FlowWithContext[T, Long] diff --git a/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1446-wireTap-wireTapContext-source-flow-with-context.backwards.excludes b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1446-wireTap-wireTapContext-source-flow-with-context.backwards.excludes new file mode 100644 index 0000000000..0eb85a68d4 --- /dev/null +++ b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1446-wireTap-wireTapContext-source-flow-with-context.backwards.excludes @@ -0,0 +1,2 @@ + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTap") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTapContext") diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index 13df874395..87315e91ec 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -144,6 +144,12 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In override def alsoToContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, CtxOut] = FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._2))) + override def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, CtxOut] = + FlowWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._1))) + + override def wireTapContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, CtxOut] = + FlowWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._2))) + /** * Context-preserving variant of [[pekko.stream.scaladsl.Flow.withAttributes]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala index fe06cf0142..88848d2fca 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala @@ -104,6 +104,22 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { */ def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] + /** + * Data variant of [[pekko.stream.scaladsl.FlowOps.wireTap]] + * + * @see [[pekko.stream.scaladsl.FlowOps.wireTap]] + * @since 1.1.0 + */ + def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] + + /** + * Context variant of [[pekko.stream.scaladsl.FlowOps.wireTap]] + * + * @see [[pekko.stream.scaladsl.FlowOps.wireTap]] + * @since 1.1.0 + */ + def wireTapContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] + /** * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.map]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index be9c09f7e1..f98c445bf9 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -161,6 +161,12 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc override def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] = SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._2))) + override def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] = + SourceWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._1))) + + override def wireTapContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] = + SourceWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._2))) + /** * Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]] and run it. * The returned value is the materialized value of the `Sink`.