Skip to content

Commit

Permalink
revert zipWithIndex changes (#1526)
Browse files Browse the repository at this point in the history
* revert zipWithIndex changes

* scalafmt

* Update FlowZipWithIndexSpec.scala
  • Loading branch information
pjfanning authored Oct 14, 2024
1 parent d20f102 commit bbbcacc
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package org.apache.pekko.stream.scaladsl

import org.apache.pekko
import pekko.stream.{ ActorMaterializer, ActorMaterializerSettings, Materializer }
import pekko.stream.{ ActorMaterializer, ActorMaterializerSettings, ClosedShape, Materializer, UniformFanInShape }
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
import scala.annotation.nowarn

Expand Down Expand Up @@ -57,5 +57,39 @@ class FlowZipWithIndexSpec extends StreamSpec {
// #zip-with-index
}

"support junction output ports" in {
// https://github.com/apache/pekko/issues/1525
import GraphDSL.Implicits._

val pickMaxOfThree = GraphDSL.create() { implicit b =>
val zip1 = b.add(ZipWith[Int, Int, Int](math.max _))
val zip2 = b.add(ZipWith[Int, Int, Int](math.max _))
zip1.out ~> zip2.in0

UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
}

val probe = TestSubscriber.manualProbe[(Int, AnyVal)]()
val resultSink = Sink.fromSubscriber(probe)

val g = RunnableGraph.fromGraph(GraphDSL.createGraph(resultSink) { implicit b => sink =>
// importing the partial graph will return its shape (inlets & outlets)
val pm3 = b.add(pickMaxOfThree)

Source.single(1) ~> pm3.in(0)
Source.single(2) ~> pm3.in(1)
Source.single(3) ~> pm3.in(2)
pm3.out.zipWithIndex ~> sink.in
ClosedShape
})

g.run()

val subscription = probe.expectSubscription()
subscription.request(1)
probe.expectNext((3, 0))
probe.expectComplete()
}

}
}
14 changes: 10 additions & 4 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3304,10 +3304,16 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
def zipWithIndex: Repr[(Out, Long)] =
statefulMap(() => 0L)((index, out) =>
(index + 1L, (out, index)), _ => None)
.withAttributes(DefaultAttributes.zipWithIndex)
def zipWithIndex: Repr[(Out, Long)] = {
statefulMapConcat[(Out, Long)] { () =>
var index: Long = 0L
elem => {
val zipped = (elem, index)
index += 1
immutable.Iterable[(Out, Long)](zipped)
}
}
}

/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
Expand Down

0 comments on commit bbbcacc

Please sign in to comment.