diff --git a/jicoco-websocket-client/pom.xml b/jicoco-websocket-client/pom.xml new file mode 100644 index 0000000..6f01711 --- /dev/null +++ b/jicoco-websocket-client/pom.xml @@ -0,0 +1,121 @@ + + + + + 4.0.0 + + + org.jitsi + jicoco-parent + 1.1-SNAPSHOT + + + jicoco-websocket-client + 1.1-SNAPSHOT + + + 1.3.1 + 1.3.72 + 4.1.3 + + + + + org.jitsi + jitsi-utils + 1.0-60-g07c4a0b + + + io.ktor + ktor-client-cio + ${ktor.version} + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + ${kotlin.version} + + + + io.kotest + kotest-runner-junit5-jvm + ${kotest.version} + test + + + io.kotest + kotest-assertions-core-jvm + ${kotest.version} + test + + + + io.ktor + ktor-server-jetty + ${ktor.version} + test + + + io.ktor + ktor-websockets + ${ktor.version} + test + + + + + src/main/kotlin + + + org.jetbrains.kotlin + kotlin-maven-plugin + ${kotlin.version} + + + compile + compile + + compile + + + + test-compile + test-compile + + test-compile + + + + + 1.8 + + -Xuse-experimental=io.ktor.util.KtorExperimentalAPI + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.2 + + + + + diff --git a/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt new file mode 100644 index 0000000..ec80663 --- /dev/null +++ b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WebSocketClient.kt @@ -0,0 +1,163 @@ +/* + * Copyright @ 2018 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jitsi.websocket_client + +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.features.websocket.DefaultClientWebSocketSession +import io.ktor.client.features.websocket.WebSockets +import io.ktor.client.features.websocket.webSocketSession +import io.ktor.http.URLProtocol +import io.ktor.http.cio.websocket.CloseReason +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.close +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.jitsi.utils.logging2.Logger +import org.jitsi.utils.logging2.createChildLogger +import java.net.ConnectException + +/** + * A websocket client which sends messages and invokes a handler upon receiving + * messages from the far side. Sending is non-blocking, and the client has no + * notion of correlating "responses" to "requests": if request/response + * semantics are required then they must be implemented by a layer on top of + * this class. + */ +class WebSocketClient( + private val host: String, + private val wsProtocol: WsProtocol, + private val port: Int, + /** + * The path of the remote websocket URL + */ + private val path: String, + parentLogger: Logger, + private val incomingMessageHandler: (Frame) -> Unit = {}, + private val client: HttpClient = HttpClient(CIO) { + install(WebSockets) + }, + /** + * The dispatcher which will be used for all of the request and response + * processing. + */ + dispatcher: CoroutineDispatcher = Dispatchers.IO +) { + private val logger = createChildLogger(parentLogger) + private val job = Job() + private val coroutineScope = CoroutineScope(dispatcher + job) + private val msgsToSend = Channel(Channel.RENDEZVOUS) + private var wsSession: DefaultClientWebSocketSession? = null + + fun sendString(data: String) { + require(isConnected()) + coroutineScope.launch { + msgsToSend.send(Frame.Text(data)) + } + } + + fun isConnected(): Boolean = wsSession != null + + // Starts the run loop for sending and receiving websocket messages + private suspend fun DefaultClientWebSocketSession.startLoop() { + launch { + for (msg in incoming) { + incomingMessageHandler(msg) + } + } + try { + for (msg in msgsToSend) { + send(msg) + } + } catch (e: ClosedReceiveChannelException) { + logger.info("Websocket was closed") + return + } catch (e: CancellationException) { + logger.info("Websocket job was cancelled") + throw e + } catch (t: Throwable) { + logger.error("Error in websocket connection: ", t) + return + } + } + + /** + * Attempt to connect to the websocket server, returns true if the connection was + * successful, false otherwise. + * + * Known exceptions (not necessarily exhaustive): + * [ConnectException] if we can't connect + * [IllegalArgumentException] if expected WSS, but the server is WS + * [EOFException] if expected WS, but the server is WSS + * [SunCertPathBuilderException] WSS cert issue + */ + fun connect(): Boolean { + logger.debug { "Connecting to $wsProtocol://$host:$port/$path" } + return try { + wsSession = runBlocking { + client.webSocketSession { + url { + protocol = wsProtocol.toUrlProtocol() + host = this@WebSocketClient.host + port = this@WebSocketClient.port + path(path) + } + } + } + true + } catch (t: Throwable) { + logger.error("Error connecting", t) + false + } + } + + /** + * Start the (asynchronous) loops to handle sending and receiving messages + */ + fun run() { + require(isConnected()) + coroutineScope.launch { + wsSession?.startLoop() + } + } + + /** + * Stop and close the websocket connection + */ + fun stop() { + logger.info("Stopping") + runBlocking { + wsSession?.close(CloseReason(CloseReason.Codes.NORMAL, "bye")) + job.cancelAndJoin() + } + } +} + +private fun WsProtocol.toUrlProtocol(): URLProtocol { + return when (this) { + WsProtocol.WS -> URLProtocol.WS + WsProtocol.WSS -> URLProtocol.WSS + } +} diff --git a/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt new file mode 100644 index 0000000..80003f6 --- /dev/null +++ b/jicoco-websocket-client/src/main/kotlin/org/jitsi/websocket_client/WsProtocol.kt @@ -0,0 +1,31 @@ +/* + * Copyright @ 2018 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jitsi.websocket_client + +enum class WsProtocol { + WS, + WSS; + + override fun toString(): String { + return when (this) { + WS -> "ws" + WSS -> "wss" + + } + } +} + diff --git a/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/TestWsServer.kt b/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/TestWsServer.kt new file mode 100644 index 0000000..3984e04 --- /dev/null +++ b/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/TestWsServer.kt @@ -0,0 +1,76 @@ +/* + * Copyright @ 2018 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jitsi.websocket_client + +import io.ktor.application.Application +import io.ktor.application.install +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.readText +import io.ktor.routing.route +import io.ktor.routing.routing +import io.ktor.websocket.WebSockets +import io.ktor.websocket.webSocket +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay + +/** + * A ws server configuration which defines multiple ws endpoints that can be + * used for different test scenarios. + */ +class TestWsServer { + val receivedMessages = mutableListOf() + val app: Application.() -> Unit = { + install(WebSockets) + + routing { + route("ws") { + // Receive messages and don't respond + webSocket("blackhole") { + for (frame in incoming) { + receivedMessages.add(frame) + } + } + // Receive messages and echo the content back + webSocket("echo") { + for (frame in incoming) { + receivedMessages.add(frame) + frame as Frame.Text + send(Frame.Text(frame.readText())) + } + } + // Receive messages and echo the content back after a delay + webSocket("delayecho") { + for (frame in incoming) { + receivedMessages.add(frame) + frame as Frame.Text + delay(1000) + send(Frame.Text(frame.readText())) + } + } + // Receive a message and then close the connection after a + // delay + webSocket("delayandclose") { + for (frame in incoming) { + receivedMessages.add(frame) + delay(1000) + cancel() + } + } + } + } + } +} diff --git a/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/WebSocketClientTest.kt b/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/WebSocketClientTest.kt new file mode 100644 index 0000000..19a17b2 --- /dev/null +++ b/jicoco-websocket-client/src/test/kotlin/org/jitsi/websocket_client/WebSocketClientTest.kt @@ -0,0 +1,120 @@ +/* + * Copyright @ 2018 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jitsi.websocket_client + +import io.kotest.assertions.timing.eventually +import io.kotest.core.spec.IsolationMode +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.shouldBe +import io.kotest.matchers.types.shouldBeInstanceOf +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.features.websocket.WebSockets +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.readText +import io.ktor.server.engine.embeddedServer +import io.ktor.server.jetty.Jetty +import org.jitsi.utils.logging2.LoggerImpl +import kotlin.random.Random +import kotlin.time.ExperimentalTime +import kotlin.time.seconds + +@ExperimentalTime +class WebSocketClientTest : ShouldSpec() { + override fun isolationMode(): IsolationMode? = IsolationMode.InstancePerLeaf + + private val wsPort = Random.nextInt(1024, 65535).also { + println("Server running on port $it") + } + + private val client = HttpClient(CIO) { + install(WebSockets) + } + + private val wsServer = TestWsServer() + + private val server = embeddedServer(Jetty, port = wsPort) { + wsServer.app(this) + } + + private val receivedMessages = mutableListOf() + + private fun incomingMessageHandler(frame: Frame) { + receivedMessages.add(frame) + } + + private val testLogger = LoggerImpl("test") + + init { + server.start() + + context("sendString") { + context("when no reply is expected") { + val ws = createAndStartWsClient("/ws/blackhole") + ws.sendString("hello") + should("send a message") { + eventually(5.seconds) { + wsServer.receivedMessages shouldHaveSize 1 + } + wsServer.receivedMessages.first().shouldBeInstanceOf() + (wsServer.receivedMessages.first() as Frame.Text).readText() shouldBe "hello" + } + } + context("when a reply is expected") { + val ws = createAndStartWsClient("/ws/echo") + ws.sendString("hello") + should("invoke the incoming message handler") { + eventually(5.seconds) { + receivedMessages shouldHaveSize 1 + } + receivedMessages.first().shouldBeInstanceOf() + (receivedMessages.first() as Frame.Text).readText() shouldBe "hello" + } + } + context("when the client is stopped while waiting for a reply") { + val ws = createAndStartWsClient("/ws/delayandclose") + Thread.sleep(1000) + ws.sendString("hello") + ws.stop() + Thread.sleep(1000) + } + context("stop") { + val ws = createAndStartWsClient("/ws/echo") + ws.sendString("hello") + should("clean things up correctly") { + ws.stop() + } + } + } + } + + private fun createAndStartWsClient(path: String): WebSocketClient { + return WebSocketClient( + "localhost", + WebSocketClient.WsProtocol.WS, + wsPort, + path, + testLogger, + ::incomingMessageHandler, + client + ).apply { + connect() + run() + } + } +} diff --git a/pom.xml b/pom.xml index 046b79c..b5b33b4 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,7 @@ jicoco jicoco-test-kotlin jicoco-config + jicoco-websocket-client