Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a general-purpose websocket client #128

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions jicoco-websocket-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright @ 2018 - present 8x8, Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.jitsi</groupId>
<artifactId>jicoco-parent</artifactId>
<version>1.1-SNAPSHOT</version>
</parent>

<artifactId>jicoco-websocket-client</artifactId>
<version>1.1-SNAPSHOT</version>

<properties>
<ktor.version>1.3.1</ktor.version>
<kotlin.version>1.3.72</kotlin.version>
<kotest.version>4.1.3</kotest.version>
</properties>

<dependencies>
<dependency>
<groupId>org.jitsi</groupId>
<artifactId>jitsi-utils</artifactId>
<version>1.0-60-g07c4a0b</version>
</dependency>
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-client-cio</artifactId>
<version>${ktor.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>${kotlin.version}</version>
</dependency>
<!-- Test Deps -->
<dependency>
<groupId>io.kotest</groupId>
<artifactId>kotest-runner-junit5-jvm</artifactId>
<version>${kotest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.kotest</groupId>
<artifactId>kotest-assertions-core-jvm</artifactId>
<version>${kotest.version}</version>
<scope>test</scope>
</dependency>
<!-- We include the server libs in test, as it's currently the only way
to test websockets -->
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-server-jetty</artifactId>
<version>${ktor.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-websockets</artifactId>
<version>${ktor.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/kotlin</sourceDirectory>
<plugins>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile</id>
<phase>test-compile</phase>
<goals>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
<configuration>
<jvmTarget>1.8</jvmTarget>
<args>
<arg>-Xuse-experimental=io.ktor.util.KtorExperimentalAPI</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright @ 2018 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jitsi.websocket_client

import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.features.websocket.DefaultClientWebSocketSession
import io.ktor.client.features.websocket.WebSockets
import io.ktor.client.features.websocket.webSocketSession
import io.ktor.http.URLProtocol
import io.ktor.http.cio.websocket.CloseReason
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.close
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.createChildLogger
import java.net.ConnectException

/**
* A websocket client which sends messages and invokes a handler upon receiving
* messages from the far side. Sending is non-blocking, and the client has no
* notion of correlating "responses" to "requests": if request/response
* semantics are required then they must be implemented by a layer on top of
* this class.
*/
class WebSocketClient(
private val host: String,
private val wsProtocol: WsProtocol,
private val port: Int,
/**
* The path of the remote websocket URL
*/
private val path: String,
parentLogger: Logger,
private val incomingMessageHandler: (Frame) -> Unit = {},
private val client: HttpClient = HttpClient(CIO) {
install(WebSockets)
},
/**
* The dispatcher which will be used for all of the request and response
* processing.
*/
dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
private val logger = createChildLogger(parentLogger)
private val job = Job()
private val coroutineScope = CoroutineScope(dispatcher + job)
private val msgsToSend = Channel<Frame>(Channel.RENDEZVOUS)
private var wsSession: DefaultClientWebSocketSession? = null

fun sendString(data: String) {
require(isConnected())
coroutineScope.launch {
msgsToSend.send(Frame.Text(data))
}
}

fun isConnected(): Boolean = wsSession != null

// Starts the run loop for sending and receiving websocket messages
private suspend fun DefaultClientWebSocketSession.startLoop() {
launch {
for (msg in incoming) {
incomingMessageHandler(msg)
}
}
try {
for (msg in msgsToSend) {
send(msg)
}
} catch (e: ClosedReceiveChannelException) {
logger.info("Websocket was closed")
return
} catch (e: CancellationException) {
logger.info("Websocket job was cancelled")
throw e
} catch (t: Throwable) {
logger.error("Error in websocket connection: ", t)
return
}
}

/**
* Attempt to connect to the websocket server, returns true if the connection was
* successful, false otherwise.
*
* Known exceptions (not necessarily exhaustive):
* [ConnectException] if we can't connect
* [IllegalArgumentException] if expected WSS, but the server is WS
* [EOFException] if expected WS, but the server is WSS
* [SunCertPathBuilderException] WSS cert issue
*/
fun connect(): Boolean {
logger.debug { "Connecting to $wsProtocol://$host:$port/$path" }
return try {
wsSession = runBlocking {
client.webSocketSession {
url {
protocol = wsProtocol.toUrlProtocol()
host = [email protected]
port = [email protected]
path(path)
}
}
}
true
} catch (t: Throwable) {
logger.error("Error connecting", t)
false
}
}

/**
* Start the (asynchronous) loops to handle sending and receiving messages
*/
fun run() {
require(isConnected())
coroutineScope.launch {
wsSession?.startLoop()
}
}

/**
* Stop and close the websocket connection
*/
fun stop() {
logger.info("Stopping")
runBlocking {
wsSession?.close(CloseReason(CloseReason.Codes.NORMAL, "bye"))
job.cancelAndJoin()
}
}
}

private fun WsProtocol.toUrlProtocol(): URLProtocol {
return when (this) {
WsProtocol.WS -> URLProtocol.WS
WsProtocol.WSS -> URLProtocol.WSS
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright @ 2018 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jitsi.websocket_client

enum class WsProtocol {
WS,
WSS;

override fun toString(): String {
return when (this) {
WS -> "ws"
WSS -> "wss"

}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright @ 2018 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jitsi.websocket_client

import io.ktor.application.Application
import io.ktor.application.install
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.readText
import io.ktor.routing.route
import io.ktor.routing.routing
import io.ktor.websocket.WebSockets
import io.ktor.websocket.webSocket
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay

/**
* A ws server configuration which defines multiple ws endpoints that can be
* used for different test scenarios.
*/
class TestWsServer {
val receivedMessages = mutableListOf<Frame>()
val app: Application.() -> Unit = {
install(WebSockets)

routing {
route("ws") {
// Receive messages and don't respond
webSocket("blackhole") {
for (frame in incoming) {
receivedMessages.add(frame)
}
}
// Receive messages and echo the content back
webSocket("echo") {
for (frame in incoming) {
receivedMessages.add(frame)
frame as Frame.Text
send(Frame.Text(frame.readText()))
}
}
// Receive messages and echo the content back after a delay
webSocket("delayecho") {
for (frame in incoming) {
receivedMessages.add(frame)
frame as Frame.Text
delay(1000)
send(Frame.Text(frame.readText()))
}
}
// Receive a message and then close the connection after a
// delay
webSocket("delayandclose") {
for (frame in incoming) {
receivedMessages.add(frame)
delay(1000)
cancel()
}
}
}
}
}
}
Loading