Skip to content

Commit

Permalink
Merge pull request #3387 from BalmungSan/optimize-flow-interop
Browse files Browse the repository at this point in the history
Optmize `interop.flow.StreamSubscriber.onNext`
  • Loading branch information
armanbilge authored Oct 23, 2024
2 parents dfa4928 + 58bfa07 commit 8d11163
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 40 deletions.
89 changes: 89 additions & 0 deletions benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package benchmark

import cats.effect.IO
import cats.effect.unsafe.implicits.global

import org.openjdk.jmh.annotations.{
Benchmark,
BenchmarkMode,
Mode,
OutputTimeUnit,
Param,
Scope,
State
}

import java.util.concurrent.TimeUnit
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class FlowInteropBenchmark {
@Param(Array("1024", "5120", "10240", "51200", "512000"))
var totalElements: Long = _

@Param(Array("1000"))
var iterations: Int = _

@Benchmark
def fastPublisher(): Unit = {
def publisher =
new Publisher[Unit] {
override final def subscribe(subscriber: Subscriber[? >: Unit]): Unit =
subscriber.onSubscribe(
new Subscription {
var i: Long = 0
@volatile var canceled: Boolean = false

// Sequential fast Publisher.
override final def request(n: Long): Unit = {
val elementsToProduce = math.min(i + n, totalElements)

while (i < elementsToProduce) {
subscriber.onNext(())
i += 1
}

if (i == totalElements || canceled) {
subscriber.onComplete()
}
}

override final def cancel(): Unit =
canceled = true
}
)
}

val stream =
interop.flow.fromPublisher[IO](publisher, chunkSize = 512)

val program =
stream.compile.drain

program.replicateA_(iterations).unsafeRunSync()
}
}
18 changes: 17 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,23 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.hash.createHash"),
ProblemFilters.exclude[MissingClassProblem]("fs2.hash$Hash"),
ProblemFilters.exclude[MissingFieldProblem]("fs2.hash.openssl"),
ProblemFilters.exclude[MissingClassProblem]("fs2.hash$openssl$")
ProblemFilters.exclude[MissingClassProblem]("fs2.hash$openssl$"),
// Privates: #3387
ProblemFilters.exclude[MissingClassProblem](
"fs2.interop.flow.StreamSubscriber$Input$Next"
),
ProblemFilters.exclude[MissingClassProblem](
"fs2.interop.flow.StreamSubscriber$Input$Next$"
),
ProblemFilters.exclude[MissingFieldProblem](
"fs2.interop.flow.StreamSubscriber#Input.Next"
),
ProblemFilters.exclude[Problem](
"fs2.interop.flow.StreamSubscriber#State#WaitingOnUpstream.*"
),
ProblemFilters.exclude[MissingTypesProblem](
"fs2.interop.flow.StreamSubscriber$State$WaitingOnUpstream$"
)
)

lazy val root = tlCrossRootProject
Expand Down
126 changes: 87 additions & 39 deletions core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,36 @@ private[flow] final class StreamSubscriber[F[_], A] private (
nextState(input = Subscribe(subscription))
}

/** OnNext state.
* This is concurrent unsafe,
* however the reactive-streams specification demands that these operations are called serially:
* https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.3
* Additionally, we ensure that the modifications happens only after we ensure they are safe;
* since they are always done on the effect run after the state update took place.
* Meaning this should be correct if the Producer is well-behaved.
*/
private var inOnNextLoop: Boolean = _
private var buffer: Array[Any] = null
private var index: Int = _

/** Receives the next record from the upstream reactive-streams system. */
override final def onNext(a: A): Unit = {
requireNonNull(
a,
"The element provided to onNext must not be null"
)
nextState(input = Next(a))

// Optimized onNext loop.
if (inOnNextLoop) {
// If we are here, we can assume the array is properly initialized.
buffer(index) = a
index += 1
if (index == chunkSize) {
nextState(input = CompleteNext)
}
} else {
nextState(input = InitialNext(a))
}
}

/** Called by the upstream reactive-streams system when it fails. */
Expand Down Expand Up @@ -119,7 +142,7 @@ private[flow] final class StreamSubscriber[F[_], A] private (
Idle(s) -> noop

case Uninitialized(Some(cb)) =>
WaitingOnUpstream(idx = 0, buffer = null, cb, s) -> run {
WaitingOnUpstream(cb, s) -> run {
s.request(chunkSize.toLong)
}

Expand All @@ -129,30 +152,22 @@ private[flow] final class StreamSubscriber[F[_], A] private (
}
}

case Next(a) => {
case WaitingOnUpstream(idx, buffer, cb, s) =>
val newIdx = idx + 1

case InitialNext(a) => {
case state @ WaitingOnUpstream(cb, s) =>
if (chunkSize == 1) {
// Optimization for when the chunkSize is 1.
Idle(s) -> run {
cb.apply(Right(Some(Chunk.singleton(a))))
}
} else if (idx == 0) {
val newBuffer = new Array[Any](chunkSize)
WaitingOnUpstream(newIdx, newBuffer, cb, s) -> run {
// We do the update here, to ensure it happens after we have secured access to the index.
newBuffer.update(idx, a)
}
} else if (newIdx == chunkSize) {
Idle(s) -> run {
// We do the update here, to ensure it happens after we have secured access to the index.
buffer.update(idx, a)
cb.apply(Right(Some(Chunk.array(buffer))))
}
} else {
WaitingOnUpstream(newIdx, buffer, cb, s) -> run {
// We do the update here, to ensure it happens after we have secured access to the index.
buffer.update(idx, a)
// We start the onNext tight loop.
state -> run {
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = true
index = 1
buffer = new Array(chunkSize)
buffer(0) = a
}
}

Expand All @@ -168,15 +183,42 @@ private[flow] final class StreamSubscriber[F[_], A] private (
Failed(new InvalidStateException(operation = s"Received record [${a}]", state)) -> noop
}

case CompleteNext => {
case WaitingOnUpstream(cb, s) =>
Idle(s) -> run {
// We do the updates here,
// to ensure they happen after we have secured the state.
cb.apply(Right(Some(Chunk.array(buffer))))
inOnNextLoop = false
buffer = null
}

case state =>
Failed(
new InvalidStateException(operation = s"Received record [${buffer.last}]", state)
) -> run {
inOnNextLoop = false
buffer = null
}
}

case Error(ex) => {
case Uninitialized(Some(cb)) =>
Terminal -> run {
cb.apply(Left(ex))
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
}

case WaitingOnUpstream(_, _, cb, _) =>
case WaitingOnUpstream(cb, _) =>
Terminal -> run {
cb.apply(Left(ex))
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
}

case _ =>
Expand All @@ -196,16 +238,19 @@ private[flow] final class StreamSubscriber[F[_], A] private (
}
}

case WaitingOnUpstream(idx, buffer, cb, s) =>
case WaitingOnUpstream(cb, s) =>
Terminal -> run {
if (idx == 0) {
cb.apply(Right(None))
} else {
cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = idx))))
}

if (canceled) {
s.cancel()
cb.apply(Right(None))
} else if (index == 0) {
cb.apply(Right(None))
} else {
cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = index))))
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
}
}

Expand All @@ -221,8 +266,12 @@ private[flow] final class StreamSubscriber[F[_], A] private (
Uninitialized(Some(cb)) -> noop

case Idle(s) =>
WaitingOnUpstream(idx = 0, buffer = null, cb, s) -> run {
WaitingOnUpstream(cb, s) -> run {
s.request(chunkSize.toLong)
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
index = 0
}

case state @ Uninitialized(Some(otherCB)) =>
Expand All @@ -232,13 +281,16 @@ private[flow] final class StreamSubscriber[F[_], A] private (
cb.apply(ex)
}

case state @ WaitingOnUpstream(_, _, otherCB, s) =>
case state @ WaitingOnUpstream(otherCB, s) =>
Terminal -> run {
s.cancel()

val ex = Left(new InvalidStateException(operation = "Received request", state))
otherCB.apply(ex)
cb.apply(ex)
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
}

case Failed(ex) =>
Expand Down Expand Up @@ -318,12 +370,7 @@ private[flow] object StreamSubscriber {

final case class Uninitialized(cb: Option[CB]) extends State
final case class Idle(s: Subscription) extends State
// Having an Array inside the state is fine,
// because the reactive streams spec ensures that all signals must be sent and processed sequentially.
// Additionally, we ensure that the modifications happens only after we ensure they are safe;
// since they are always done on the effect run after the state update took place.
final case class WaitingOnUpstream(idx: Int, buffer: Array[Any], cb: CB, s: Subscription)
extends State
final case class WaitingOnUpstream(cb: CB, s: Subscription) extends State
final case class Failed(ex: StreamSubscriberException) extends State
case object Terminal extends State
}
Expand All @@ -333,7 +380,8 @@ private[flow] object StreamSubscriber {
type Input = StreamSubscriber.Input

final case class Subscribe(s: Subscription) extends Input
final case class Next(a: Any) extends Input
final case class InitialNext(a: Any) extends Input
case object CompleteNext extends Input
final case class Error(ex: Throwable) extends Input
final case class Complete(canceled: Boolean) extends Input
final case class Dequeue(cb: CB) extends Input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ private[flow] final class StreamSubscription[F[_], A] private (
// if we were externally canceled, this is handled below
F.unit
}
.mask

val cancellation = F.asyncCheckAttempt[Unit] { cb =>
F.delay {
Expand Down

0 comments on commit 8d11163

Please sign in to comment.