From 40c1c05fb14f23be03a884d3d1d648d6fe3716df Mon Sep 17 00:00:00 2001 From: Brian Baldino Date: Thu, 5 Nov 2020 13:33:32 -0800 Subject: [PATCH 1/4] add websocket client module --- jicoco-websocket-client/pom.xml | 44 +++++++++++++++++++++++++++++++++ pom.xml | 1 + 2 files changed, 45 insertions(+) create mode 100644 jicoco-websocket-client/pom.xml diff --git a/jicoco-websocket-client/pom.xml b/jicoco-websocket-client/pom.xml new file mode 100644 index 00000000..a266cc45 --- /dev/null +++ b/jicoco-websocket-client/pom.xml @@ -0,0 +1,44 @@ + + + + + 4.0.0 + + + org.jitsi + jicoco-parent + 1.1-SNAPSHOT + + + jicoco-websocket-client + 1.1-SNAPSHOT + + + 1.3.1 + + + + + io.ktor + ktor-client-cio + ${ktor.version} + + + + diff --git a/pom.xml b/pom.xml index 046b79cf..b5b33b4f 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,7 @@ jicoco jicoco-test-kotlin jicoco-config + jicoco-websocket-client From c07f08a197c64804bd2fe1782d2a5ef317488ef9 Mon Sep 17 00:00:00 2001 From: Brian Baldino Date: Thu, 5 Nov 2020 13:56:23 -0800 Subject: [PATCH 2/4] add websocket client and tests --- jicoco-websocket-client/pom.xml | 77 ++++++++ .../jitsi/websocket_client/WebSocketClient.kt | 164 ++++++++++++++++++ .../jitsi/websocket_client/TestWsServer.kt | 76 ++++++++ .../websocket_client/WebSocketClientTest.kt | 120 +++++++++++++ 4 files changed, 437 insertions(+) create mode 100644 jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt create mode 100644 jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/TestWsServer.kt create mode 100644 jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/WebSocketClientTest.kt diff --git a/jicoco-websocket-client/pom.xml b/jicoco-websocket-client/pom.xml index a266cc45..6f01711b 100644 --- a/jicoco-websocket-client/pom.xml +++ b/jicoco-websocket-client/pom.xml @@ -31,14 +31,91 @@ 1.3.1 + 1.3.72 + 4.1.3 + + org.jitsi + jitsi-utils + 1.0-60-g07c4a0b + io.ktor ktor-client-cio ${ktor.version} + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + ${kotlin.version} + + + + io.kotest + kotest-runner-junit5-jvm + ${kotest.version} + test + + + io.kotest + kotest-assertions-core-jvm + ${kotest.version} + test + + + + io.ktor + ktor-server-jetty + ${ktor.version} + test + + + io.ktor + ktor-websockets + ${ktor.version} + test + + + src/main/kotlin + + + org.jetbrains.kotlin + kotlin-maven-plugin + ${kotlin.version} + + + compile + compile + + compile + + + + test-compile + test-compile + + test-compile + + + + + 1.8 + + -Xuse-experimental=io.ktor.util.KtorExperimentalAPI + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.2 + + + + diff --git a/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt new file mode 100644 index 00000000..afe69f79 --- /dev/null +++ b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt @@ -0,0 +1,164 @@ +/* + * Copyright @ 2018 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jitsi.websocket_client + +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.features.websocket.DefaultClientWebSocketSession +import io.ktor.client.features.websocket.WebSockets +import io.ktor.client.features.websocket.webSocketSession +import io.ktor.http.URLProtocol +import io.ktor.http.cio.websocket.CloseReason +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.close +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.jitsi.utils.logging2.Logger +import org.jitsi.utils.logging2.createChildLogger +import java.net.ConnectException + +/** + * A websocket client which sends messages and invokes a handler upon receiving + * messages from the far side. Sending is non-blocking, and the client has no + * notion of correlating "responses" to "requests": if request/response + * semantics are required then they must be implemented by a layer on top of + * this class. + */ +class WebSocketClient( + private val host: String, + private val wsProtocol: WsProtocol, + private val port: Int, + /** + * The path of the remote websocket URL + */ + private val path: String, + parentLogger: Logger, + private val incomingMessageHandler: (Frame) -> Unit = {}, + private val client: HttpClient = HttpClient(CIO) { + install(WebSockets) + }, + /** + * The dispatcher which will be used for all of the request and response + * processing. + */ + dispatcher: CoroutineDispatcher = Dispatchers.IO +) { + private val logger = createChildLogger(parentLogger) + private val job = Job() + private val coroutineScope = CoroutineScope(dispatcher + job) + private val msgsToSend = Channel(Channel.RENDEZVOUS) + private var wsSession: DefaultClientWebSocketSession? = null + + fun sendString(data: String) { + coroutineScope.launch { + msgsToSend.send(Frame.Text(data)) + } + } + + // Starts the run loop for sending and receiving websocket messages + private suspend fun DefaultClientWebSocketSession.startLoop() { + launch { + for (msg in incoming) { + incomingMessageHandler(msg) + } + } + try { + for (msg in msgsToSend) { + send(msg) + } + } catch (e: ClosedReceiveChannelException) { + logger.info("Websocket was closed") + return + } catch (e: CancellationException) { + logger.info("Websocket job was cancelled") + throw e + } catch (t: Throwable) { + logger.error("Error in websocket connection: ", t) + return + } + } + + /** + * Attempt to connect to the websocket server, returns true if the connection was + * successful, false otherwise. + * + * Known exceptions (not necessarily exhaustive): + * [ConnectException] if we can't connect + * [IllegalArgumentException] if expected WSS, but the server is WS + * [EOFException] if expected WS, but the server is WSS + * [SunCertPathBuilderException] WSS cert issue + */ + fun connect(): Boolean { + return try { + wsSession = runBlocking { + client.webSocketSession { + url { + protocol = wsProtocol.toUrlProtocol() + host = this@WebSocketClient.host + port = this@WebSocketClient.port + path(path) + } + } + } + true + } catch (t: Throwable) { + logger.error("Error connecting", t) + false + } + } + + /** + * Start the (asynchronous) loops to handle sending and receiving messages + */ + fun run() { + coroutineScope.launch { + requireNotNull(wsSession) + wsSession?.startLoop() + } + } + + /** + * Stop and close the websocket connection + */ + fun stop() { + logger.info("Stopping") + runBlocking { + wsSession?.close(CloseReason(CloseReason.Codes.NORMAL, "bye")) + job.cancelAndJoin() + } + } + + enum class WsProtocol { + WS, + WSS + } + + private fun WsProtocol.toUrlProtocol(): URLProtocol { + return when (this) { + WsProtocol.WS -> URLProtocol.WS + WsProtocol.WSS -> URLProtocol.WSS + } + } +} diff --git a/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/TestWsServer.kt b/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/TestWsServer.kt new file mode 100644 index 00000000..3984e047 --- /dev/null +++ b/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/TestWsServer.kt @@ -0,0 +1,76 @@ +/* + * Copyright @ 2018 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jitsi.websocket_client + +import io.ktor.application.Application +import io.ktor.application.install +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.readText +import io.ktor.routing.route +import io.ktor.routing.routing +import io.ktor.websocket.WebSockets +import io.ktor.websocket.webSocket +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay + +/** + * A ws server configuration which defines multiple ws endpoints that can be + * used for different test scenarios. + */ +class TestWsServer { + val receivedMessages = mutableListOf() + val app: Application.() -> Unit = { + install(WebSockets) + + routing { + route("ws") { + // Receive messages and don't respond + webSocket("blackhole") { + for (frame in incoming) { + receivedMessages.add(frame) + } + } + // Receive messages and echo the content back + webSocket("echo") { + for (frame in incoming) { + receivedMessages.add(frame) + frame as Frame.Text + send(Frame.Text(frame.readText())) + } + } + // Receive messages and echo the content back after a delay + webSocket("delayecho") { + for (frame in incoming) { + receivedMessages.add(frame) + frame as Frame.Text + delay(1000) + send(Frame.Text(frame.readText())) + } + } + // Receive a message and then close the connection after a + // delay + webSocket("delayandclose") { + for (frame in incoming) { + receivedMessages.add(frame) + delay(1000) + cancel() + } + } + } + } + } +} diff --git a/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/WebSocketClientTest.kt b/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/WebSocketClientTest.kt new file mode 100644 index 00000000..19a17b22 --- /dev/null +++ b/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/WebSocketClientTest.kt @@ -0,0 +1,120 @@ +/* + * Copyright @ 2018 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jitsi.websocket_client + +import io.kotest.assertions.timing.eventually +import io.kotest.core.spec.IsolationMode +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.shouldBe +import io.kotest.matchers.types.shouldBeInstanceOf +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.features.websocket.WebSockets +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.readText +import io.ktor.server.engine.embeddedServer +import io.ktor.server.jetty.Jetty +import org.jitsi.utils.logging2.LoggerImpl +import kotlin.random.Random +import kotlin.time.ExperimentalTime +import kotlin.time.seconds + +@ExperimentalTime +class WebSocketClientTest : ShouldSpec() { + override fun isolationMode(): IsolationMode? = IsolationMode.InstancePerLeaf + + private val wsPort = Random.nextInt(1024, 65535).also { + println("Server running on port $it") + } + + private val client = HttpClient(CIO) { + install(WebSockets) + } + + private val wsServer = TestWsServer() + + private val server = embeddedServer(Jetty, port = wsPort) { + wsServer.app(this) + } + + private val receivedMessages = mutableListOf() + + private fun incomingMessageHandler(frame: Frame) { + receivedMessages.add(frame) + } + + private val testLogger = LoggerImpl("test") + + init { + server.start() + + context("sendString") { + context("when no reply is expected") { + val ws = createAndStartWsClient("/ws/blackhole") + ws.sendString("hello") + should("send a message") { + eventually(5.seconds) { + wsServer.receivedMessages shouldHaveSize 1 + } + wsServer.receivedMessages.first().shouldBeInstanceOf() + (wsServer.receivedMessages.first() as Frame.Text).readText() shouldBe "hello" + } + } + context("when a reply is expected") { + val ws = createAndStartWsClient("/ws/echo") + ws.sendString("hello") + should("invoke the incoming message handler") { + eventually(5.seconds) { + receivedMessages shouldHaveSize 1 + } + receivedMessages.first().shouldBeInstanceOf() + (receivedMessages.first() as Frame.Text).readText() shouldBe "hello" + } + } + context("when the client is stopped while waiting for a reply") { + val ws = createAndStartWsClient("/ws/delayandclose") + Thread.sleep(1000) + ws.sendString("hello") + ws.stop() + Thread.sleep(1000) + } + context("stop") { + val ws = createAndStartWsClient("/ws/echo") + ws.sendString("hello") + should("clean things up correctly") { + ws.stop() + } + } + } + } + + private fun createAndStartWsClient(path: String): WebSocketClient { + return WebSocketClient( + "localhost", + WebSocketClient.WsProtocol.WS, + wsPort, + path, + testLogger, + ::incomingMessageHandler, + client + ).apply { + connect() + run() + } + } +} From 93e2e526f2c9b8523f3e5d20a68dbcd6d607796e Mon Sep 17 00:00:00 2001 From: Brian Baldino Date: Thu, 5 Nov 2020 14:11:11 -0800 Subject: [PATCH 3/4] move wsprotocol enum to its own file --- .../jitsi/websocket_client/WebSocketClient.kt | 5 ---- .../org/jitsi/websocket_client/WsProtocol.kt | 23 +++++++++++++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) create mode 100644 jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt diff --git a/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt index afe69f79..b4508483 100644 --- a/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt +++ b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt @@ -150,11 +150,6 @@ class WebSocketClient( } } - enum class WsProtocol { - WS, - WSS - } - private fun WsProtocol.toUrlProtocol(): URLProtocol { return when (this) { WsProtocol.WS -> URLProtocol.WS diff --git a/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt new file mode 100644 index 00000000..4def93cd --- /dev/null +++ b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt @@ -0,0 +1,23 @@ +/* + * Copyright @ 2018 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jitsi.websocket_client + +enum class WsProtocol { + WS, + WSS +} + From 342cf748db1e3815a77f6bc4455fc14a269db7c8 Mon Sep 17 00:00:00 2001 From: Brian Baldino Date: Wed, 11 Nov 2020 15:21:26 -0800 Subject: [PATCH 4/4] tweaks/cleanup in websocketclient --- .../jitsi/websocket_client/WebSocketClient.kt | 16 ++++++++++------ .../org/jitsi/websocket_client/WsProtocol.kt | 10 +++++++++- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt index b4508483..ec806638 100644 --- a/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt +++ b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt @@ -72,11 +72,14 @@ class WebSocketClient( private var wsSession: DefaultClientWebSocketSession? = null fun sendString(data: String) { + require(isConnected()) coroutineScope.launch { msgsToSend.send(Frame.Text(data)) } } + fun isConnected(): Boolean = wsSession != null + // Starts the run loop for sending and receiving websocket messages private suspend fun DefaultClientWebSocketSession.startLoop() { launch { @@ -111,6 +114,7 @@ class WebSocketClient( * [SunCertPathBuilderException] WSS cert issue */ fun connect(): Boolean { + logger.debug { "Connecting to $wsProtocol://$host:$port/$path" } return try { wsSession = runBlocking { client.webSocketSession { @@ -133,8 +137,8 @@ class WebSocketClient( * Start the (asynchronous) loops to handle sending and receiving messages */ fun run() { + require(isConnected()) coroutineScope.launch { - requireNotNull(wsSession) wsSession?.startLoop() } } @@ -149,11 +153,11 @@ class WebSocketClient( job.cancelAndJoin() } } +} - private fun WsProtocol.toUrlProtocol(): URLProtocol { - return when (this) { - WsProtocol.WS -> URLProtocol.WS - WsProtocol.WSS -> URLProtocol.WSS - } +private fun WsProtocol.toUrlProtocol(): URLProtocol { + return when (this) { + WsProtocol.WS -> URLProtocol.WS + WsProtocol.WSS -> URLProtocol.WSS } } diff --git a/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt index 4def93cd..80003f6f 100644 --- a/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt +++ b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt @@ -18,6 +18,14 @@ package org.jitsi.websocket_client enum class WsProtocol { WS, - WSS + WSS; + + override fun toString(): String { + return when (this) { + WS -> "ws" + WSS -> "wss" + + } + } }