Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize interop.flow.StreamSubscriber.onNext #3387

Merged
merged 6 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package benchmark

import cats.effect.IO
import cats.effect.unsafe.implicits.global

import org.openjdk.jmh.annotations.{
Benchmark,
BenchmarkMode,
Mode,
OutputTimeUnit,
Param,
Scope,
State
}

import java.util.concurrent.TimeUnit
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}

/** JMH throughput benchmark for consuming a `java.util.concurrent.Flow.Publisher`
  * through `interop.flow.fromPublisher`.
  *
  * The publisher used here is deliberately trivial: it emits `totalElements`
  * unit values synchronously from inside `request`, so the measurement is
  * dominated by the subscriber side of the interop layer.
  */
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class FlowInteropBenchmark {

  /** Number of elements each subscription emits before completing. */
  @Param(Array("1024", "5120", "10240", "51200", "512000"))
  var totalElements: Long = _

  /** Number of times the stream is drained per benchmark invocation. */
  @Param(Array("1000"))
  var iterations: Int = _

  /** Drains a stream backed by a synchronous, in-memory publisher `iterations` times. */
  @Benchmark
  def fastPublisher(): Unit = {
    def publisher =
      new Publisher[Unit] {
        override final def subscribe(subscriber: Subscriber[? >: Unit]): Unit =
          subscriber.onSubscribe(
            new Subscription {
              // Number of elements emitted so far; never exceeds totalElements.
              var i: Long = 0
              // Set once onComplete has been signalled, so it is never signalled twice.
              private var completed: Boolean = false
              @volatile var canceled: Boolean = false

              // Sequential fast Publisher.
              override final def request(n: Long): Unit =
                if (!completed) {
                  // Clamp the demand against what is left instead of computing `i + n`,
                  // which overflows for very large demands such as Long.MaxValue.
                  val remaining = totalElements - i
                  val elementsToProduce = i + math.min(n, remaining)

                  while (i < elementsToProduce) {
                    subscriber.onNext(())
                    i += 1
                  }

                  if (i == totalElements || canceled) {
                    // Terminal signal: remember it so later request calls are no-ops.
                    completed = true
                    subscriber.onComplete()
                  }
                }

              override final def cancel(): Unit =
                canceled = true
            }
          )
      }

    val stream =
      interop.flow.fromPublisher[IO](publisher, chunkSize = 512)

    val program =
      stream.compile.drain

    program.replicateA_(iterations).unsafeRunSync()
  }
}
18 changes: 17 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,23 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.hash.createHash"),
ProblemFilters.exclude[MissingClassProblem]("fs2.hash$Hash"),
ProblemFilters.exclude[MissingFieldProblem]("fs2.hash.openssl"),
ProblemFilters.exclude[MissingClassProblem]("fs2.hash$openssl$")
ProblemFilters.exclude[MissingClassProblem]("fs2.hash$openssl$"),
// Privates: #3387
ProblemFilters.exclude[MissingClassProblem](
"fs2.interop.flow.StreamSubscriber$Input$Next"
),
ProblemFilters.exclude[MissingClassProblem](
"fs2.interop.flow.StreamSubscriber$Input$Next$"
),
ProblemFilters.exclude[MissingFieldProblem](
"fs2.interop.flow.StreamSubscriber#Input.Next"
),
ProblemFilters.exclude[Problem](
"fs2.interop.flow.StreamSubscriber#State#WaitingOnUpstream.*"
),
ProblemFilters.exclude[MissingTypesProblem](
"fs2.interop.flow.StreamSubscriber$State$WaitingOnUpstream$"
)
)

lazy val root = tlCrossRootProject
Expand Down
126 changes: 87 additions & 39 deletions core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,36 @@ private[flow] final class StreamSubscriber[F[_], A] private (
nextState(input = Subscribe(subscription))
}

/** OnNext state.
* These fields are not safe for concurrent access;
* however, the reactive-streams specification demands that these operations are called serially:
* https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.3
* Additionally, we ensure that the modifications happen only after we know they are safe,
* since they are always done in the effect that runs after the state update took place.
* This means the code should be correct as long as the Producer is well-behaved.
*/
private var inOnNextLoop: Boolean = _
private var buffer: Array[Any] = null
private var index: Int = _

/** Receives the next record from the upstream reactive-streams system. */
override final def onNext(a: A): Unit = {
requireNonNull(
a,
"The element provided to onNext must not be null"
)
nextState(input = Next(a))

// Optimized onNext loop.
if (inOnNextLoop) {
// If we are here, we can assume the array is properly initialized.
buffer(index) = a
index += 1
if (index == chunkSize) {
nextState(input = CompleteNext)
}
} else {
nextState(input = InitialNext(a))
}
}

/** Called by the upstream reactive-streams system when it fails. */
Expand Down Expand Up @@ -119,7 +142,7 @@ private[flow] final class StreamSubscriber[F[_], A] private (
Idle(s) -> noop

case Uninitialized(Some(cb)) =>
WaitingOnUpstream(idx = 0, buffer = null, cb, s) -> run {
WaitingOnUpstream(cb, s) -> run {
s.request(chunkSize.toLong)
}

Expand All @@ -129,30 +152,22 @@ private[flow] final class StreamSubscriber[F[_], A] private (
}
}

case Next(a) => {
case WaitingOnUpstream(idx, buffer, cb, s) =>
val newIdx = idx + 1

case InitialNext(a) => {
case state @ WaitingOnUpstream(cb, s) =>
if (chunkSize == 1) {
// Optimization for when the chunkSize is 1.
Idle(s) -> run {
cb.apply(Right(Some(Chunk.singleton(a))))
}
} else if (idx == 0) {
val newBuffer = new Array[Any](chunkSize)
WaitingOnUpstream(newIdx, newBuffer, cb, s) -> run {
// We do the update here, to ensure it happens after we have secured access to the index.
newBuffer.update(idx, a)
}
} else if (newIdx == chunkSize) {
Idle(s) -> run {
// We do the update here, to ensure it happens after we have secured access to the index.
buffer.update(idx, a)
cb.apply(Right(Some(Chunk.array(buffer))))
}
} else {
WaitingOnUpstream(newIdx, buffer, cb, s) -> run {
// We do the update here, to ensure it happens after we have secured access to the index.
buffer.update(idx, a)
// We start the onNext tight loop.
state -> run {
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = true
index = 1
buffer = new Array(chunkSize)
buffer(0) = a
}
}

Expand All @@ -168,15 +183,42 @@ private[flow] final class StreamSubscriber[F[_], A] private (
Failed(new InvalidStateException(operation = s"Received record [${a}]", state)) -> noop
}

case CompleteNext => {
case WaitingOnUpstream(cb, s) =>
Idle(s) -> run {
// We do the updates here,
// to ensure they happen after we have secured the state.
cb.apply(Right(Some(Chunk.array(buffer))))
inOnNextLoop = false
buffer = null
}

case state =>
Failed(
new InvalidStateException(operation = s"Received record [${buffer.last}]", state)
) -> run {
inOnNextLoop = false
buffer = null
}
}

case Error(ex) => {
case Uninitialized(Some(cb)) =>
Terminal -> run {
cb.apply(Left(ex))
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
}

case WaitingOnUpstream(_, _, cb, _) =>
case WaitingOnUpstream(cb, _) =>
Terminal -> run {
cb.apply(Left(ex))
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
}

case _ =>
Expand All @@ -196,16 +238,19 @@ private[flow] final class StreamSubscriber[F[_], A] private (
}
}

case WaitingOnUpstream(idx, buffer, cb, s) =>
case WaitingOnUpstream(cb, s) =>
Terminal -> run {
if (idx == 0) {
cb.apply(Right(None))
} else {
cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = idx))))
}

if (canceled) {
s.cancel()
cb.apply(Right(None))
} else if (index == 0) {
cb.apply(Right(None))
} else {
cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = index))))
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
}
}

Expand All @@ -221,8 +266,12 @@ private[flow] final class StreamSubscriber[F[_], A] private (
Uninitialized(Some(cb)) -> noop

case Idle(s) =>
WaitingOnUpstream(idx = 0, buffer = null, cb, s) -> run {
WaitingOnUpstream(cb, s) -> run {
s.request(chunkSize.toLong)
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
index = 0
}

case state @ Uninitialized(Some(otherCB)) =>
Expand All @@ -232,13 +281,16 @@ private[flow] final class StreamSubscriber[F[_], A] private (
cb.apply(ex)
}

case state @ WaitingOnUpstream(_, _, otherCB, s) =>
case state @ WaitingOnUpstream(otherCB, s) =>
Terminal -> run {
s.cancel()

val ex = Left(new InvalidStateException(operation = "Received request", state))
otherCB.apply(ex)
cb.apply(ex)
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
}

case Failed(ex) =>
Expand Down Expand Up @@ -318,12 +370,7 @@ private[flow] object StreamSubscriber {

final case class Uninitialized(cb: Option[CB]) extends State
final case class Idle(s: Subscription) extends State
// Having an Array inside the state is fine,
// because the reactive streams spec ensures that all signals must be sent and processed sequentially.
// Additionally, we ensure that the modifications happens only after we ensure they are safe;
// since they are always done on the effect run after the state update took place.
final case class WaitingOnUpstream(idx: Int, buffer: Array[Any], cb: CB, s: Subscription)
extends State
final case class WaitingOnUpstream(cb: CB, s: Subscription) extends State
final case class Failed(ex: StreamSubscriberException) extends State
case object Terminal extends State
}
Expand All @@ -333,7 +380,8 @@ private[flow] object StreamSubscriber {
type Input = StreamSubscriber.Input

final case class Subscribe(s: Subscription) extends Input
final case class Next(a: Any) extends Input
final case class InitialNext(a: Any) extends Input
case object CompleteNext extends Input
final case class Error(ex: Throwable) extends Input
final case class Complete(canceled: Boolean) extends Input
final case class Dequeue(cb: CB) extends Input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ private[flow] final class StreamSubscription[F[_], A] private (
// if we were externally canceled, this is handled below
F.unit
}
.mask
Copy link
Contributor Author

@BalmungSan BalmungSan Feb 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to #3384


val cancellation = F.asyncCheckAttempt[Unit] { cb =>
F.delay {
Expand Down