Skip to content

Commit

Permalink
Add wiretap/wiretapContext to FlowWithContext/SourceWithContext
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Aug 27, 2024
1 parent eccfb84 commit f548ea5
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/src/main/paradox/release-notes/releases-1.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The Stream API has been updated to add some extra functions.
* added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269))
* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422))
* add alsoTo/alsoToContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1443](https://github.com/apache/pekko/pull/1443))
* add wireTap/wireTapContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1446](https://github.com/apache/pekko/pull/1446))

The Stream Testkit Java DSL has some extra functions.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,50 @@ class FlowWithContextSpec extends StreamSpec {
}
}

"pass through all data when using wireTap" in {
val listBuffer = new ListBuffer[String]()
Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)))
.asSourceWithContext(_.offset)
.via(
FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) =>
(data.data.toLowerCase, offset)
}).wireTap(Sink.foreach(string => listBuffer.+=(string)))
)
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run()
.request(4)
.expectNext(("a", 1L))
.expectNext(("b", 2L))
.expectNext(("d", 3L))
.expectNext(("c", 4L))
.expectComplete()
.within(10.seconds) {
listBuffer should contain atLeastOneElementOf List("a", "b", "d", "c")
}
}

"pass through all data when using wireTapContext" in {
val listBuffer = new ListBuffer[Long]()
Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)))
.asSourceWithContext(_.offset)
.via(
FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) =>
(data.data.toLowerCase, offset)
}).wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
)
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run()
.request(4)
.expectNext(("a", 1L))
.expectNext(("b", 2L))
.expectNext(("d", 3L))
.expectNext(("c", 4L))
.expectComplete()
.within(10.seconds) {
listBuffer should contain atLeastOneElementOf List(1L, 2L, 3L, 4L)
}
}

"keep the same order for data and context when using unsafeDataVia" in {
val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,44 @@ class SourceWithContextSpec extends StreamSpec {
}
}

"pass through all data when using wireTap" in {
val listBuffer = new ListBuffer[Message]()
val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))
Source(messages)
.asSourceWithContext(_.offset)
.wireTap(Sink.foreach(message => listBuffer.+=(message)))
.toMat(TestSink.probe[(Message, Long)])(Keep.right)
.run()
.request(4)
.expectNext((Message("A", 1L), 1L))
.expectNext((Message("B", 2L), 2L))
.expectNext((Message("D", 3L), 3L))
.expectNext((Message("C", 4L), 4L))
.expectComplete()
.within(10.seconds) {
listBuffer.toVector should contain atLeastOneElementOf messages
}
}

"pass through all data when using wireTapContext" in {
val listBuffer = new ListBuffer[Long]()
val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))
Source(messages)
.asSourceWithContext(_.offset)
.wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
.toMat(TestSink.probe[(Message, Long)])(Keep.right)
.run()
.request(4)
.expectNext((Message("A", 1L), 1L))
.expectNext((Message("B", 2L), 2L))
.expectNext((Message("D", 3L), 3L))
.expectNext((Message("C", 4L), 4L))
.expectComplete()
.within(10.seconds) {
listBuffer.toVector should contain atLeastOneElementOf (messages.map(_.offset))
}
}

"pass through contexts via a FlowWithContext" in {

def flowWithContext[T] = FlowWithContext[T, Long]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTap")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTapContext")
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In
override def alsoToContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, CtxOut] =
FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._2)))

override def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, CtxOut] =
FlowWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._1)))

override def wireTapContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, CtxOut] =
FlowWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._2)))

/**
* Context-preserving variant of [[pekko.stream.scaladsl.Flow.withAttributes]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
*/
def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx]

/**
* Data variant of [[pekko.stream.scaladsl.FlowOps.wireTap]]
*
* @see [[pekko.stream.scaladsl.FlowOps.wireTap]]
* @since 1.1.0
*/
def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx]

/**
* Context variant of [[pekko.stream.scaladsl.FlowOps.wireTap]]
*
* @see [[pekko.stream.scaladsl.FlowOps.wireTap]]
* @since 1.1.0
*/
def wireTapContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx]

/**
* Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.map]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc
override def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] =
SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._2)))

override def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] =
SourceWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._1)))

override def wireTapContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] =
SourceWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._2)))

/**
* Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]] and run it.
* The returned value is the materialized value of the `Sink`.
Expand Down

0 comments on commit f548ea5

Please sign in to comment.