diff --git a/docs/src/main/paradox/release-notes/releases-1.1.md b/docs/src/main/paradox/release-notes/releases-1.1.md index 9571bdda364..5fdee4669f9 100644 --- a/docs/src/main/paradox/release-notes/releases-1.1.md +++ b/docs/src/main/paradox/release-notes/releases-1.1.md @@ -42,6 +42,7 @@ The Stream API has been updated to add some extra functions. * add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244)) * added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269)) * add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422)) +* add alsoTo/alsoToContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1443](https://github.com/apache/pekko/pull/1443)) The Stream Testkit Java DSL has some extra functions. diff --git a/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1443-also-alsoTo-source-flow-with-context.backwards.excludes b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1443-also-alsoTo-source-flow-with-context.backwards.excludes new file mode 100644 index 00000000000..4006d402a19 --- /dev/null +++ b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1443-also-alsoTo-source-flow-with-context.backwards.excludes @@ -0,0 +1,2 @@ +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.alsoTo") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.alsoToContext") diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index 94141c95b94..c1db961ee2d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -138,6 +138,14 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In combine: (Mat, Mat2) => Mat3): FlowWithContext[In, CtxIn, Out2, Ctx2, Mat3] = new FlowWithContext(delegate.viaMat(flow)(combine)) + override def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, CtxOut] = + FlowWithContext.fromTuples(delegate.alsoTo(Flow.fromFunction { (in: (Out, CtxOut)) => in._1 }.toMat(that)( + Keep.right))) + + override def alsoToContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, CtxOut] = + FlowWithContext.fromTuples(delegate.alsoTo(Flow.fromFunction { (in: (Out, CtxOut)) => in._2 }.toMat(that)( + Keep.right))) + /** * Context-preserving variant of [[pekko.stream.scaladsl.Flow.withAttributes]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala index 8edcc22c203..a8a08c902af 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala @@ -88,6 +88,20 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])( combine: (Mat, Mat2) => Mat3): ReprMat[Out2, Ctx2, Mat3] + /** + * Data variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * + * @see [[pekko.stream.scaladsl.FlowOps.alsoTo]] + */ + def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] + + /** + * Context variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * + * @see [[pekko.stream.scaladsl.FlowOps.alsoTo]] + */ + def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] + /** * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.map]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 59e7af530dc..3b377fa65cd 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -155,6 +155,14 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) => Mat3): RunnableGraph[Mat3] = delegate.toMat(sink)(combine) + override def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] = + SourceWithContext.fromTuples(delegate.alsoTo(Flow.fromFunction { (in: (Out, Ctx)) => in._1 }.toMat(that)( + Keep.right))) + + override def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] = + SourceWithContext.fromTuples(delegate.alsoTo(Flow.fromFunction { (in: (Out, Ctx)) => in._2 }.toMat(that)( + Keep.right))) + /** * Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]] and run it. * The returned value is the materialized value of the `Sink`.