Skip to content

Commit

Permalink
Update ox to 0.5.1
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Oct 15, 2024
1 parent d374fe0 commit e75173f
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 47 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ val zio2Version = "2.1.11"
val zio1InteropRsVersion = "1.3.12"
val zio2InteropRsVersion = "2.0.2"

val oxVersion = "0.4.0"
val oxVersion = "0.5.1"
val sttpModelVersion = "1.7.11"
val sttpSharedVersion = "1.3.22"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package sttp.client4.impl.ox.sse

import ox.*
import ox.channels.Source
import ox.flow.Flow
import sttp.model.sse.ServerSentEvent

import java.io.InputStream

object OxServerSentEvents:
def parse(is: InputStream)(using Ox): Source[ServerSentEvent] =
Source
def parse(is: InputStream): Flow[ServerSentEvent] =
Flow
.fromInputStream(is)
.linesUtf8
.mapStatefulConcat(() => List.empty[String])(
Expand All @@ -18,5 +17,5 @@ object OxServerSentEvents:
else None
}
)
.filterAsView(_.nonEmpty)
.filter(_.nonEmpty)
.map(ServerSentEvent.parse)
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,23 @@ import sttp.client4.ws.SyncWebSocket
import sttp.ws.WebSocketFrame

import scala.util.control.NonFatal
import ox.flow.Flow

/** Converts a [[SyncWebSocket]] into a pair of `Source` of server responses and a `Sink` for client requests. The
/** Converts a [[SyncWebSocket]] into a pair of [[Source]] of server responses and a [[Sink]] for client requests. The
* `Source` starts receiving frames immediately, its internal buffer size can be adjusted with an implicit
* [[ox.channels.StageCapacity]]. Make sure that the `Source` is contiunually read. This will guarantee that
* server-side Close signal is received and handled. If you don't want to process frames from the server, you can at
* [[ox.channels.BufferCapacity]]. Make sure that the `Source` is contiunually read. This will guarantee that
* server-side close signal is received and handled. If you don't want to process frames from the server, you can at
* least handle it with a `fork { source.drain() }`.
*
* You don't need to manually call `ws.close()` when using this approach, this will be handled automatically
* underneath, according to following rules:
* - If the request `Sink` is closed due to an upstream error, a Close frame is sent, and the `Source` with incoming
* responses gets completed as `Done`.
* - If the request `Sink` completes as `Done`, a `Close` frame is sent, and the response `Sink` keeps receiving
* - If the request sink is closed due to an upstream error, a close frame is sent. The response sink keeps receiving
* responses, until the enclosing [[Ox]] scope ends (that is controlled by the caller). When this happens, the fork
* which populates the response channel will be interrupted.
* - If the request sink completes as done, a close frame is sent. As above, the response sink keeps receiving
* responses until the server closes communication.
* - If the response `Source` is closed by a Close frome from the server or due to an error, the request Sink is
* closed as `Done`, which will still send all outstanding buffered frames, and then finish.
* - If the response source is closed by a close frame from the server or due to an error, the request sink is closed
* as done. This will attempt to send all outstanding buffered frames, unless the enclosing scope ends beforehand).
*
* @param ws
* a `SyncWebSocket` where the underlying `Sink` will send requests, and where the `Source` will pull responses from.
Expand All @@ -29,38 +31,30 @@ import scala.util.control.NonFatal
*/
def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true)(using
Ox,
StageCapacity
BufferCapacity
): (Source[WebSocketFrame], Sink[WebSocketFrame]) =
val requestsChannel = StageCapacity.newChannel[WebSocketFrame]
val responsesChannel = StageCapacity.newChannel[WebSocketFrame]
fork {
try
val requestsChannel = BufferCapacity.newChannel[WebSocketFrame]

val responsesChannel = Flow
.usingEmit[WebSocketFrame] { emit =>
repeatWhile {
ws.receive() match
case frame: WebSocketFrame.Data[_] =>
responsesChannel.sendOrClosed(frame) match
case _: ChannelClosed => false
case _ => true
case WebSocketFrame.Close(status, msg) if status > 1001 =>
responsesChannel.errorOrClosed(new WebSocketClosedWithError(status, msg)).discard
false
case _: WebSocketFrame.Close =>
responsesChannel.doneOrClosed().discard
false
case frame: WebSocketFrame.Data[_] => emit(frame); true
case WebSocketFrame.Close(status, msg) if status > 1001 => throw new WebSocketClosedWithError(status, msg)
case _: WebSocketFrame.Close => false
case ping: WebSocketFrame.Ping =>
requestsChannel.sendOrClosed(WebSocketFrame.Pong(ping.payload)).discard
// Keep receiving even if pong couldn't be send due to closed request channel. We want to process
// Keep receiving even if pong couldn't be sent due to closed request channel. We want to process
// whatever responses there are still coming from the server until it signals the end with a Close frome.
true
case _: WebSocketFrame.Pong =>
// ignore pongs
true
}
catch
case NonFatal(err) =>
responsesChannel.errorOrClosed(err).discard
finally requestsChannel.doneOrClosed().discard
}.discard
}
.pipe(optionallyConcatenateFrames(_, concatenateFragmented))
.onComplete(requestsChannel.doneOrClosed().discard)
.runToChannel()

fork {
try
Expand All @@ -78,7 +72,7 @@ def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true)(us
case ChannelClosed.Error(err) =>
// There's no proper "client error" status. Statuses 4000+ are available for custom cases
ws.send(WebSocketFrame.Close(4000, "Client error"))
responsesChannel.doneOrClosed().discard
// Assuming the responsesChannel fork will get interrupted because the enclosing scope will end
false
}
catch
Expand All @@ -87,17 +81,17 @@ def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true)(us
if (!responsesChannel.isClosedForReceive) requestsChannel.errorOrClosed(err).discard
}.discard

(optionallyConcatenateFrames(responsesChannel, concatenateFragmented), requestsChannel)
(responsesChannel, requestsChannel)

final case class WebSocketClosedWithError(statusCode: Int, msg: String)
extends Exception(s"WebSocket closed with status $statusCode: $msg")

private def optionallyConcatenateFrames(s: Source[WebSocketFrame], doConcatenate: Boolean)(using
private def optionallyConcatenateFrames(f: Flow[WebSocketFrame], doConcatenate: Boolean)(using
Ox
): Source[WebSocketFrame] =
): Flow[WebSocketFrame] =
if doConcatenate then
type Accumulator = Option[Either[Array[Byte], String]]
s.mapStateful(() => None: Accumulator) {
f.mapStateful(() => None: Accumulator) {
case (None, f: WebSocketFrame.Ping) => (None, Some(f))
case (None, f: WebSocketFrame.Pong) => (None, Some(f))
case (None, f: WebSocketFrame.Close) => (None, Some(f))
Expand All @@ -115,5 +109,5 @@ private def optionallyConcatenateFrames(s: Source[WebSocketFrame], doConcatenate
s"Unexpected WebSocket frame received during concatenation. Frame received: ${f.getClass
.getSimpleName()}, accumulator type: ${acc.map(_.getClass.getSimpleName)}"
)
}.collectAsView { case Some(f: WebSocketFrame) => f }
else s
}.collect { case Some(f: WebSocketFrame) => f }
else f
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sttp.client4.impl.ox.sse
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*
import sttp.client4.*
import sttp.client4.testing.HttpTest.*
import sttp.model.sse.ServerSentEvent
Expand All @@ -14,7 +13,7 @@ class OxServerSentEventsTest extends AnyFlatSpec with Matchers with BeforeAndAft

behavior of "OxServerSentEvents"

it should "parse SSEs" in supervised {
it should "parse SSEs" in {
val sseData = "text1 in line1\ntext2 in line2"
val expectedEvent = ServerSentEvent(data = Some(sseData), eventType = Some("test-event"), retry = Some(42000))
val expectedEvents =
Expand All @@ -23,7 +22,7 @@ class OxServerSentEventsTest extends AnyFlatSpec with Matchers with BeforeAndAft
.post(uri"$endpoint/sse/echo3")
.body(sseData)
.response(asInputStreamAlways { is =>
OxServerSentEvents.parse(is).take(3).toList shouldBe expectedEvents
OxServerSentEvents.parse(is).take(3).runToList() shouldBe expectedEvents
()
})
.send(backend)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*
import ox.flow.Flow
import ox.channels.ChannelClosed
import ox.channels.Sink
import ox.channels.Source
Expand Down Expand Up @@ -62,7 +63,7 @@ class OxWebSocketTest extends AnyFlatSpec with BeforeAndAfterAll with Matchers w
val (wsSource, wsSink) = asSourceAndSink(ws)
wsSink.send(WebSocketFrame.text("test1"))
wsSink.error(new Exception("failed source"))
eventually(wsSource.isClosedForReceiveDetail shouldBe Some(ChannelClosed.Done))
eventually(wsSource.isClosedForReceiveDetail should matchPattern { case Some(ChannelClosed.Error(_)) => })
})
.send(backend)
}
Expand Down Expand Up @@ -117,7 +118,7 @@ class OxWebSocketTest extends AnyFlatSpec with BeforeAndAfterAll with Matchers w
.response(asWebSocket { ws =>
val (wsSource, wsSink) = asSourceAndSink(ws, concatenateFragmented = false)
sendText(wsSink, 1)
wsSource.take(3).toList shouldBe List(
Flow.fromSource(wsSource).take(3).runToList() shouldBe List(
WebSocketFrame.Text("echo: ", false, None),
WebSocketFrame.Text("test1", false, None),
WebSocketFrame.Text("", true, None)
Expand Down

0 comments on commit e75173f

Please sign in to comment.