diff --git a/benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala b/benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala new file mode 100644 index 0000000000..57bc9af329 --- /dev/null +++ b/benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package benchmark + +import cats.effect.IO +import cats.effect.unsafe.implicits.global + +import org.openjdk.jmh.annotations.{ + Benchmark, + BenchmarkMode, + Mode, + OutputTimeUnit, + Param, + Scope, + State +} + +import java.util.concurrent.TimeUnit +import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription} + +@State(Scope.Thread) +@BenchmarkMode(Array(Mode.Throughput)) +@OutputTimeUnit(TimeUnit.SECONDS) +class FlowInteropBenchmark { + @Param(Array("1024", "5120", "10240", "51200", "512000")) + var totalElements: Long = _ + + @Param(Array("1000")) + var iterations: Int = _ + + @Benchmark + def fastPublisher(): Unit = { + def publisher = + new Publisher[Unit] { + override final def subscribe(subscriber: Subscriber[? >: Unit]): Unit = + subscriber.onSubscribe( + new Subscription { + var i: Long = 0 + @volatile var canceled: Boolean = false + + // Sequential fast Publisher. + override final def request(n: Long): Unit = { + val elementsToProduce = math.min(i + n, totalElements) + + while (i < elementsToProduce) { + subscriber.onNext(()) + i += 1 + } + + if (i == totalElements || canceled) { + subscriber.onComplete() + } + } + + override final def cancel(): Unit = + canceled = true + } + ) + } + + val stream = + interop.flow.fromPublisher[IO](publisher, chunkSize = 512) + + val program = + stream.compile.drain + + program.replicateA_(iterations).unsafeRunSync() + } +} diff --git a/build.sbt b/build.sbt index a4dd23e861..e16f57682a 100644 --- a/build.sbt +++ b/build.sbt @@ -256,7 +256,23 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.hash.createHash"), ProblemFilters.exclude[MissingClassProblem]("fs2.hash$Hash"), ProblemFilters.exclude[MissingFieldProblem]("fs2.hash.openssl"), - ProblemFilters.exclude[MissingClassProblem]("fs2.hash$openssl$") + ProblemFilters.exclude[MissingClassProblem]("fs2.hash$openssl$"), + // Privates: #3387 + ProblemFilters.exclude[MissingClassProblem]( + "fs2.interop.flow.StreamSubscriber$Input$Next" + ), + ProblemFilters.exclude[MissingClassProblem]( + "fs2.interop.flow.StreamSubscriber$Input$Next$" + ), + ProblemFilters.exclude[MissingFieldProblem]( + "fs2.interop.flow.StreamSubscriber#Input.Next" + ), + ProblemFilters.exclude[Problem]( + "fs2.interop.flow.StreamSubscriber#State#WaitingOnUpstream.*" + ), + ProblemFilters.exclude[MissingTypesProblem]( + "fs2.interop.flow.StreamSubscriber$State$WaitingOnUpstream$" + ) ) lazy val root = tlCrossRootProject diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala index f0a4dfb588..4212bcb94a 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala @@ -58,13 +58,36 @@ private[flow] final class StreamSubscriber[F[_], A] private ( nextState(input = Subscribe(subscription)) } + /** OnNext state. + * This is concurrent unsafe, + * however the reactive-streams specification demands that these operations are called serially: + * https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.3 + * Additionally, we ensure that the modifications happens only after we ensure they are safe; + * since they are always done on the effect run after the state update took place. + * Meaning this should be correct if the Producer is well-behaved. + */ + private var inOnNextLoop: Boolean = _ + private var buffer: Array[Any] = null + private var index: Int = _ + /** Receives the next record from the upstream reactive-streams system. */ override final def onNext(a: A): Unit = { requireNonNull( a, "The element provided to onNext must not be null" ) - nextState(input = Next(a)) + + // Optimized onNext loop. + if (inOnNextLoop) { + // If we are here, we can assume the array is properly initialized. + buffer(index) = a + index += 1 + if (index == chunkSize) { + nextState(input = CompleteNext) + } + } else { + nextState(input = InitialNext(a)) + } } /** Called by the upstream reactive-streams system when it fails. */ @@ -119,7 +142,7 @@ private[flow] final class StreamSubscriber[F[_], A] private ( Idle(s) -> noop case Uninitialized(Some(cb)) => - WaitingOnUpstream(idx = 0, buffer = null, cb, s) -> run { + WaitingOnUpstream(cb, s) -> run { s.request(chunkSize.toLong) } @@ -129,30 +152,22 @@ private[flow] final class StreamSubscriber[F[_], A] private ( } } - case Next(a) => { - case WaitingOnUpstream(idx, buffer, cb, s) => - val newIdx = idx + 1 - + case InitialNext(a) => { + case state @ WaitingOnUpstream(cb, s) => if (chunkSize == 1) { + // Optimization for when the chunkSize is 1. Idle(s) -> run { cb.apply(Right(Some(Chunk.singleton(a)))) } - } else if (idx == 0) { - val newBuffer = new Array[Any](chunkSize) - WaitingOnUpstream(newIdx, newBuffer, cb, s) -> run { - // We do the update here, to ensure it happens after we have secured access to the index. - newBuffer.update(idx, a) - } - } else if (newIdx == chunkSize) { - Idle(s) -> run { - // We do the update here, to ensure it happens after we have secured access to the index. - buffer.update(idx, a) - cb.apply(Right(Some(Chunk.array(buffer)))) - } } else { - WaitingOnUpstream(newIdx, buffer, cb, s) -> run { - // We do the update here, to ensure it happens after we have secured access to the index. - buffer.update(idx, a) + // We start the onNext tight loop. + state -> run { + // We do the updates here, + // to ensure they happen after we have secured the state. + inOnNextLoop = true + index = 1 + buffer = new Array(chunkSize) + buffer(0) = a } } @@ -168,15 +183,42 @@ private[flow] final class StreamSubscriber[F[_], A] private ( Failed(new InvalidStateException(operation = s"Received record [${a}]", state)) -> noop } + case CompleteNext => { + case WaitingOnUpstream(cb, s) => + Idle(s) -> run { + // We do the updates here, + // to ensure they happen after we have secured the state. + cb.apply(Right(Some(Chunk.array(buffer)))) + inOnNextLoop = false + buffer = null + } + + case state => + Failed( + new InvalidStateException(operation = s"Received record [${buffer.last}]", state) + ) -> run { + inOnNextLoop = false + buffer = null + } + } + case Error(ex) => { case Uninitialized(Some(cb)) => Terminal -> run { cb.apply(Left(ex)) + // We do the updates here, + // to ensure they happen after we have secured the state. + inOnNextLoop = false + buffer = null } - case WaitingOnUpstream(_, _, cb, _) => + case WaitingOnUpstream(cb, _) => Terminal -> run { cb.apply(Left(ex)) + // We do the updates here, + // to ensure they happen after we have secured the state. + inOnNextLoop = false + buffer = null } case _ => @@ -196,16 +238,19 @@ private[flow] final class StreamSubscriber[F[_], A] private ( } } - case WaitingOnUpstream(idx, buffer, cb, s) => + case WaitingOnUpstream(cb, s) => Terminal -> run { - if (idx == 0) { - cb.apply(Right(None)) - } else { - cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = idx)))) - } - if (canceled) { s.cancel() + cb.apply(Right(None)) + } else if (index == 0) { + cb.apply(Right(None)) + } else { + cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = index)))) + // We do the updates here, + // to ensure they happen after we have secured the state. + inOnNextLoop = false + buffer = null } } @@ -221,8 +266,12 @@ private[flow] final class StreamSubscriber[F[_], A] private ( Uninitialized(Some(cb)) -> noop case Idle(s) => - WaitingOnUpstream(idx = 0, buffer = null, cb, s) -> run { + WaitingOnUpstream(cb, s) -> run { s.request(chunkSize.toLong) + // We do the updates here, + // to ensure they happen after we have secured the state. + inOnNextLoop = false + index = 0 } case state @ Uninitialized(Some(otherCB)) => @@ -232,13 +281,16 @@ private[flow] final class StreamSubscriber[F[_], A] private ( cb.apply(ex) } - case state @ WaitingOnUpstream(_, _, otherCB, s) => + case state @ WaitingOnUpstream(otherCB, s) => Terminal -> run { s.cancel() - val ex = Left(new InvalidStateException(operation = "Received request", state)) otherCB.apply(ex) cb.apply(ex) + // We do the updates here, + // to ensure they happen after we have secured the state. + inOnNextLoop = false + buffer = null } case Failed(ex) => @@ -318,12 +370,7 @@ private[flow] object StreamSubscriber { final case class Uninitialized(cb: Option[CB]) extends State final case class Idle(s: Subscription) extends State - // Having an Array inside the state is fine, - // because the reactive streams spec ensures that all signals must be sent and processed sequentially. - // Additionally, we ensure that the modifications happens only after we ensure they are safe; - // since they are always done on the effect run after the state update took place. - final case class WaitingOnUpstream(idx: Int, buffer: Array[Any], cb: CB, s: Subscription) - extends State + final case class WaitingOnUpstream(cb: CB, s: Subscription) extends State final case class Failed(ex: StreamSubscriberException) extends State case object Terminal extends State } @@ -333,7 +380,8 @@ private[flow] object StreamSubscriber { type Input = StreamSubscriber.Input final case class Subscribe(s: Subscription) extends Input - final case class Next(a: Any) extends Input + final case class InitialNext(a: Any) extends Input + case object CompleteNext extends Input final case class Error(ex: Throwable) extends Input final case class Complete(canceled: Boolean) extends Input final case class Dequeue(cb: CB) extends Input diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala index a076752b73..5e6ef43120 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala @@ -109,6 +109,7 @@ private[flow] final class StreamSubscription[F[_], A] private ( // if we were externally canceled, this is handled below F.unit } + .mask val cancellation = F.asyncCheckAttempt[Unit] { cb => F.delay {