From 854eb3e17c2f38011b817de3129eae3831bab696 Mon Sep 17 00:00:00 2001 From: adamw Date: Mon, 7 Oct 2024 16:16:15 +0200 Subject: [PATCH] Update to Ox 0.5.0 --- .../websocket/WebSocketNettySyncServer.scala | 26 ++++++------------- project/Versions.scala | 2 +- .../reactivestreams/OxProcessor.scala | 2 +- .../ws/OxSourceWebSocketProcessor.scala | 21 ++++++++------- .../sync/perf/NettySyncServerRunner.scala | 3 ++- 5 files changed, 23 insertions(+), 31 deletions(-) diff --git a/examples/src/main/scala/sttp/tapir/examples/websocket/WebSocketNettySyncServer.scala b/examples/src/main/scala/sttp/tapir/examples/websocket/WebSocketNettySyncServer.scala index ba4386bffa..947f63d6eb 100644 --- a/examples/src/main/scala/sttp/tapir/examples/websocket/WebSocketNettySyncServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/websocket/WebSocketNettySyncServer.scala @@ -7,14 +7,13 @@ package sttp.tapir.examples.websocket import ox.* import ox.channels.* +import ox.flow.Flow import sttp.capabilities.WebSockets import sttp.tapir.* import sttp.tapir.server.netty.sync.OxStreams import sttp.tapir.server.netty.sync.OxStreams.Pipe import sttp.tapir.server.netty.sync.NettySyncServer -import sttp.ws.WebSocketFrame -import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.duration.* object WebSocketNettySyncServer: @@ -22,15 +21,7 @@ object WebSocketNettySyncServer: val wsEndpoint = endpoint.get .in("ws") - .out( - webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](OxStreams) - .concatenateFragmentedFrames(false) // All these options are supported by tapir-netty - .ignorePong(true) - .autoPongOnPing(true) - .decodeCloseRequests(false) - .decodeCloseResponses(false) - .autoPing(Some((10.seconds, WebSocketFrame.Ping("ping-content".getBytes)))) - ) + .out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](OxStreams)) // Your processor transforming a stream of requests into a stream of responses val wsPipe: Pipe[String, String] = requestStream => requestStream.map(_.toUpperCase) @@ -38,17 +29,16 @@ object WebSocketNettySyncServer: // Alternative logic (not used here): requests and responses can be treated separately, for example to emit frames // to the client from another source. val wsPipe2: Pipe[String, String] = { in => - val running = new AtomicBoolean(true) // TODO use https://github.com/softwaremill/ox/issues/209 once available - fork { - in.drain() // read and ignore requests - running.set(false) // stopping the responses - } + val flowLeft: Flow[Either[String, String]] = Flow.fromSource(in).map(Left(_)) // emit periodic responses - Source.tick(1.second).takeWhile(_ => running.get()).map(_ => System.currentTimeMillis()).map(_.toString) + val flowRight: Flow[Either[String, String]] = Flow.tick(1.second).map(_ => System.currentTimeMillis()).map(_.toString).map(Right(_)) + + // ignore whatever is sent by the client (represented as `Left`) + flowLeft.merge(flowRight, propagateDoneLeft = true).collect { case Right(s) => s }.runToChannel() } // The WebSocket endpoint, builds the pipeline in serverLogicSuccess - val wsServerEndpoint = wsEndpoint.handleSuccess(_ => wsPipe2) + val wsServerEndpoint = wsEndpoint.handleSuccess(_ => wsPipe) // A regular /GET endpoint val helloWorldEndpoint = diff --git a/project/Versions.scala b/project/Versions.scala index 018567897b..a42c38e9d9 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -25,7 +25,7 @@ object Versions { val json4s = "4.0.7" val metrics4Scala = "4.3.2" val nettyReactiveStreams = "3.0.2" - val ox = "0.4.0" + val ox = "0.5.0" val reactiveStreams = "1.0.4" val sprayJson = "1.3.6" val scalaCheck = "1.18.1" diff --git a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/reactivestreams/OxProcessor.scala b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/reactivestreams/OxProcessor.scala index c5a1ba09b5..0f9a784bdd 100644 --- a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/reactivestreams/OxProcessor.scala +++ b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/reactivestreams/OxProcessor.scala @@ -81,7 +81,7 @@ private[sync] class OxProcessor[A, B]( if (pipelineForkFuture != null) try { val pipelineFork = Await.result(pipelineForkFuture, pipelineCancelationTimeout) oxDispatcher.runAsync { - race( + raceSuccess( { ox.sleep(pipelineCancelationTimeout) logger.error(s"Pipeline fork cancelation did not complete in time ($pipelineCancelationTimeout).") diff --git a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/ws/OxSourceWebSocketProcessor.scala b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/ws/OxSourceWebSocketProcessor.scala index b66eb99bd4..603c76f00e 100644 --- a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/ws/OxSourceWebSocketProcessor.scala +++ b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/ws/OxSourceWebSocketProcessor.scala @@ -18,6 +18,7 @@ import java.io.IOException import java.util.concurrent.Semaphore import scala.concurrent.duration.* +import ox.flow.Flow private[sync] object OxSourceWebSocketProcessor: private val logger = LoggerFactory.getLogger(getClass.getName) @@ -37,15 +38,17 @@ private[sync] object OxSourceWebSocketProcessor: val frame2FramePipe: OxStreams.Pipe[NettyWebSocketFrame, NettyWebSocketFrame] = ox ?=> val closeSignal = new Semaphore(0) (incoming: Source[NettyWebSocketFrame]) => - val outgoing = incoming - .mapAsView { f => + val outgoing = Flow + .fromSource(incoming) + .map { f => val sttpFrame = nettyFrameToFrame(f) f.release() sttpFrame } .pipe(takeUntilCloseFrame(passAlongCloseFrame = o.decodeCloseRequests, closeSignal)) .pipe(optionallyConcatenateFrames(o.concatenateFragmentedFrames)) - .mapAsView(decodeFrame) + .map(decodeFrame) + .runToChannel() .pipe(processingPipe) .mapAsView(r => frameToNettyFrame(o.responses.encode(r))) @@ -76,14 +79,12 @@ private[sync] object OxSourceWebSocketProcessor: new OxProcessor(oxDispatcher, frame2FramePipe, wrapSubscriberWithNettyCallback) end apply - private def optionallyConcatenateFrames(doConcatenate: Boolean)(s: Source[WebSocketFrame])(using Ox): Source[WebSocketFrame] = - if doConcatenate then s.mapStateful(() => None: Accumulator)(accumulateFrameState).collectAsView { case Some(f: WebSocketFrame) => f } - else s + private def optionallyConcatenateFrames(doConcatenate: Boolean)(f: Flow[WebSocketFrame]): Flow[WebSocketFrame] = + if doConcatenate then f.mapStateful(() => None: Accumulator)(accumulateFrameState).collect { case Some(f: WebSocketFrame) => f } + else f - private def takeUntilCloseFrame(passAlongCloseFrame: Boolean, closeSignal: Semaphore)( - s: Source[WebSocketFrame] - )(using Ox): Source[WebSocketFrame] = - s.takeWhile( + private def takeUntilCloseFrame(passAlongCloseFrame: Boolean, closeSignal: Semaphore)(f: Flow[WebSocketFrame]): Flow[WebSocketFrame] = + f.takeWhile( { case _: WebSocketFrame.Close => closeSignal.release(); false case _ => true diff --git a/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/perf/NettySyncServerRunner.scala b/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/perf/NettySyncServerRunner.scala index b020b85df1..b42bb5bebd 100644 --- a/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/perf/NettySyncServerRunner.scala +++ b/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/perf/NettySyncServerRunner.scala @@ -2,6 +2,7 @@ package sttp.tapir.server.netty.sync.perf import ox.* import ox.channels.* +import ox.flow.Flow import sttp.shared.Identity import sttp.tapir.server.netty.sync.NettySyncServerOptions import sttp.tapir.server.netty.sync.NettySyncServerBinding @@ -61,7 +62,7 @@ object NettySyncServerRunner { fork { in.drain() } - Source.tick(WebSocketSingleResponseLag).map(_ => System.currentTimeMillis()) + Flow.tick(WebSocketSingleResponseLag).map(_ => System.currentTimeMillis()).runToChannel() } val wsEndpoint: Endpoint[Unit, Unit, Unit, OxStreams.Pipe[Long, Long], OxStreams with WebSockets] = wsBaseEndpoint