diff --git a/build.gradle b/build.gradle index 128cf74..ed65d07 100644 --- a/build.gradle +++ b/build.gradle @@ -59,15 +59,16 @@ dependencies { compile 'org.glassfish.tyrus:tyrus-client:1.15' compile 'org.glassfish.tyrus:tyrus-core:1.15' compile 'org.glassfish.tyrus:tyrus-container-grizzly-client:1.15' - implementation "io.ktor:ktor-client-core:$ktorVersion" - implementation "io.ktor:ktor-client-cio:$ktorVersion" - implementation "io.ktor:ktor-client-jackson:$ktorVersion" - implementation "io.ktor:ktor-client-websockets:$ktorVersion" - implementation "io.ktor:ktor-client-encoding:$ktorVersion" - implementation "io.ktor:ktor-client-encoding-jvm:$ktorVersion" - implementation 'org.springframework:spring-websocket:5.1.9.RELEASE' + compile "io.ktor:ktor-client-core:$ktorVersion" + compile "io.ktor:ktor-client-cio:$ktorVersion" + compile "io.ktor:ktor-client-jackson:$ktorVersion" + compile "io.ktor:ktor-client-websockets:$ktorVersion" + compile "io.ktor:ktor-client-encoding:$ktorVersion" + compile "io.ktor:ktor-client-encoding-jvm:$ktorVersion" + compile 'org.springframework:spring-websocket:5.1.9.RELEASE' implementation 'io.arrow-kt:arrow-core-data:0.9.0' + //compile 'jakarta.websocket:jakarta.websocket-api:1.1.1' testCompile "org.apache.logging.log4j:log4j-slf4j-impl:$log4j_version" testCompile "io.kotlintest:kotlintest-runner-junit5:$kotlin_test" @@ -110,6 +111,7 @@ shadowJar { manifest { attributes "Main-Class": "io.lenses.jdbc4.LensesDriver" relocate 'com.fasterxml', 'shadow.com.fasterxml' + relocate 'org.springframework', 'shadow.org.springframework' } } diff --git a/gradle.properties b/gradle.properties index 3426cf4..65a96c8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,3 @@ kotlin.incremental=true -version=3.0.1 +version=3.0.2 -ossrhUsername=you -ossrhPassword=me \ No newline at end of file diff --git a/src/main/kotlin/io/lenses/jdbc4/client/LensesClient.kt b/src/main/kotlin/io/lenses/jdbc4/client/LensesClient.kt index 5ceb39b..afc07c0 100644 --- a/src/main/kotlin/io/lenses/jdbc4/client/LensesClient.kt +++ b/src/main/kotlin/io/lenses/jdbc4/client/LensesClient.kt @@ -1,11 +1,6 @@ package io.lenses.jdbc4.client -import arrow.core.Either -import arrow.core.Right -import arrow.core.Try -import arrow.core.flatMap -import arrow.core.left -import arrow.core.right +import arrow.core.* import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.NullNode import com.fasterxml.jackson.databind.node.TextNode @@ -38,12 +33,8 @@ import kotlinx.coroutines.ObsoleteCoroutinesApi import org.apache.avro.Schema import org.glassfish.tyrus.client.ClientManager import org.glassfish.tyrus.client.ClientProperties -import org.springframework.web.socket.CloseStatus -import org.springframework.web.socket.TextMessage -import org.springframework.web.socket.WebSocketHandler -import org.springframework.web.socket.WebSocketHttpHeaders -import org.springframework.web.socket.WebSocketMessage -import org.springframework.web.socket.WebSocketSession +import org.springframework.web.socket.* +import org.springframework.web.socket.client.WebSocketClient import org.springframework.web.socket.client.standard.StandardWebSocketClient import java.net.URI import java.util.concurrent.LinkedBlockingQueue @@ -179,7 +170,22 @@ class LensesClient(private val url: String, val uri = URI.create(url.replace("https://", "ws://").replace("http://", "ws://")) val headers = WebSocketHttpHeaders() headers.add(LensesTokenHeader, token.value) - val wsclient = StandardWebSocketClient() + //Expected to read from env variables + //val sslContextConfigurator = SslContextConfigurator() + /* sslContextConfigurator.setTrustStoreFile("..."); + * sslContextConfigurator.setTrustStorePassword("..."); + * sslContextConfigurator.setTrustStoreType("..."); + * sslContextConfigurator.setKeyStoreFile("..."); + * sslContextConfigurator.setKeyStorePassword("..."); + * sslContextConfigurator.setKeyStoreType("..."); + */ + //val sslEngineConfigurator = SslEngineConfigurator(sslContextConfigurator, true,false, false) + + val clientManager:ClientManager = ClientManager.createClient() + clientManager.properties[ClientProperties.REDIRECT_ENABLED] = true + //clientManager.properties[ClientProperties.SSL_ENGINE_CONFIGURATOR] = sslEngineConfigurator + val wsclient: WebSocketClient = StandardWebSocketClient(clientManager) + val queue = LinkedBlockingQueue(200) val jdbcRequest = JdbcRequestMessage(sql, token.value) val handler = object : WebSocketHandler { @@ -216,12 +222,21 @@ class LensesClient(private val url: String, override fun supportsPartialMessages(): Boolean = false } + logger.debug("Connecting to websocket at $uri") val sess = wsclient.doHandshake(handler, headers, uri).get() val conn = object : WebsocketConnection { override val queue = queue - override fun close() = sess.close() + override fun close() { + if(sess.isOpen) { + try { + sess.close() + } catch (t: Throwable) { + + } + } + } override fun isClosed(): Boolean = !sess.isOpen } conn.right() diff --git a/src/test/kotlin/io/lenses/jdbc4/ProducerSetup.kt b/src/test/kotlin/io/lenses/jdbc4/ProducerSetup.kt index d0da604..b50e876 100644 --- a/src/test/kotlin/io/lenses/jdbc4/ProducerSetup.kt +++ b/src/test/kotlin/io/lenses/jdbc4/ProducerSetup.kt @@ -21,7 +21,7 @@ interface ProducerSetup : Logging { fun conn(): Connection { LensesDriver() - return DriverManager.getConnection("jdbc:lenses:kafka:http://localhost:3030", "admin", "admin") + return DriverManager.getConnection("jdbc:lenses:kafka:https://localhost:3030", "admin", "admin") } fun schemaClient() = CachedSchemaRegistryClient("http://127.0.0.1:8081", 1000)