From d8bef067b4081667de635896b02bfa2a6fb95e6d Mon Sep 17 00:00:00 2001 From: adamw Date: Thu, 27 Jul 2023 15:12:19 +0200 Subject: [PATCH] Introduce a synchronous web socket interface --- .../src/main/scala/sttp/client4/SttpApi.scala | 68 --------- .../sttp/client4/SttpWebSocketAsyncApi.scala | 41 +++++ .../sttp/client4/SttpWebSocketStreamApi.scala | 37 +++++ .../sttp/client4/SttpWebSocketSyncApi.scala | 44 ++++++ .../scala/sttp/client4/ws/SyncWebSocket.scala | 140 ++++++++++++++++++ .../main/scala/sttp/client4/ws/package.scala | 7 + .../scalajs/sttp/client4/SttpExtensions.scala | 2 +- .../sttp/client4/SttpExtensions.scala | 10 +- .../sttp/client4/SttpExtensions.scala | 2 +- .../client4/testing/BackendStubTests.scala | 1 + .../WebSocketBufferOverflowTest.scala | 1 + .../websocket/WebSocketConcurrentTest.scala | 1 + .../websocket/WebSocketStreamingTest.scala | 1 + .../testing/websocket/WebSocketTest.scala | 1 + .../client4/examples/WebSocketMonix.scala | 1 + .../client4/examples/WebSocketTesting.scala | 1 + .../sttp/client4/examples/WebSocketAkka.scala | 1 + .../client4/examples/WebSocketStreamFs2.scala | 1 + .../examples/WebSocketSynchronous.scala | 11 +- .../sttp/client4/examples/WebSocketZio.scala | 1 + .../okhttp/OkHttpSyncWebSocketTest.scala | 7 +- 21 files changed, 295 insertions(+), 84 deletions(-) create mode 100644 core/src/main/scala/sttp/client4/SttpWebSocketAsyncApi.scala create mode 100644 core/src/main/scala/sttp/client4/SttpWebSocketStreamApi.scala create mode 100644 core/src/main/scala/sttp/client4/SttpWebSocketSyncApi.scala create mode 100644 core/src/main/scala/sttp/client4/ws/SyncWebSocket.scala create mode 100644 core/src/main/scala/sttp/client4/ws/package.scala diff --git a/core/src/main/scala/sttp/client4/SttpApi.scala b/core/src/main/scala/sttp/client4/SttpApi.scala index a043972527..8e06ce9070 100644 --- a/core/src/main/scala/sttp/client4/SttpApi.scala +++ b/core/src/main/scala/sttp/client4/SttpApi.scala @@ -255,72 +255,4 @@ trait SttpApi extends SttpExtensions with UriInterpolator { StreamBody(s)(b), contentType = Some(MediaType.ApplicationOctetStream) ) - - // websocket response specifications - - def asWebSocket[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, Either[String, T]] = - asWebSocketEither(asStringAlways, asWebSocketAlways(f)) - - def asWebSocketWithMetadata[F[_], T]( - f: (WebSocket[F], ResponseMetadata) => F[T] - ): WebSocketResponseAs[F, Either[String, T]] = - asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f)) - - def asWebSocketAlways[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, T] = - asWebSocketAlwaysWithMetadata((w, _) => f(w)) - - def asWebSocketAlwaysWithMetadata[F[_], T](f: (WebSocket[F], ResponseMetadata) => F[T]): WebSocketResponseAs[F, T] = - WebSocketResponseAs(ResponseAsWebSocket(f)) - - def asWebSocketUnsafe[F[_]]: WebSocketResponseAs[F, Either[String, WebSocket[F]]] = - asWebSocketEither(asStringAlways, asWebSocketAlwaysUnsafe) - - def asWebSocketAlwaysUnsafe[F[_]]: WebSocketResponseAs[F, WebSocket[F]] = - WebSocketResponseAs(ResponseAsWebSocketUnsafe()) - - def fromMetadata[F[_], T]( - default: ResponseAs[T], - conditions: ConditionalResponseAs[WebSocketResponseAs[F, T]]* - ): WebSocketResponseAs[F, T] = - WebSocketResponseAs(ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate)) - - /** Uses the `onSuccess` response specification for 101 responses (switching protocols) on JVM/Native, 200 responses - * on JS. Otherwise, use the `onError` specification. - */ - def asWebSocketEither[F[_], A, B]( - onError: ResponseAs[A], - onSuccess: WebSocketResponseAs[F, B] - ): WebSocketResponseAs[F, Either[A, B]] = - SttpExtensions.asWebSocketEitherPlatform(onError, onSuccess) - - // websocket stream response specifications - - def asWebSocketStream[S]( - s: Streams[S] - )(p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]): WebSocketStreamResponseAs[Either[String, Unit], S] = - asWebSocketEither(asStringAlways, asWebSocketStreamAlways(s)(p)) - - def asWebSocketStreamAlways[S](s: Streams[S])( - p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame] - ): WebSocketStreamResponseAs[Unit, S] = WebSocketStreamResponseAs[Unit, S](ResponseAsWebSocketStream(s, p)) - - def fromMetadata[T, S]( - default: ResponseAs[T], - conditions: ConditionalResponseAs[WebSocketStreamResponseAs[T, S]]* - ): WebSocketStreamResponseAs[T, S] = - WebSocketStreamResponseAs[T, S]( - ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate) - ) - - /** Uses the `onSuccess` response specification for 101 responses (switching protocols), and the `onError` - * specification otherwise. - */ - def asWebSocketEither[A, B, S]( - onError: ResponseAs[A], - onSuccess: WebSocketStreamResponseAs[B, S] - ): WebSocketStreamResponseAs[Either[A, B], S] = - fromMetadata( - onError.map(Left(_)), - ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_))) - ).showAs(s"either(${onError.show}, ${onSuccess.show})") } diff --git a/core/src/main/scala/sttp/client4/SttpWebSocketAsyncApi.scala b/core/src/main/scala/sttp/client4/SttpWebSocketAsyncApi.scala new file mode 100644 index 0000000000..bc8ff93f09 --- /dev/null +++ b/core/src/main/scala/sttp/client4/SttpWebSocketAsyncApi.scala @@ -0,0 +1,41 @@ +package sttp.client4 + +import sttp.model.ResponseMetadata +import sttp.ws.WebSocket + +trait SttpWebSocketAsyncApi { + def asWebSocket[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, Either[String, T]] = + asWebSocketEither(asStringAlways, asWebSocketAlways(f)) + + def asWebSocketWithMetadata[F[_], T]( + f: (WebSocket[F], ResponseMetadata) => F[T] + ): WebSocketResponseAs[F, Either[String, T]] = + asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f)) + + def asWebSocketAlways[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, T] = + asWebSocketAlwaysWithMetadata((w, _) => f(w)) + + def asWebSocketAlwaysWithMetadata[F[_], T](f: (WebSocket[F], ResponseMetadata) => F[T]): WebSocketResponseAs[F, T] = + WebSocketResponseAs(ResponseAsWebSocket(f)) + + def asWebSocketUnsafe[F[_]]: WebSocketResponseAs[F, Either[String, WebSocket[F]]] = + asWebSocketEither(asStringAlways, asWebSocketAlwaysUnsafe) + + def asWebSocketAlwaysUnsafe[F[_]]: WebSocketResponseAs[F, WebSocket[F]] = + WebSocketResponseAs(ResponseAsWebSocketUnsafe()) + + def fromMetadata[F[_], T]( + default: ResponseAs[T], + conditions: ConditionalResponseAs[WebSocketResponseAs[F, T]]* + ): WebSocketResponseAs[F, T] = + WebSocketResponseAs(ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate)) + + /** Uses the `onSuccess` response specification for 101 responses (switching protocols) on JVM/Native, 200 responses + * on JS. Otherwise, use the `onError` specification. + */ + def asWebSocketEither[F[_], A, B]( + onError: ResponseAs[A], + onSuccess: WebSocketResponseAs[F, B] + ): WebSocketResponseAs[F, Either[A, B]] = + SttpExtensions.asWebSocketEitherPlatform(onError, onSuccess) +} diff --git a/core/src/main/scala/sttp/client4/SttpWebSocketStreamApi.scala b/core/src/main/scala/sttp/client4/SttpWebSocketStreamApi.scala new file mode 100644 index 0000000000..ec9ea08f26 --- /dev/null +++ b/core/src/main/scala/sttp/client4/SttpWebSocketStreamApi.scala @@ -0,0 +1,37 @@ +package sttp.client4 + +import sttp.capabilities.Streams +import sttp.model.StatusCode +import sttp.ws.WebSocketFrame + +trait SttpWebSocketStreamApi { + def asWebSocketStream[S]( + s: Streams[S] + )(p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]): WebSocketStreamResponseAs[Either[String, Unit], S] = + asWebSocketEither(asStringAlways, asWebSocketStreamAlways(s)(p)) + + def asWebSocketStreamAlways[S](s: Streams[S])( + p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame] + ): WebSocketStreamResponseAs[Unit, S] = WebSocketStreamResponseAs[Unit, S](ResponseAsWebSocketStream(s, p)) + + def fromMetadata[T, S]( + default: ResponseAs[T], + conditions: ConditionalResponseAs[WebSocketStreamResponseAs[T, S]]* + ): WebSocketStreamResponseAs[T, S] = + WebSocketStreamResponseAs[T, S]( + ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate) + ) + + /** Uses the `onSuccess` response specification for 101 responses (switching protocols), and the `onError` + * specification otherwise. + */ + def asWebSocketEither[A, B, S]( + onError: ResponseAs[A], + onSuccess: WebSocketStreamResponseAs[B, S] + ): WebSocketStreamResponseAs[Either[A, B], S] = + fromMetadata( + onError.map(Left(_)), + ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_))) + ).showAs(s"either(${onError.show}, ${onSuccess.show})") + +} diff --git a/core/src/main/scala/sttp/client4/SttpWebSocketSyncApi.scala b/core/src/main/scala/sttp/client4/SttpWebSocketSyncApi.scala new file mode 100644 index 0000000000..839e34d76e --- /dev/null +++ b/core/src/main/scala/sttp/client4/SttpWebSocketSyncApi.scala @@ -0,0 +1,44 @@ +package sttp.client4 + +import sttp.client4.ws.SyncWebSocket +import sttp.model.ResponseMetadata +import sttp.ws.WebSocket + +trait SttpWebSocketSyncApi { + def asWebSocket[T](f: SyncWebSocket => T): WebSocketResponseAs[Identity, Either[String, T]] = + asWebSocketEither(asStringAlways, asWebSocketAlways(f)) + + def asWebSocketWithMetadata[T]( + f: (SyncWebSocket, ResponseMetadata) => T + ): WebSocketResponseAs[Identity, Either[String, T]] = + asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f)) + + def asWebSocketAlways[T](f: SyncWebSocket => T): WebSocketResponseAs[Identity, T] = + asWebSocketAlwaysWithMetadata((w, _) => f(w)) + + def asWebSocketAlwaysWithMetadata[T]( + f: (SyncWebSocket, ResponseMetadata) => T + ): WebSocketResponseAs[Identity, T] = + WebSocketResponseAs[Identity, T](ResponseAsWebSocket[Identity, T]((ws, m) => f(new SyncWebSocket(ws), m))) + + def asWebSocketUnsafe: WebSocketResponseAs[Identity, Either[String, SyncWebSocket]] = + asWebSocketEither(asStringAlways, asWebSocketAlwaysUnsafe) + + def asWebSocketAlwaysUnsafe: WebSocketResponseAs[Identity, SyncWebSocket] = + WebSocketResponseAs[Identity, WebSocket[Identity]](ResponseAsWebSocketUnsafe()).map(new SyncWebSocket(_)) + + def fromMetadata[T]( + default: ResponseAs[T], + conditions: ConditionalResponseAs[WebSocketResponseAs[Identity, T]]* + ): WebSocketResponseAs[Identity, T] = + WebSocketResponseAs(ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate)) + + /** Uses the `onSuccess` response specification for 101 responses (switching protocols) on JVM/Native, 200 responses + * on JS. Otherwise, use the `onError` specification. + */ + def asWebSocketEither[A, B]( + onError: ResponseAs[A], + onSuccess: WebSocketResponseAs[Identity, B] + ): WebSocketResponseAs[Identity, Either[A, B]] = + SttpExtensions.asWebSocketEitherPlatform(onError, onSuccess) +} diff --git a/core/src/main/scala/sttp/client4/ws/SyncWebSocket.scala b/core/src/main/scala/sttp/client4/ws/SyncWebSocket.scala new file mode 100644 index 0000000000..7a66a16be6 --- /dev/null +++ b/core/src/main/scala/sttp/client4/ws/SyncWebSocket.scala @@ -0,0 +1,140 @@ +package sttp.client4.ws + +import sttp.client4.Identity +import sttp.model.Headers +import sttp.ws.{WebSocket, WebSocketClosed, WebSocketFrame} + +/** Allows interacting with a web socket. Interactions can happen: + * + * - on the frame level, by sending and receiving raw [[WebSocketFrame]] s + * - using the provided `receive*` methods to obtain concatenated data frames, or string/byte payloads, and the + * `send*` method to send string/binary frames. + * + * The `send*` and `receive*` methods may result in a failed effect, with either one of [[sttp.ws.WebSocketException]] + * exceptions, or a backend-specific exception. Specifically, they will fail with [[WebSocketClosed]] if the web socket + * is closed. + * + * See the `either` and `eitherClose` method to lift web socket closed events to the value level. + */ +class SyncWebSocket(val delegate: WebSocket[Identity]) { + + /** Receive the next frame from the web socket. This can be a data frame, or a control frame including + * [[WebSocketFrame.Close]]. After receiving a close frame, no further interactions with the web socket should + * happen. + * + * However, not all implementations expose the close frame, and web sockets might also get closed without the proper + * close frame exchange. In such cases, as well as when invoking `receive`/`send` after receiving a close frame, a + * [[WebSocketClosed]] exception will be thrown. + * + * *Should be only called sequentially!* (from a single thread/fiber). Because web socket frames might be fragmented, + * calling this method concurrently might result in threads/fibers receiving fragments of the same frame. + */ + def receive(): WebSocketFrame = delegate.receive() + + /** Sends a web socket frame. Can be safely called from multiple threads. + * + * May result in an exception, in case of a network error, or if the socket is closed. + */ + def send(f: WebSocketFrame, isContinuation: Boolean = false): Unit = delegate.send(f, isContinuation) + + def isOpen(): Boolean = delegate.isOpen() + + /** Receive a single data frame, ignoring others. The frame might be a fragment. Will throw [[WebSocketClosed]] if the + * web socket is closed, or if a close frame is received. + * + * *Should be only called sequentially!* (from a single thread/fiber). + * + * @param pongOnPing + * Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received. + */ + def receiveDataFrame(pongOnPing: Boolean = true): WebSocketFrame.Data[_] = delegate.receiveDataFrame(pongOnPing) + + /** Receive a single text data frame, ignoring others. The frame might be a fragment. To receive whole messages, use + * [[receiveText]]. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is received. + * + * *Should be only called sequentially!* (from a single thread/fiber). + * + * @param pongOnPing + * Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received. + */ + def receiveTextFrame(pongOnPing: Boolean = true): WebSocketFrame.Text = delegate.receiveTextFrame(pongOnPing) + + /** Receive a single binary data frame, ignoring others. The frame might be a fragment. To receive whole messages, use + * [[receiveBinary]]. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is received. + * + * *Should be only called sequentially!* (from a single thread/fiber). + * + * @param pongOnPing + * Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received. + */ + def receiveBinaryFrame(pongOnPing: Boolean = true): WebSocketFrame.Binary = delegate.receiveBinaryFrame(pongOnPing) + + /** Receive a single text message (which might come from multiple, fragmented frames). Ignores non-text frames and + * returns combined results. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is + * received. + * + * *Should be only called sequentially!* (from a single thread/fiber). + * + * @param pongOnPing + * Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received. + */ + def receiveText(pongOnPing: Boolean = true): String = delegate.receiveText(pongOnPing) + + /** Receive a single binary message (which might come from multiple, fragmented frames). Ignores non-binary frames and + * returns combined results. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is + * received. + * + * *Should be only called sequentially!* (from a single thread/fiber). + * + * @param pongOnPing + * Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received. + */ + def receiveBinary(pongOnPing: Boolean): Array[Byte] = delegate.receiveBinary(pongOnPing) + + /** Extracts the received close frame (if available) as the left side of an either, or returns the original result on + * the right. + * + * Will throw [[WebSocketClosed]] if the web socket is closed, but no close frame is available. + * + * @param f + * The effect describing web socket interactions. + */ + def eitherClose[T](f: => T): Either[WebSocketFrame.Close, T] = + try Right(f) + catch { + case WebSocketClosed(Some(close)) => Left(close) + } + + /** Returns an effect computing a: + * + * - `Left` if the web socket is closed - optionally with the received close frame (if available). + * - `Right` with the original result otherwise. + * + * Will never throw a [[WebSocketClosed]]. + * + * @param f + * The effect describing web socket interactions. + */ + def either[T](f: => T): Either[Option[WebSocketFrame.Close], T] = + try Right(f) + catch { + case WebSocketClosed(close) => Left(close) + } + + /** Sends a web socket frame with the given payload. Can be safely called from multiple threads. + * + * May result in an exception, in case of a network error, or if the socket is closed. + */ + def sendText(payload: String): Unit = delegate.sendText(payload) + + /** Sends a web socket frame with the given payload. Can be safely called from multiple threads. + * + * May result in an exception, in case of a network error, or if the socket is closed. + */ + def sendBinary(payload: Array[Byte]): Unit = delegate.sendBinary(payload) + + /** Idempotent when used sequentially. */ + def close(): Unit = delegate.close() + + def upgradeHeaders: Headers = delegate.upgradeHeaders +} diff --git a/core/src/main/scala/sttp/client4/ws/package.scala b/core/src/main/scala/sttp/client4/ws/package.scala new file mode 100644 index 0000000000..8ae92b8392 --- /dev/null +++ b/core/src/main/scala/sttp/client4/ws/package.scala @@ -0,0 +1,7 @@ +package sttp.client4 + +package object ws { + object async extends SttpWebSocketAsyncApi + object sync extends SttpWebSocketSyncApi + object stream extends SttpWebSocketStreamApi +} diff --git a/core/src/main/scalajs/sttp/client4/SttpExtensions.scala b/core/src/main/scalajs/sttp/client4/SttpExtensions.scala index e7a1750acb..b09977884a 100644 --- a/core/src/main/scalajs/sttp/client4/SttpExtensions.scala +++ b/core/src/main/scalajs/sttp/client4/SttpExtensions.scala @@ -28,7 +28,7 @@ object SttpExtensions { onError: ResponseAs[A], onSuccess: WebSocketResponseAs[F, B] ): WebSocketResponseAs[F, Either[A, B]] = - fromMetadata( + ws.async.fromMetadata( onError.map(Left(_)), ConditionalResponseAs(_.code == StatusCode.Ok, onSuccess.map(Right(_))) ).showAs(s"either(${onError.show}, ${onSuccess.show})") diff --git a/core/src/main/scalajvm/sttp/client4/SttpExtensions.scala b/core/src/main/scalajvm/sttp/client4/SttpExtensions.scala index 33e1aa9560..09062e4bb9 100644 --- a/core/src/main/scalajvm/sttp/client4/SttpExtensions.scala +++ b/core/src/main/scalajvm/sttp/client4/SttpExtensions.scala @@ -39,8 +39,10 @@ object SttpExtensions { onError: ResponseAs[A], onSuccess: WebSocketResponseAs[F, B] ): WebSocketResponseAs[F, Either[A, B]] = - fromMetadata( - onError.map(Left(_)), - ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_))) - ).showAs(s"either(${onError.show}, ${onSuccess.show})") + ws.async + .fromMetadata( + onError.map(Left(_)), + ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_))) + ) + .showAs(s"either(${onError.show}, ${onSuccess.show})") } diff --git a/core/src/main/scalanative/sttp/client4/SttpExtensions.scala b/core/src/main/scalanative/sttp/client4/SttpExtensions.scala index 1f0503f777..f0aa530294 100644 --- a/core/src/main/scalanative/sttp/client4/SttpExtensions.scala +++ b/core/src/main/scalanative/sttp/client4/SttpExtensions.scala @@ -39,7 +39,7 @@ object SttpExtensions { onError: ResponseAs[A], onSuccess: WebSocketResponseAs[F, B] ): WebSocketResponseAs[F, Either[A, B]] = - fromMetadata( + sttp.client4.ws.async.fromMetadata( onError.map(Left(_)), ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_))) ).showAs(s"either(${onError.show}, ${onSuccess.show})") diff --git a/core/src/test/scala/sttp/client4/testing/BackendStubTests.scala b/core/src/test/scala/sttp/client4/testing/BackendStubTests.scala index 34b778dc24..34fb66788a 100644 --- a/core/src/test/scala/sttp/client4/testing/BackendStubTests.scala +++ b/core/src/test/scala/sttp/client4/testing/BackendStubTests.scala @@ -7,6 +7,7 @@ import sttp.client4.SttpClientException.ReadException import sttp.client4._ import sttp.client4.internal._ import sttp.client4.monad.IdMonad +import sttp.client4.ws.async._ import sttp.model._ import sttp.monad.{FutureMonad, TryMonad} import sttp.ws.WebSocketFrame diff --git a/core/src/test/scala/sttp/client4/testing/websocket/WebSocketBufferOverflowTest.scala b/core/src/test/scala/sttp/client4/testing/websocket/WebSocketBufferOverflowTest.scala index c64141cd34..71a2f50142 100644 --- a/core/src/test/scala/sttp/client4/testing/websocket/WebSocketBufferOverflowTest.scala +++ b/core/src/test/scala/sttp/client4/testing/websocket/WebSocketBufferOverflowTest.scala @@ -6,6 +6,7 @@ import org.scalatest.Suite import org.scalatest.flatspec.AsyncFlatSpecLike import sttp.client4.testing.HttpTest.wsEndpoint import sttp.client4._ +import sttp.client4.ws.async._ import sttp.monad.MonadError import sttp.client4.testing.ConvertToFuture diff --git a/core/src/test/scala/sttp/client4/testing/websocket/WebSocketConcurrentTest.scala b/core/src/test/scala/sttp/client4/testing/websocket/WebSocketConcurrentTest.scala index bcf1e815e2..0c74b5e1d6 100644 --- a/core/src/test/scala/sttp/client4/testing/websocket/WebSocketConcurrentTest.scala +++ b/core/src/test/scala/sttp/client4/testing/websocket/WebSocketConcurrentTest.scala @@ -5,6 +5,7 @@ import org.scalatest.flatspec.AsyncFlatSpecLike import sttp.client4._ import sttp.client4.testing.ConvertToFuture import sttp.client4.testing.HttpTest.wsEndpoint +import sttp.client4.ws.async._ import sttp.monad.MonadError import sttp.monad.syntax._ import sttp.ws.WebSocket diff --git a/core/src/test/scala/sttp/client4/testing/websocket/WebSocketStreamingTest.scala b/core/src/test/scala/sttp/client4/testing/websocket/WebSocketStreamingTest.scala index 62029c7e9f..e7710954ac 100644 --- a/core/src/test/scala/sttp/client4/testing/websocket/WebSocketStreamingTest.scala +++ b/core/src/test/scala/sttp/client4/testing/websocket/WebSocketStreamingTest.scala @@ -7,6 +7,7 @@ import sttp.capabilities.Streams import sttp.client4.testing.HttpTest.wsEndpoint import sttp.client4.testing.{ConvertToFuture, ToFutureWrapper} import sttp.client4._ +import sttp.client4.ws.stream._ import sttp.monad.MonadError import sttp.monad.syntax._ import sttp.ws.WebSocketFrame diff --git a/core/src/test/scala/sttp/client4/testing/websocket/WebSocketTest.scala b/core/src/test/scala/sttp/client4/testing/websocket/WebSocketTest.scala index f78fc64aa0..f5816adc09 100644 --- a/core/src/test/scala/sttp/client4/testing/websocket/WebSocketTest.scala +++ b/core/src/test/scala/sttp/client4/testing/websocket/WebSocketTest.scala @@ -7,6 +7,7 @@ import org.scalatest.time.{Seconds, Span} import org.scalatest.{Assertion, BeforeAndAfterAll} import sttp.client4.SttpClientException.ReadException import sttp.client4._ +import sttp.client4.ws.async._ import sttp.client4.logging.{LogConfig, LogLevel, Logger, LoggingBackend} import sttp.client4.testing.HttpTest.wsEndpoint import sttp.client4.testing.{ConvertToFuture, ToFutureWrapper} diff --git a/examples-ce2/src/main/scala/sttp/client4/examples/WebSocketMonix.scala b/examples-ce2/src/main/scala/sttp/client4/examples/WebSocketMonix.scala index 7aedec1d1e..bf5ed08057 100644 --- a/examples-ce2/src/main/scala/sttp/client4/examples/WebSocketMonix.scala +++ b/examples-ce2/src/main/scala/sttp/client4/examples/WebSocketMonix.scala @@ -2,6 +2,7 @@ package sttp.client4.examples import monix.eval.Task import sttp.client4._ +import sttp.client4.ws.async._ import sttp.client4.httpclient.monix.HttpClientMonixBackend import sttp.ws.WebSocket diff --git a/examples-ce2/src/main/scala/sttp/client4/examples/WebSocketTesting.scala b/examples-ce2/src/main/scala/sttp/client4/examples/WebSocketTesting.scala index 8a20c0e4d6..1e0dda8a5f 100644 --- a/examples-ce2/src/main/scala/sttp/client4/examples/WebSocketTesting.scala +++ b/examples-ce2/src/main/scala/sttp/client4/examples/WebSocketTesting.scala @@ -3,6 +3,7 @@ package sttp.client4.examples import monix.eval.Task import sttp.capabilities.monix.MonixStreams import sttp.client4._ +import sttp.client4.ws.async._ import sttp.client4.httpclient.monix.HttpClientMonixBackend import sttp.client4.testing.WebSocketStreamBackendStub import sttp.model.StatusCode diff --git a/examples/src/main/scala/sttp/client4/examples/WebSocketAkka.scala b/examples/src/main/scala/sttp/client4/examples/WebSocketAkka.scala index 94d194a322..d23468aa9a 100644 --- a/examples/src/main/scala/sttp/client4/examples/WebSocketAkka.scala +++ b/examples/src/main/scala/sttp/client4/examples/WebSocketAkka.scala @@ -1,6 +1,7 @@ package sttp.client4.examples import sttp.client4._ +import sttp.client4.ws.async._ import sttp.client4.akkahttp.AkkaHttpBackend import sttp.ws.WebSocket diff --git a/examples/src/main/scala/sttp/client4/examples/WebSocketStreamFs2.scala b/examples/src/main/scala/sttp/client4/examples/WebSocketStreamFs2.scala index 39e17c539a..9e9d37007d 100644 --- a/examples/src/main/scala/sttp/client4/examples/WebSocketStreamFs2.scala +++ b/examples/src/main/scala/sttp/client4/examples/WebSocketStreamFs2.scala @@ -5,6 +5,7 @@ import cats.effect.unsafe.IORuntime import fs2._ import sttp.capabilities.fs2.Fs2Streams import sttp.client4._ +import sttp.client4.ws.stream._ import sttp.client4.httpclient.fs2.HttpClientFs2Backend import sttp.ws.WebSocketFrame diff --git a/examples/src/main/scala/sttp/client4/examples/WebSocketSynchronous.scala b/examples/src/main/scala/sttp/client4/examples/WebSocketSynchronous.scala index 9739b03664..d3693c13e0 100644 --- a/examples/src/main/scala/sttp/client4/examples/WebSocketSynchronous.scala +++ b/examples/src/main/scala/sttp/client4/examples/WebSocketSynchronous.scala @@ -1,14 +1,11 @@ package sttp.client4.examples import sttp.client4._ -import sttp.client4.akkahttp.AkkaHttpBackend -import sttp.ws.WebSocket - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future +import sttp.client4.ws.SyncWebSocket +import sttp.client4.ws.sync._ object WebSocketSynchronous extends App { - def useWebSocket(ws: WebSocket[Identity]): Unit = { + def useWebSocket(ws: SyncWebSocket): Unit = { def send(i: Int): Unit = ws.sendText(s"Hello $i!") def receive(): Unit = { val t = ws.receiveText() @@ -25,7 +22,7 @@ object WebSocketSynchronous extends App { try basicRequest .get(uri"wss://ws.postman-echo.com/raw") - .response(asWebSocket[Identity, Unit](useWebSocket)) + .response(asWebSocket(useWebSocket)) .send(backend) finally backend.close() } diff --git a/examples/src/main/scala/sttp/client4/examples/WebSocketZio.scala b/examples/src/main/scala/sttp/client4/examples/WebSocketZio.scala index 322b6a5433..e039d275b9 100644 --- a/examples/src/main/scala/sttp/client4/examples/WebSocketZio.scala +++ b/examples/src/main/scala/sttp/client4/examples/WebSocketZio.scala @@ -1,6 +1,7 @@ package sttp.client4.examples import sttp.client4._ +import sttp.client4.ws.async._ import sttp.client4.httpclient.zio.HttpClientZioBackend import sttp.ws.WebSocket import zio.{Console, _} diff --git a/okhttp-backend/src/test/scala/sttp/client4/okhttp/OkHttpSyncWebSocketTest.scala b/okhttp-backend/src/test/scala/sttp/client4/okhttp/OkHttpSyncWebSocketTest.scala index 7919743d5e..3bb8fe4d1a 100644 --- a/okhttp-backend/src/test/scala/sttp/client4/okhttp/OkHttpSyncWebSocketTest.scala +++ b/okhttp-backend/src/test/scala/sttp/client4/okhttp/OkHttpSyncWebSocketTest.scala @@ -2,6 +2,7 @@ package sttp.client4.okhttp import org.scalatest.Assertion import sttp.client4._ +import sttp.client4.ws.sync._ import sttp.monad.syntax._ import sttp.client4.monad.IdMonad import sttp.client4.testing.ConvertToFuture @@ -21,9 +22,9 @@ class OkHttpSyncWebSocketTest extends WebSocketTest[Identity] { it should "error if incoming messages overflow the buffer" in { basicRequest .get(uri"$wsEndpoint/ws/echo") - .response(asWebSocket[Identity, Assertion] { ws => - sendText(ws, OkHttpBackend.DefaultWebSocketBufferCapacity.get + 1).flatMap(_ => - eventually(10.millis, 400)(() => ws.isOpen().map(_ shouldBe false)) + .response(asWebSocket[Assertion] { ws => + sendText(ws.delegate, OkHttpBackend.DefaultWebSocketBufferCapacity.get + 1).flatMap(_ => + eventually(10.millis, 400)(() => ws.isOpen() shouldBe false) ) }) .send(backend)