diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithIndexSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithIndexSpec.scala index 86a2354da0..7b696020ea 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithIndexSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithIndexSpec.scala @@ -14,7 +14,7 @@ package org.apache.pekko.stream.scaladsl import org.apache.pekko -import pekko.stream.{ ActorMaterializer, ActorMaterializerSettings, Materializer } +import pekko.stream.{ ActorMaterializer, ActorMaterializerSettings, ClosedShape, Materializer, UniformFanInShape } import pekko.stream.testkit.{ StreamSpec, TestSubscriber } import scala.annotation.nowarn @@ -57,5 +57,39 @@ class FlowZipWithIndexSpec extends StreamSpec { // #zip-with-index } + "support junction output ports" in { + // https://github.com/apache/pekko/issues/1525 + import GraphDSL.Implicits._ + + val pickMaxOfThree = GraphDSL.create() { implicit b => + val zip1 = b.add(ZipWith[Int, Int, Int](math.max _)) + val zip2 = b.add(ZipWith[Int, Int, Int](math.max _)) + zip1.out ~> zip2.in0 + + UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1) + } + + val probe = TestSubscriber.manualProbe[(Int, AnyVal)]() + val resultSink = Sink.fromSubscriber(probe) + + val g = RunnableGraph.fromGraph(GraphDSL.createGraph(resultSink) { implicit b => sink => + // importing the partial graph will return its shape (inlets & outlets) + val pm3 = b.add(pickMaxOfThree) + + Source.single(1) ~> pm3.in(0) + Source.single(2) ~> pm3.in(1) + Source.single(3) ~> pm3.in(2) + pm3.out.zipWithIndex ~> sink.in + ClosedShape + }) + + g.run() + + val subscription = probe.expectSubscription() + subscription.request(1) + probe.expectNext((3, 0)) + probe.expectComplete() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 3bf8e436ba..9c66727d51 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -3304,10 +3304,16 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def zipWithIndex: Repr[(Out, Long)] = - statefulMap(() => 0L)((index, out) => - (index + 1L, (out, index)), _ => None) - .withAttributes(DefaultAttributes.zipWithIndex) + def zipWithIndex: Repr[(Out, Long)] = { + statefulMapConcat[(Out, Long)] { () => + var index: Long = 0L + elem => { + val zipped = (elem, index) + index += 1 + immutable.Iterable[(Out, Long)](zipped) + } + } + } /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].