From 4be92b60fd826ac711e79ef59c96814569da1695 Mon Sep 17 00:00:00 2001 From: stefan bocutiu Date: Wed, 8 Aug 2018 21:04:00 +0100 Subject: [PATCH 1/3] fix authenticate fix test BUILD SUCCESSFUL in 5m 36s 12 actionable tasks: 12 executed --- build.gradle | 4 +- .../com/landoop/jdbc4/client/RestClient.kt | 624 +++++++++--------- .../com/landoop/jdbc4/BatchInsertTest.kt | 2 +- .../jdbc4/BatchNestedInsertStressTest.kt | 2 +- .../com/landoop/jdbc4/PreparedInsertTest.kt | 2 +- .../com/landoop/jdbc4/RestClientTest.kt | 2 +- .../com/landoop/jdbc4/ScaleQueryTest.kt | 2 +- .../jdbc4/SingleFieldSchemaQueryTest.kt | 6 +- 8 files changed, 321 insertions(+), 323 deletions(-) diff --git a/build.gradle b/build.gradle index 43e1231..b0e2515 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ buildscript { ext.slf4j_version = '1.7.25' ext.joda_version = '2.9.9' ext.avro_version = '1.8.2' - ext.httpclient_version = '4.5.3' + ext.httpclient_version = '4.5.6' ext.jackson_version = '2.9.5' ext.kafka_version = '0.11.0.2' repositories { @@ -140,12 +140,14 @@ task packageSources(type: Jar, dependsOn: 'classes') { classifier = 'sources' } +/* task copyToLib(type: Copy) { into "$buildDir/libs" from configurations.runtime } build.dependsOn(copyToLib) +*/ artifacts { archives jar diff --git a/src/main/kotlin/com/landoop/jdbc4/client/RestClient.kt b/src/main/kotlin/com/landoop/jdbc4/client/RestClient.kt index 7222cab..c92023f 100644 --- a/src/main/kotlin/com/landoop/jdbc4/client/RestClient.kt +++ b/src/main/kotlin/com/landoop/jdbc4/client/RestClient.kt @@ -2,14 +2,7 @@ package com.landoop.jdbc4.client import com.landoop.jdbc4.Constants import com.landoop.jdbc4.JacksonSupport -import com.landoop.jdbc4.client.domain.Credentials -import com.landoop.jdbc4.client.domain.InsertRecord -import com.landoop.jdbc4.client.domain.InsertResponse -import com.landoop.jdbc4.client.domain.Message -import com.landoop.jdbc4.client.domain.PreparedInsertResponse -import com.landoop.jdbc4.client.domain.StreamingSelectResult -import com.landoop.jdbc4.client.domain.Table -import com.landoop.jdbc4.client.domain.Topic +import com.landoop.jdbc4.client.domain.* import org.apache.http.HttpEntity import org.apache.http.HttpResponse import org.apache.http.client.config.RequestConfig @@ -28,372 +21,375 @@ import org.glassfish.tyrus.client.ClientProperties import org.slf4j.LoggerFactory import java.io.IOException import java.net.URI -import java.net.URL import java.net.URLEncoder import java.sql.SQLException import java.util.concurrent.Executors -import javax.websocket.ClientEndpointConfig -import javax.websocket.Endpoint -import javax.websocket.EndpointConfig -import javax.websocket.MessageHandler -import javax.websocket.Session +import javax.websocket.* class RestClient(private val urls: List, private val credentials: Credentials, private val weakSSL: Boolean // if set to true then will allow self signed certificates ) : AutoCloseable { - private val client = ClientManager.createClient().apply { - this.properties[ClientProperties.REDIRECT_ENABLED] = true - } - private val logger = LoggerFactory.getLogger(RestClient::class.java) - private val timeout = 60_000 - - private val defaultRequestConfig = RequestConfig.custom() - .setConnectTimeout(timeout) - .setConnectionRequestTimeout(timeout) - .setSocketTimeout(timeout) - .build() - - private val sslContext = SSLContextBuilder.create() - .loadTrustMaterial(TrustSelfSignedStrategy()) - .build() - - private val allowAllHosts = NoopHostnameVerifier() - - private val connectionFactory = SSLConnectionSocketFactory(sslContext, allowAllHosts) - - private val httpClient = HttpClientBuilder.create().let { - it.setKeepAliveStrategy(DefaultKeepAlive) - it.setDefaultRequestConfig(defaultRequestConfig) - if (weakSSL) - it.setSSLSocketFactory(connectionFactory) - it.build() - } - - // the token received the last time we attempted to authenticate - internal var token: String = authenticate() - - // some connection pools, eg org.apache.commons.dbcp, will check that the connection is open - // before they hand it over to be used. Since a rest client is always stateless, we can just - // return true here as there's nothing to close. - var isClosed: Boolean = false - private set - - override fun close() { - httpClient.close() - isClosed = true - } - - fun connectTimeout(): Int = defaultRequestConfig.connectTimeout - fun connectionRequestTimeout(): Int = defaultRequestConfig.connectionRequestTimeout - fun socketTimeout(): Int = defaultRequestConfig.socketTimeout - - // attempt a given request for each url until one is successful, or all have been exhausted - // a 401 or 403 will result in a short circuit exit - // an IOException, or an unsupported http status code will result in trying the next url - // once all urls are exhausted, the last exception will be thrown - private fun attempt(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { - var lastException: Throwable? = null - for (url in urls) { - lastException = try { - val req = reqFn(url) - val resp = httpClient.execute(req) - logger.debug("Response $resp") - when (resp.statusLine.statusCode) { - 200, 201, 202 -> return respFn(resp) - 401, 403 -> throw AuthenticationException("Invalid credentials for user '${credentials.user}'") - else -> { - val body = resp.entity.content.bufferedReader().use { it.readText() } - throw SQLException("url=$url, req=$req, ${resp.statusLine.statusCode} ${resp.statusLine.reasonPhrase}: $body") - } - } - } catch (e: SQLException) { - e - } catch (e: IOException) { - e - } - } - throw lastException!! - } - - // attempts the given request with authentication by adding the current token as a header - private fun attemptAuthenticated(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { - val reqWithTokenHeaderFn: (String) -> HttpUriRequest = { - reqFn(it).apply { - addHeader(Constants.LensesTokenHeader, token) - } - } - return attempt(reqWithTokenHeaderFn, respFn) - } - - // attempts the given request with authentication - // if an authentication error is received, then it will attempt to - // re-authenticate before retrying again - // if auth is still invalid then it will give up - private fun attemptAuthenticatedWithRetry(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { - return try { - attemptAuthenticated(reqFn, respFn) - } catch (e: AuthenticationException) { - token = authenticate() - attemptAuthenticated(reqFn, respFn) + private val client = ClientManager.createClient().apply { + this.properties[ClientProperties.REDIRECT_ENABLED] = true } - } - - private fun attemptAuthenticatedWithRetry(endpoint: Endpoint, uri: URI) { - return try { - attemptAuthenticated(endpoint, uri) - } catch (e: Exception) { - token = authenticate() - attemptAuthenticated(endpoint, uri) - } - } + private val logger = LoggerFactory.getLogger(RestClient::class.java) + private val timeout = 60_000 + + private val defaultRequestConfig = RequestConfig.custom() + .setConnectTimeout(timeout) + .setConnectionRequestTimeout(timeout) + .setSocketTimeout(timeout) + .build() - private fun attemptAuthenticated(endpoint: Endpoint, uri: URI) { + private val sslContext = SSLContextBuilder.create() + .loadTrustMaterial(TrustSelfSignedStrategy()) + .build() - val configurator = object : ClientEndpointConfig.Configurator() { - override fun beforeRequest(headers: MutableMap>) { - headers[Constants.LensesTokenHeader] = mutableListOf(token) - } + private val allowAllHosts = NoopHostnameVerifier() + + private val connectionFactory = SSLConnectionSocketFactory(sslContext, allowAllHosts) + + private val httpClient = HttpClientBuilder.create().let { + it.setKeepAliveStrategy(DefaultKeepAlive) + it.setDefaultRequestConfig(defaultRequestConfig) + if (weakSSL) + it.setSSLSocketFactory(connectionFactory) + it.build() } - val config: ClientEndpointConfig = ClientEndpointConfig.Builder.create().configurator(configurator).build() - client.connectToServer(endpoint, config, uri) - } + // the token received the last time we attempted to authenticate + internal var token: String = authenticate() - // attempts to authenticate, and returns the token if successful - private fun authenticate(): String { + // some connection pools, eg org.apache.commons.dbcp, will check that the connection is open + // before they hand it over to be used. Since a rest client is always stateless, we can just + // return true here as there's nothing to close. + var isClosed: Boolean = false + private set - val requestFn: (String) -> HttpUriRequest = { - val entity = jsonEntity(credentials) - val endpoint = "$it/api/login" - logger.debug("Authenticating at $endpoint") - jsonPost(endpoint, entity) + override fun close() { + httpClient.close() + isClosed = true } - val responseFn: (HttpResponse) -> String = { - val token = EntityUtils.toString(it.entity) - logger.debug("Authentication token: $token") - token + fun connectTimeout(): Int = defaultRequestConfig.connectTimeout + fun connectionRequestTimeout(): Int = defaultRequestConfig.connectionRequestTimeout + fun socketTimeout(): Int = defaultRequestConfig.socketTimeout + + // attempt a given request for each url until one is successful, or all have been exhausted + // a 401 or 403 will result in a short circuit exit + // an IOException, or an unsupported http status code will result in trying the next url + // once all urls are exhausted, the last exception will be thrown + private fun attempt(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { + var lastException: Throwable? = null + for (url in urls) { + lastException = try { + val req = reqFn(url) + val resp = httpClient.execute(req) + logger.debug("Response $resp") + when (resp.statusLine.statusCode) { + 200, 201, 202 -> return respFn(resp) + 401, 403 -> throw AuthenticationException("Invalid credentials for user '${credentials.user}'") + else -> { + val body = resp.entity.content.bufferedReader().use { it.readText() } + throw SQLException("url=$url, req=$req, ${resp.statusLine.statusCode} ${resp.statusLine.reasonPhrase}: $body") + } + } + } catch (e: SQLException) { + e + } catch (e: IOException) { + e + } + } + throw lastException!! } - return attempt(requestFn, responseFn) - } + // attempts the given request with authentication by adding the current token as a header + private fun attemptAuthenticated(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { + val reqWithTokenHeaderFn: (String) -> HttpUriRequest = { + reqFn(it).apply { + addHeader(Constants.LensesTokenHeader, token) + } + } + return attempt(reqWithTokenHeaderFn, respFn) + } - fun topic(topicName: String): Topic { + // attempts the given request with authentication + // if an authentication error is received, then it will attempt to + // re-authenticate before retrying again + // if auth is still invalid then it will give up + private fun attemptAuthenticatedWithRetry(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { + return try { + attemptAuthenticated(reqFn, respFn) + } catch (e: AuthenticationException) { + token = authenticate() + attemptAuthenticated(reqFn, respFn) + } + } - val requestFn: (String) -> HttpUriRequest = { - val endpoint = "$it/api/topics/$topicName" - logger.debug("Fetching topic @ $endpoint") - jsonGet(endpoint) + private fun attemptAuthenticatedWithRetry(endpoint: Endpoint, uri: URI) { + return try { + attemptAuthenticated(endpoint, uri) + } catch (e: Exception) { + token = authenticate() + attemptAuthenticated(endpoint, uri) + } } - // once we get 200 - val responseFn: (HttpResponse) -> Topic = { - logger.debug("Topic json") - val str = it.entity.content.bufferedReader().use { it.readText() } - logger.debug(str) - JacksonSupport.fromJson(str) + private fun attemptAuthenticated(endpoint: Endpoint, uri: URI) { + + val configurator = object : ClientEndpointConfig.Configurator() { + override fun beforeRequest(headers: MutableMap>) { + headers[Constants.LensesTokenHeader] = mutableListOf(token) + } + } + + val config: ClientEndpointConfig = ClientEndpointConfig.Builder.create().configurator(configurator).build() + client.connectToServer(endpoint, config, uri) } - return attemptAuthenticated(requestFn, responseFn) - } + // attempts to authenticate, and returns the token if successful + private fun authenticate(): String { - fun tables(): Array { + val requestFn: (String) -> HttpUriRequest = { + val entity = jsonEntity(credentials) + val endpoint = "$it/api/login" + logger.debug("Authenticating at $endpoint") + jsonPostWithTextPlain(endpoint, entity) + } + + val responseFn: (HttpResponse) -> String = { + val token = EntityUtils.toString(it.entity) + logger.debug("Authentication token: $token") + token + } - val requestFn: (String) -> HttpUriRequest = { - val endpoint = "$it/api/jdbc/metadata/table" - logger.debug("Fetching topics @ $endpoint") - jsonGet(endpoint) + return attempt(requestFn, responseFn) } - val responseFn: (HttpResponse) -> Array
= { - val str = it.entity.content.bufferedReader().use { it.readText() } - JacksonSupport.fromJson(str) + fun topic(topicName: String): Topic { + + val requestFn: (String) -> HttpUriRequest = { + val endpoint = "$it/api/topics/$topicName" + logger.debug("Fetching topic @ $endpoint") + jsonGet(endpoint) + } + + // once we get 200 + val responseFn: (HttpResponse) -> Topic = { + logger.debug("Topic json") + val str = it.entity.content.bufferedReader().use { it.readText() } + logger.debug(str) + JacksonSupport.fromJson(str) + } + + return attemptAuthenticated(requestFn, responseFn) } - return attemptAuthenticated(requestFn, responseFn) - } - - private fun escape(url: String): String { - //replace \n with ' ' - //captures SELECT/INSERT statements - return URLEncoder.encode(url.trim().replace(System.lineSeparator(), " "), "UTF-8").replace("%20", "+") - } - - fun insert(sql: String): InsertResponse { - val requestFn: (String) -> HttpUriRequest = { - val endpoint = "$it/api/jdbc/insert" - val entity = stringEntity(sql) - logger.debug("Executing query $endpoint") - logger.debug(sql) - plainTextPost(endpoint, entity) + fun tables(): Array
{ + + val requestFn: (String) -> HttpUriRequest = { + val endpoint = "$it/api/jdbc/metadata/table" + logger.debug("Fetching topics @ $endpoint") + jsonGet(endpoint) + } + + val responseFn: (HttpResponse) -> Array
= { + val str = it.entity.content.bufferedReader().use { it.readText() } + JacksonSupport.fromJson(str) + } + + return attemptAuthenticated(requestFn, responseFn) } - val responseFn: (HttpResponse) -> InsertResponse = { - InsertResponse(it.statusLine.statusCode.toString()) - /*val json = it.entity.content.bufferedReader().use { it.readText() } - try{ - JacksonSupport.fromJson(json) - }catch (t:Throwable){ - InsertResponse(json) - }*/ + private fun escape(url: String): String { + //replace \n with ' ' + //captures SELECT/INSERT statements + return URLEncoder.encode(url.trim().replace(System.lineSeparator(), " "), "UTF-8").replace("%20", "+") } - return attemptAuthenticatedWithRetry(requestFn, responseFn) - } - - /** - * Executes a prepared insert statement. - * - * @param topic the topic to run the insert again - * @param records the insert variables for each row - */ - fun executePreparedInsert(topic: String, keyType: String, valueType: String, records: List): Any { - - val requestFn: (String) -> HttpUriRequest = { - val endpoint = "$it/api/jdbc/insert/prepared/$topic?kt=$keyType&vt=$valueType" - val entity = jsonEntity(records) - jsonPost(endpoint, entity) + fun insert(sql: String): InsertResponse { + val requestFn: (String) -> HttpUriRequest = { + val endpoint = "$it/api/jdbc/insert" + val entity = stringEntity(sql) + logger.debug("Executing query $endpoint") + logger.debug(sql) + plainTextPost(endpoint, entity) + } + + val responseFn: (HttpResponse) -> InsertResponse = { + InsertResponse(it.statusLine.statusCode.toString()) + /*val json = it.entity.content.bufferedReader().use { it.readText() } + try{ + JacksonSupport.fromJson(json) + }catch (t:Throwable){ + InsertResponse(json) + }*/ + } + + return attemptAuthenticatedWithRetry(requestFn, responseFn) } - // at the moment the response just returns ok or an error status - // in the case of receiving an ok (201) there's not much to do but return true - val responseFn: (HttpResponse) -> Boolean = { - val entity = it.entity.content.bufferedReader().use { it.readText() } - logger.debug("Prepared insert response $entity") - true + /** + * Executes a prepared insert statement. + * + * @param topic the topic to run the insert again + * @param records the insert variables for each row + */ + fun executePreparedInsert(topic: String, keyType: String, valueType: String, records: List): Any { + + val requestFn: (String) -> HttpUriRequest = { + val endpoint = "$it/api/jdbc/insert/prepared/$topic?kt=$keyType&vt=$valueType" + val entity = jsonEntity(records) + jsonPost(endpoint, entity) + } + + // at the moment the response just returns ok or an error status + // in the case of receiving an ok (201) there's not much to do but return true + val responseFn: (HttpResponse) -> Boolean = { + val entity = it.entity.content.bufferedReader().use { it.readText() } + logger.debug("Prepared insert response $entity") + true + } + + return attemptAuthenticatedWithRetry(requestFn, responseFn) } - return attemptAuthenticatedWithRetry(requestFn, responseFn) - } - - fun select(sql: String): StreamingSelectResult { - logger.debug("Executing query $sql") - - // hacky fix for spark - val r = "SELECT.*?FROM\\s+SELECT".toRegex() - val normalizedSql = sql.replaceFirst(r, "SELECT") - - logger.debug("Normalized query $normalizedSql") - - val escapedSql = escape(normalizedSql) - val url = "${urls[0]}/api/ws/jdbc/data?sql=$escapedSql" - val uri = URI.create(url.replace("https://", "ws://").replace("http://", "ws://")) - - val executor = Executors.newSingleThreadExecutor() - val result = StreamingSelectResult() - val endpoint = object : Endpoint() { - - override fun onOpen(session: Session, config: EndpointConfig) { - session.addMessageHandler(MessageHandler.Whole { - executor.submit(messageHandler(it)) - }) - } - - fun messageHandler(message: String): Runnable = Runnable { - try { - when (message.take(1)) { - // records - "0" -> result.addRecord(message.drop(1)) - // error case - "1" -> { - val e = SQLException(message.drop(1)) - logger.error("Error from select protocol: $message") - logger.debug("Original query: $uri") - throw e + fun select(sql: String): StreamingSelectResult { + logger.debug("Executing query $sql") + + // hacky fix for spark + val r = "SELECT.*?FROM\\s+SELECT".toRegex() + val normalizedSql = sql.replaceFirst(r, "SELECT") + + logger.debug("Normalized query $normalizedSql") + + val escapedSql = escape(normalizedSql) + val url = "${urls[0]}/api/ws/jdbc/data?sql=$escapedSql" + val uri = URI.create(url.replace("https://", "ws://").replace("http://", "ws://")) + + val executor = Executors.newSingleThreadExecutor() + val result = StreamingSelectResult() + val endpoint = object : Endpoint() { + + override fun onOpen(session: Session, config: EndpointConfig) { + session.addMessageHandler(MessageHandler.Whole { + executor.submit(messageHandler(it)) + }) } - // schema - "2" -> result.setSchema(message.drop(1)) - // all done - "3" -> { - executor.submit { result.endStream() } - executor.shutdown() + + fun messageHandler(message: String): Runnable = Runnable { + try { + when (message.take(1)) { + // records + "0" -> result.addRecord(message.drop(1)) + // error case + "1" -> { + val e = SQLException(message.drop(1)) + logger.error("Error from select protocol: $message") + logger.debug("Original query: $uri") + throw e + } + // schema + "2" -> result.setSchema(message.drop(1)) + // all done + "3" -> { + executor.submit { result.endStream() } + executor.shutdown() + } + } + } catch (t: Throwable) { + t.printStackTrace() + result.setError(t) + executor.submit { result.endStream() } + executor.shutdown() + } } - } - } catch (t: Throwable) { - t.printStackTrace() - result.setError(t) - executor.submit { result.endStream() } - executor.shutdown() } - } + + attemptAuthenticatedWithRetry(endpoint, uri) + return result } - attemptAuthenticatedWithRetry(endpoint, uri) - return result - } + fun prepareStatement(sql: String): PreparedInsertResponse { + val requestFn: (String) -> HttpUriRequest = { + val escapedSql = escape(sql) + val endpoint = "$it/api/jdbc/insert/prepared?sql=$escapedSql" + logger.debug("Executing query $endpoint") + jsonGet(endpoint) + } - fun prepareStatement(sql: String): PreparedInsertResponse { - val requestFn: (String) -> HttpUriRequest = { - val escapedSql = escape(sql) - val endpoint = "$it/api/jdbc/insert/prepared?sql=$escapedSql" - logger.debug("Executing query $endpoint") - jsonGet(endpoint) - } + val responseFn: (HttpResponse) -> PreparedInsertResponse = { + val entity = it.entity.content.bufferedReader().use { it.readText() } + logger.debug("Prepare response $entity") + JacksonSupport.fromJson(entity) + } - val responseFn: (HttpResponse) -> PreparedInsertResponse = { - val entity = it.entity.content.bufferedReader().use { it.readText() } - logger.debug("Prepare response $entity") - JacksonSupport.fromJson(entity) + return attemptAuthenticated(requestFn, responseFn) } - return attemptAuthenticated(requestFn, responseFn) - } + fun messages(sql: String): List { + + val requestFn: (String) -> HttpUriRequest = { + val escapedSql = escape(sql) + val endpoint = "$it/api/sql/data?sql=$escapedSql" + logger.debug("Executing query $endpoint") + jsonGet(endpoint) + } - fun messages(sql: String): List { + val responseFn: (HttpResponse) -> List = { + JacksonSupport.fromJson(it.entity.content) + } - val requestFn: (String) -> HttpUriRequest = { - val escapedSql = escape(sql) - val endpoint = "$it/api/sql/data?sql=$escapedSql" - logger.debug("Executing query $endpoint") - jsonGet(endpoint) + return attemptAuthenticated(requestFn, responseFn) } - val responseFn: (HttpResponse) -> List = { - JacksonSupport.fromJson(it.entity.content) + // returns true if the connection is still valid, it can do this by attempting to reauth + fun isValid(): Boolean { + token = authenticate() + return true } - return attemptAuthenticated(requestFn, responseFn) - } - - // returns true if the connection is still valid, it can do this by attempting to reauth - fun isValid(): Boolean { - token = authenticate() - return true - } + companion object RestClient { - companion object RestClient { + fun jsonEntity(t: T): HttpEntity { + val entity = StringEntity(JacksonSupport.toJson(t)) + entity.setContentType("application/json") + return entity + } - fun jsonEntity(t: T): HttpEntity { - val entity = StringEntity(JacksonSupport.toJson(t)) - entity.setContentType("application/json") - return entity - } + fun stringEntity(string: String): HttpEntity { + return StringEntity(string) + } - fun stringEntity(string: String): HttpEntity { - return StringEntity(string) - } + fun jsonGet(endpoint: String): HttpGet { + return HttpGet(endpoint).apply { + this.setHeader("Accept", "application/json") + } + } - fun jsonGet(endpoint: String): HttpGet { - return HttpGet(endpoint).apply { - this.setHeader("Accept", "application/json") - } - } + fun plainTextPost(endpoint: String, entity: HttpEntity): HttpPost { + return HttpPost(endpoint).apply { + this.entity = entity + this.setHeader("Content-type", "text/plain") + } + } - fun plainTextPost(endpoint: String, entity: HttpEntity): HttpPost { - return HttpPost(endpoint).apply { - this.entity = entity - this.setHeader("Content-type", "text/plain") - } - } + fun jsonPost(endpoint: String, entity: HttpEntity): HttpPost { + return HttpPost(endpoint).apply { + this.entity = entity + this.setHeader("Accept", "application/json") + this.setHeader("Content-type", "application/json") + } + } - fun jsonPost(endpoint: String, entity: HttpEntity): HttpPost { - return HttpPost(endpoint).apply { - this.entity = entity - this.setHeader("Accept", "application/json") - this.setHeader("Content-type", "application/json") - } + fun jsonPostWithTextPlain(endpoint: String, entity: HttpEntity): HttpPost { + return HttpPost(endpoint).apply { + this.entity = entity + this.setHeader("Accept", "text/plain") + this.setHeader("Content-type", "application/json") + } + } } - } } \ No newline at end of file diff --git a/src/test/kotlin/com/landoop/jdbc4/BatchInsertTest.kt b/src/test/kotlin/com/landoop/jdbc4/BatchInsertTest.kt index 830a655..5147d28 100644 --- a/src/test/kotlin/com/landoop/jdbc4/BatchInsertTest.kt +++ b/src/test/kotlin/com/landoop/jdbc4/BatchInsertTest.kt @@ -17,7 +17,7 @@ class BatchInsertTest : WordSpec(), CCData { val batchSize = 20 val values = Array(batchSize, { _ -> generateCC() }) - val sql = "INSERT INTO cc_data (customerFirstName, number, currency, customerLastName, country, blocked) values (?,?,?,?,?,?)" + val sql = "SET _ktype='STRING'; SET _vtype='AVRO';INSERT INTO cc_data (customerFirstName, number, currency, customerLastName, country, blocked) values (?,?,?,?,?,?)" val stmt = conn.prepareStatement(sql) for (value in values) { diff --git a/src/test/kotlin/com/landoop/jdbc4/BatchNestedInsertStressTest.kt b/src/test/kotlin/com/landoop/jdbc4/BatchNestedInsertStressTest.kt index f66c7f5..ded2869 100644 --- a/src/test/kotlin/com/landoop/jdbc4/BatchNestedInsertStressTest.kt +++ b/src/test/kotlin/com/landoop/jdbc4/BatchNestedInsertStressTest.kt @@ -27,7 +27,7 @@ class BatchNestedInsertStressTest : WordSpec(), LocationData { createTopic(topic) - val sql = "SET _ktype='STRING';INSERT INTO `$topic` (id, address.street, address.number, address.zip,address.state, geo.lat, geo.lon) values (?,?,?,?,?,?,?)" + val sql = "SET _ktype='STRING'; SET _vtype='AVRO';INSERT INTO `$topic` (id, address.street, address.number, address.zip,address.state, geo.lat, geo.lon) values (?,?,?,?,?,?,?)" val stmt = conn.prepareStatement(sql) locations.asList().chunked(batchSize).forEach { batch -> diff --git a/src/test/kotlin/com/landoop/jdbc4/PreparedInsertTest.kt b/src/test/kotlin/com/landoop/jdbc4/PreparedInsertTest.kt index cedcac6..c6bcdab 100644 --- a/src/test/kotlin/com/landoop/jdbc4/PreparedInsertTest.kt +++ b/src/test/kotlin/com/landoop/jdbc4/PreparedInsertTest.kt @@ -18,7 +18,7 @@ class PreparedInsertTest : WordSpec(), MovieData { "JDBC Driver" should { "support prepared statements" { - val sql = "INSERT INTO cc_data (customerFirstName, number, currency, customerLastName, country, blocked) values (?,?,?,?,?,?)" + val sql = "SET _ktype='STRING';INSERT INTO cc_data (customerFirstName, number, currency, customerLastName, country, blocked) values (?,?,?,?,?,?)" val stmt = conn.prepareStatement(sql) stmt.setString(1, "sammy") stmt.setString(2, "4191005000501123") diff --git a/src/test/kotlin/com/landoop/jdbc4/RestClientTest.kt b/src/test/kotlin/com/landoop/jdbc4/RestClientTest.kt index 57efe67..571d550 100644 --- a/src/test/kotlin/com/landoop/jdbc4/RestClientTest.kt +++ b/src/test/kotlin/com/landoop/jdbc4/RestClientTest.kt @@ -14,7 +14,7 @@ class RestClientTest : WordSpec() { class LoginServer : NanoHTTPD(61864) { override fun serve(session: IHTTPSession): Response { - return newFixedLengthResponse("""{"success":true, "token": "wibble"}""".trimIndent()) + return newFixedLengthResponse("""wibble""".trimIndent()) } } diff --git a/src/test/kotlin/com/landoop/jdbc4/ScaleQueryTest.kt b/src/test/kotlin/com/landoop/jdbc4/ScaleQueryTest.kt index 2048d8d..3cbdff1 100644 --- a/src/test/kotlin/com/landoop/jdbc4/ScaleQueryTest.kt +++ b/src/test/kotlin/com/landoop/jdbc4/ScaleQueryTest.kt @@ -36,7 +36,7 @@ class ScaleQueryTest : WordSpec(), ProducerSetup { LsqlDriver() populateEquities() - val q = "SELECT * FROM $topic" + val q = "SELECT * FROM $topic WHERE _ktype=STRING AND _vtype=AVRO" val conn = DriverManager.getConnection("jdbc:lsql:kafka:http://localhost:3030", "admin", "admin") val stmt = conn.createStatement() diff --git a/src/test/kotlin/com/landoop/jdbc4/SingleFieldSchemaQueryTest.kt b/src/test/kotlin/com/landoop/jdbc4/SingleFieldSchemaQueryTest.kt index 4148803..13b3031 100644 --- a/src/test/kotlin/com/landoop/jdbc4/SingleFieldSchemaQueryTest.kt +++ b/src/test/kotlin/com/landoop/jdbc4/SingleFieldSchemaQueryTest.kt @@ -38,21 +38,21 @@ class SingleFieldSchemaQueryTest : WordSpec(), ProducerSetup { "JDBC Driver" should { "support wildcard for fixed schemas" { - val q = "SELECT * FROM $topic" + val q = "SELECT * FROM $topic WHERE _ktype=STRING and _vtype=AVRO" val stmt = conn().createStatement() val rs = stmt.executeQuery(q) rs.metaData.columnCount shouldBe 1 rs.metaData.getColumnLabel(1) shouldBe "name" } "support projection for fixed schemas" { - val q = "SELECT name FROM $topic" + val q = "SELECT name FROM $topic WHERE _ktype=STRING and _vtype=AVRO" val stmt = conn().createStatement() val rs = stmt.executeQuery(q) rs.metaData.columnCount shouldBe 1 rs.metaData.getColumnLabel(1) shouldBe "name" } "return data for fixed schema" { - val q = "SELECT * FROM $topic" + val q = "SELECT * FROM $topic WHERE _ktype=STRING and _vtype=AVRO" val stmt = conn().createStatement() stmt.execute(q) shouldBe true val rs = stmt.resultSet From bf2d9fc281606cace762bbbb0cb3bb40c7c085b1 Mon Sep 17 00:00:00 2001 From: stefan bocutiu Date: Thu, 9 Aug 2018 09:53:47 +0100 Subject: [PATCH 2/3] add back the option to collect all jars --- build.gradle | 2 -- 1 file changed, 2 deletions(-) diff --git a/build.gradle b/build.gradle index b0e2515..acb455c 100644 --- a/build.gradle +++ b/build.gradle @@ -140,14 +140,12 @@ task packageSources(type: Jar, dependsOn: 'classes') { classifier = 'sources' } -/* task copyToLib(type: Copy) { into "$buildDir/libs" from configurations.runtime } build.dependsOn(copyToLib) -*/ artifacts { archives jar From 82c68789f378ce054e33ae56ff9d869a0869323e Mon Sep 17 00:00:00 2001 From: stefan bocutiu Date: Thu, 9 Aug 2018 09:57:06 +0100 Subject: [PATCH 3/3] use 2 tabs not 4 in formatting --- .../com/landoop/jdbc4/client/RestClient.kt | 616 +++++++++--------- 1 file changed, 308 insertions(+), 308 deletions(-) diff --git a/src/main/kotlin/com/landoop/jdbc4/client/RestClient.kt b/src/main/kotlin/com/landoop/jdbc4/client/RestClient.kt index c92023f..f2fba37 100644 --- a/src/main/kotlin/com/landoop/jdbc4/client/RestClient.kt +++ b/src/main/kotlin/com/landoop/jdbc4/client/RestClient.kt @@ -31,365 +31,365 @@ class RestClient(private val urls: List, private val weakSSL: Boolean // if set to true then will allow self signed certificates ) : AutoCloseable { - private val client = ClientManager.createClient().apply { - this.properties[ClientProperties.REDIRECT_ENABLED] = true - } - private val logger = LoggerFactory.getLogger(RestClient::class.java) - private val timeout = 60_000 - - private val defaultRequestConfig = RequestConfig.custom() - .setConnectTimeout(timeout) - .setConnectionRequestTimeout(timeout) - .setSocketTimeout(timeout) - .build() - - private val sslContext = SSLContextBuilder.create() - .loadTrustMaterial(TrustSelfSignedStrategy()) - .build() - - private val allowAllHosts = NoopHostnameVerifier() - - private val connectionFactory = SSLConnectionSocketFactory(sslContext, allowAllHosts) - - private val httpClient = HttpClientBuilder.create().let { - it.setKeepAliveStrategy(DefaultKeepAlive) - it.setDefaultRequestConfig(defaultRequestConfig) - if (weakSSL) - it.setSSLSocketFactory(connectionFactory) - it.build() - } - - // the token received the last time we attempted to authenticate - internal var token: String = authenticate() - - // some connection pools, eg org.apache.commons.dbcp, will check that the connection is open - // before they hand it over to be used. Since a rest client is always stateless, we can just - // return true here as there's nothing to close. - var isClosed: Boolean = false - private set - - override fun close() { - httpClient.close() - isClosed = true - } - - fun connectTimeout(): Int = defaultRequestConfig.connectTimeout - fun connectionRequestTimeout(): Int = defaultRequestConfig.connectionRequestTimeout - fun socketTimeout(): Int = defaultRequestConfig.socketTimeout - - // attempt a given request for each url until one is successful, or all have been exhausted - // a 401 or 403 will result in a short circuit exit - // an IOException, or an unsupported http status code will result in trying the next url - // once all urls are exhausted, the last exception will be thrown - private fun attempt(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { - var lastException: Throwable? = null - for (url in urls) { - lastException = try { - val req = reqFn(url) - val resp = httpClient.execute(req) - logger.debug("Response $resp") - when (resp.statusLine.statusCode) { - 200, 201, 202 -> return respFn(resp) - 401, 403 -> throw AuthenticationException("Invalid credentials for user '${credentials.user}'") - else -> { - val body = resp.entity.content.bufferedReader().use { it.readText() } - throw SQLException("url=$url, req=$req, ${resp.statusLine.statusCode} ${resp.statusLine.reasonPhrase}: $body") - } - } - } catch (e: SQLException) { - e - } catch (e: IOException) { - e - } + private val client = ClientManager.createClient().apply { + this.properties[ClientProperties.REDIRECT_ENABLED] = true + } + private val logger = LoggerFactory.getLogger(RestClient::class.java) + private val timeout = 60_000 + + private val defaultRequestConfig = RequestConfig.custom() + .setConnectTimeout(timeout) + .setConnectionRequestTimeout(timeout) + .setSocketTimeout(timeout) + .build() + + private val sslContext = SSLContextBuilder.create() + .loadTrustMaterial(TrustSelfSignedStrategy()) + .build() + + private val allowAllHosts = NoopHostnameVerifier() + + private val connectionFactory = SSLConnectionSocketFactory(sslContext, allowAllHosts) + + private val httpClient = HttpClientBuilder.create().let { + it.setKeepAliveStrategy(DefaultKeepAlive) + it.setDefaultRequestConfig(defaultRequestConfig) + if (weakSSL) + it.setSSLSocketFactory(connectionFactory) + it.build() + } + + // the token received the last time we attempted to authenticate + internal var token: String = authenticate() + + // some connection pools, eg org.apache.commons.dbcp, will check that the connection is open + // before they hand it over to be used. Since a rest client is always stateless, we can just + // return true here as there's nothing to close. + var isClosed: Boolean = false + private set + + override fun close() { + httpClient.close() + isClosed = true + } + + fun connectTimeout(): Int = defaultRequestConfig.connectTimeout + fun connectionRequestTimeout(): Int = defaultRequestConfig.connectionRequestTimeout + fun socketTimeout(): Int = defaultRequestConfig.socketTimeout + + // attempt a given request for each url until one is successful, or all have been exhausted + // a 401 or 403 will result in a short circuit exit + // an IOException, or an unsupported http status code will result in trying the next url + // once all urls are exhausted, the last exception will be thrown + private fun attempt(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { + var lastException: Throwable? = null + for (url in urls) { + lastException = try { + val req = reqFn(url) + val resp = httpClient.execute(req) + logger.debug("Response $resp") + when (resp.statusLine.statusCode) { + 200, 201, 202 -> return respFn(resp) + 401, 403 -> throw AuthenticationException("Invalid credentials for user '${credentials.user}'") + else -> { + val body = resp.entity.content.bufferedReader().use { it.readText() } + throw SQLException("url=$url, req=$req, ${resp.statusLine.statusCode} ${resp.statusLine.reasonPhrase}: $body") + } } - throw lastException!! + } catch (e: SQLException) { + e + } catch (e: IOException) { + e + } } - - // attempts the given request with authentication by adding the current token as a header - private fun attemptAuthenticated(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { - val reqWithTokenHeaderFn: (String) -> HttpUriRequest = { - reqFn(it).apply { - addHeader(Constants.LensesTokenHeader, token) - } - } - return attempt(reqWithTokenHeaderFn, respFn) + throw lastException!! + } + + // attempts the given request with authentication by adding the current token as a header + private fun attemptAuthenticated(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { + val reqWithTokenHeaderFn: (String) -> HttpUriRequest = { + reqFn(it).apply { + addHeader(Constants.LensesTokenHeader, token) + } } - - // attempts the given request with authentication - // if an authentication error is received, then it will attempt to - // re-authenticate before retrying again - // if auth is still invalid then it will give up - private fun attemptAuthenticatedWithRetry(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { - return try { - attemptAuthenticated(reqFn, respFn) - } catch (e: AuthenticationException) { - token = authenticate() - attemptAuthenticated(reqFn, respFn) - } + return attempt(reqWithTokenHeaderFn, respFn) + } + + // attempts the given request with authentication + // if an authentication error is received, then it will attempt to + // re-authenticate before retrying again + // if auth is still invalid then it will give up + private fun attemptAuthenticatedWithRetry(reqFn: (String) -> HttpUriRequest, respFn: (HttpResponse) -> T): T { + return try { + attemptAuthenticated(reqFn, respFn) + } catch (e: AuthenticationException) { + token = authenticate() + attemptAuthenticated(reqFn, respFn) } - - private fun attemptAuthenticatedWithRetry(endpoint: Endpoint, uri: URI) { - return try { - attemptAuthenticated(endpoint, uri) - } catch (e: Exception) { - token = authenticate() - attemptAuthenticated(endpoint, uri) - } + } + + private fun attemptAuthenticatedWithRetry(endpoint: Endpoint, uri: URI) { + return try { + attemptAuthenticated(endpoint, uri) + } catch (e: Exception) { + token = authenticate() + attemptAuthenticated(endpoint, uri) } + } - private fun attemptAuthenticated(endpoint: Endpoint, uri: URI) { - - val configurator = object : ClientEndpointConfig.Configurator() { - override fun beforeRequest(headers: MutableMap>) { - headers[Constants.LensesTokenHeader] = mutableListOf(token) - } - } + private fun attemptAuthenticated(endpoint: Endpoint, uri: URI) { - val config: ClientEndpointConfig = ClientEndpointConfig.Builder.create().configurator(configurator).build() - client.connectToServer(endpoint, config, uri) + val configurator = object : ClientEndpointConfig.Configurator() { + override fun beforeRequest(headers: MutableMap>) { + headers[Constants.LensesTokenHeader] = mutableListOf(token) + } } - // attempts to authenticate, and returns the token if successful - private fun authenticate(): String { + val config: ClientEndpointConfig = ClientEndpointConfig.Builder.create().configurator(configurator).build() + client.connectToServer(endpoint, config, uri) + } - val requestFn: (String) -> HttpUriRequest = { - val entity = jsonEntity(credentials) - val endpoint = "$it/api/login" - logger.debug("Authenticating at $endpoint") - jsonPostWithTextPlain(endpoint, entity) - } - - val responseFn: (HttpResponse) -> String = { - val token = EntityUtils.toString(it.entity) - logger.debug("Authentication token: $token") - token - } + // attempts to authenticate, and returns the token if successful + private fun authenticate(): String { - return attempt(requestFn, responseFn) + val requestFn: (String) -> HttpUriRequest = { + val entity = jsonEntity(credentials) + val endpoint = "$it/api/login" + logger.debug("Authenticating at $endpoint") + jsonPostWithTextPlain(endpoint, entity) } - fun topic(topicName: String): Topic { - - val requestFn: (String) -> HttpUriRequest = { - val endpoint = "$it/api/topics/$topicName" - logger.debug("Fetching topic @ $endpoint") - jsonGet(endpoint) - } - - // once we get 200 - val responseFn: (HttpResponse) -> Topic = { - logger.debug("Topic json") - val str = it.entity.content.bufferedReader().use { it.readText() } - logger.debug(str) - JacksonSupport.fromJson(str) - } - - return attemptAuthenticated(requestFn, responseFn) + val responseFn: (HttpResponse) -> String = { + val token = EntityUtils.toString(it.entity) + logger.debug("Authentication token: $token") + token } - fun tables(): Array
{ + return attempt(requestFn, responseFn) + } - val requestFn: (String) -> HttpUriRequest = { - val endpoint = "$it/api/jdbc/metadata/table" - logger.debug("Fetching topics @ $endpoint") - jsonGet(endpoint) - } - - val responseFn: (HttpResponse) -> Array
= { - val str = it.entity.content.bufferedReader().use { it.readText() } - JacksonSupport.fromJson(str) - } + fun topic(topicName: String): Topic { - return attemptAuthenticated(requestFn, responseFn) + val requestFn: (String) -> HttpUriRequest = { + val endpoint = "$it/api/topics/$topicName" + logger.debug("Fetching topic @ $endpoint") + jsonGet(endpoint) } - private fun escape(url: String): String { - //replace \n with ' ' - //captures SELECT/INSERT statements - return URLEncoder.encode(url.trim().replace(System.lineSeparator(), " "), "UTF-8").replace("%20", "+") + // once we get 200 + val responseFn: (HttpResponse) -> Topic = { + logger.debug("Topic json") + val str = it.entity.content.bufferedReader().use { it.readText() } + logger.debug(str) + JacksonSupport.fromJson(str) } - fun insert(sql: String): InsertResponse { - val requestFn: (String) -> HttpUriRequest = { - val endpoint = "$it/api/jdbc/insert" - val entity = stringEntity(sql) - logger.debug("Executing query $endpoint") - logger.debug(sql) - plainTextPost(endpoint, entity) - } + return attemptAuthenticated(requestFn, responseFn) + } - val responseFn: (HttpResponse) -> InsertResponse = { - InsertResponse(it.statusLine.statusCode.toString()) - /*val json = it.entity.content.bufferedReader().use { it.readText() } - try{ - JacksonSupport.fromJson(json) - }catch (t:Throwable){ - InsertResponse(json) - }*/ - } + fun tables(): Array
{ - return attemptAuthenticatedWithRetry(requestFn, responseFn) + val requestFn: (String) -> HttpUriRequest = { + val endpoint = "$it/api/jdbc/metadata/table" + logger.debug("Fetching topics @ $endpoint") + jsonGet(endpoint) } - /** - * Executes a prepared insert statement. - * - * @param topic the topic to run the insert again - * @param records the insert variables for each row - */ - fun executePreparedInsert(topic: String, keyType: String, valueType: String, records: List): Any { - - val requestFn: (String) -> HttpUriRequest = { - val endpoint = "$it/api/jdbc/insert/prepared/$topic?kt=$keyType&vt=$valueType" - val entity = jsonEntity(records) - jsonPost(endpoint, entity) - } - - // at the moment the response just returns ok or an error status - // in the case of receiving an ok (201) there's not much to do but return true - val responseFn: (HttpResponse) -> Boolean = { - val entity = it.entity.content.bufferedReader().use { it.readText() } - logger.debug("Prepared insert response $entity") - true - } - - return attemptAuthenticatedWithRetry(requestFn, responseFn) + val responseFn: (HttpResponse) -> Array
= { + val str = it.entity.content.bufferedReader().use { it.readText() } + JacksonSupport.fromJson(str) } - fun select(sql: String): StreamingSelectResult { - logger.debug("Executing query $sql") - - // hacky fix for spark - val r = "SELECT.*?FROM\\s+SELECT".toRegex() - val normalizedSql = sql.replaceFirst(r, "SELECT") + return attemptAuthenticated(requestFn, responseFn) + } + + private fun escape(url: String): String { + //replace \n with ' ' + //captures SELECT/INSERT statements + return URLEncoder.encode(url.trim().replace(System.lineSeparator(), " "), "UTF-8").replace("%20", "+") + } + + fun insert(sql: String): InsertResponse { + val requestFn: (String) -> HttpUriRequest = { + val endpoint = "$it/api/jdbc/insert" + val entity = stringEntity(sql) + logger.debug("Executing query $endpoint") + logger.debug(sql) + plainTextPost(endpoint, entity) + } - logger.debug("Normalized query $normalizedSql") + val responseFn: (HttpResponse) -> InsertResponse = { + InsertResponse(it.statusLine.statusCode.toString()) + /*val json = it.entity.content.bufferedReader().use { it.readText() } + try{ + JacksonSupport.fromJson(json) + }catch (t:Throwable){ + InsertResponse(json) + }*/ + } - val escapedSql = escape(normalizedSql) - val url = "${urls[0]}/api/ws/jdbc/data?sql=$escapedSql" - val uri = URI.create(url.replace("https://", "ws://").replace("http://", "ws://")) + return attemptAuthenticatedWithRetry(requestFn, responseFn) + } + + /** + * Executes a prepared insert statement. + * + * @param topic the topic to run the insert again + * @param records the insert variables for each row + */ + fun executePreparedInsert(topic: String, keyType: String, valueType: String, records: List): Any { + + val requestFn: (String) -> HttpUriRequest = { + val endpoint = "$it/api/jdbc/insert/prepared/$topic?kt=$keyType&vt=$valueType" + val entity = jsonEntity(records) + jsonPost(endpoint, entity) + } - val executor = Executors.newSingleThreadExecutor() - val result = StreamingSelectResult() - val endpoint = object : Endpoint() { + // at the moment the response just returns ok or an error status + // in the case of receiving an ok (201) there's not much to do but return true + val responseFn: (HttpResponse) -> Boolean = { + val entity = it.entity.content.bufferedReader().use { it.readText() } + logger.debug("Prepared insert response $entity") + true + } - override fun onOpen(session: Session, config: EndpointConfig) { - session.addMessageHandler(MessageHandler.Whole { - executor.submit(messageHandler(it)) - }) + return attemptAuthenticatedWithRetry(requestFn, responseFn) + } + + fun select(sql: String): StreamingSelectResult { + logger.debug("Executing query $sql") + + // hacky fix for spark + val r = "SELECT.*?FROM\\s+SELECT".toRegex() + val normalizedSql = sql.replaceFirst(r, "SELECT") + + logger.debug("Normalized query $normalizedSql") + + val escapedSql = escape(normalizedSql) + val url = "${urls[0]}/api/ws/jdbc/data?sql=$escapedSql" + val uri = URI.create(url.replace("https://", "ws://").replace("http://", "ws://")) + + val executor = Executors.newSingleThreadExecutor() + val result = StreamingSelectResult() + val endpoint = object : Endpoint() { + + override fun onOpen(session: Session, config: EndpointConfig) { + session.addMessageHandler(MessageHandler.Whole { + executor.submit(messageHandler(it)) + }) + } + + fun messageHandler(message: String): Runnable = Runnable { + try { + when (message.take(1)) { + // records + "0" -> result.addRecord(message.drop(1)) + // error case + "1" -> { + val e = SQLException(message.drop(1)) + logger.error("Error from select protocol: $message") + logger.debug("Original query: $uri") + throw e } - - fun messageHandler(message: String): Runnable = Runnable { - try { - when (message.take(1)) { - // records - "0" -> result.addRecord(message.drop(1)) - // error case - "1" -> { - val e = SQLException(message.drop(1)) - logger.error("Error from select protocol: $message") - logger.debug("Original query: $uri") - throw e - } - // schema - "2" -> result.setSchema(message.drop(1)) - // all done - "3" -> { - executor.submit { result.endStream() } - executor.shutdown() - } - } - } catch (t: Throwable) { - t.printStackTrace() - result.setError(t) - executor.submit { result.endStream() } - executor.shutdown() - } + // schema + "2" -> result.setSchema(message.drop(1)) + // all done + "3" -> { + executor.submit { result.endStream() } + executor.shutdown() } + } + } catch (t: Throwable) { + t.printStackTrace() + result.setError(t) + executor.submit { result.endStream() } + executor.shutdown() } - - attemptAuthenticatedWithRetry(endpoint, uri) - return result + } } - fun prepareStatement(sql: String): PreparedInsertResponse { - val requestFn: (String) -> HttpUriRequest = { - val escapedSql = escape(sql) - val endpoint = "$it/api/jdbc/insert/prepared?sql=$escapedSql" - logger.debug("Executing query $endpoint") - jsonGet(endpoint) - } + attemptAuthenticatedWithRetry(endpoint, uri) + return result + } - val responseFn: (HttpResponse) -> PreparedInsertResponse = { - val entity = it.entity.content.bufferedReader().use { it.readText() } - logger.debug("Prepare response $entity") - JacksonSupport.fromJson(entity) - } - - return attemptAuthenticated(requestFn, responseFn) + fun prepareStatement(sql: String): PreparedInsertResponse { + val requestFn: (String) -> HttpUriRequest = { + val escapedSql = escape(sql) + val endpoint = "$it/api/jdbc/insert/prepared?sql=$escapedSql" + logger.debug("Executing query $endpoint") + jsonGet(endpoint) } - fun messages(sql: String): List { + val responseFn: (HttpResponse) -> PreparedInsertResponse = { + val entity = it.entity.content.bufferedReader().use { it.readText() } + logger.debug("Prepare response $entity") + JacksonSupport.fromJson(entity) + } - val requestFn: (String) -> HttpUriRequest = { - val escapedSql = escape(sql) - val endpoint = "$it/api/sql/data?sql=$escapedSql" - logger.debug("Executing query $endpoint") - jsonGet(endpoint) - } + return attemptAuthenticated(requestFn, responseFn) + } - val responseFn: (HttpResponse) -> List = { - JacksonSupport.fromJson(it.entity.content) - } + fun messages(sql: String): List { - return attemptAuthenticated(requestFn, responseFn) + val requestFn: (String) -> HttpUriRequest = { + val escapedSql = escape(sql) + val endpoint = "$it/api/sql/data?sql=$escapedSql" + logger.debug("Executing query $endpoint") + jsonGet(endpoint) } - // returns true if the connection is still valid, it can do this by attempting to reauth - fun isValid(): Boolean { - token = authenticate() - return true + val responseFn: (HttpResponse) -> List = { + JacksonSupport.fromJson(it.entity.content) } - companion object RestClient { + return attemptAuthenticated(requestFn, responseFn) + } - fun jsonEntity(t: T): HttpEntity { - val entity = StringEntity(JacksonSupport.toJson(t)) - entity.setContentType("application/json") - return entity - } + // returns true if the connection is still valid, it can do this by attempting to reauth + fun isValid(): Boolean { + token = authenticate() + return true + } - fun stringEntity(string: String): HttpEntity { - return StringEntity(string) - } + companion object RestClient { - fun jsonGet(endpoint: String): HttpGet { - return HttpGet(endpoint).apply { - this.setHeader("Accept", "application/json") - } - } + fun jsonEntity(t: T): HttpEntity { + val entity = StringEntity(JacksonSupport.toJson(t)) + entity.setContentType("application/json") + return entity + } - fun plainTextPost(endpoint: String, entity: HttpEntity): HttpPost { - return HttpPost(endpoint).apply { - this.entity = entity - this.setHeader("Content-type", "text/plain") - } - } + fun stringEntity(string: String): HttpEntity { + return StringEntity(string) + } - fun jsonPost(endpoint: String, entity: HttpEntity): HttpPost { - return HttpPost(endpoint).apply { - this.entity = entity - this.setHeader("Accept", "application/json") - this.setHeader("Content-type", "application/json") - } - } + fun jsonGet(endpoint: String): HttpGet { + return HttpGet(endpoint).apply { + this.setHeader("Accept", "application/json") + } + } - fun jsonPostWithTextPlain(endpoint: String, entity: HttpEntity): HttpPost { - return HttpPost(endpoint).apply { - this.entity = entity - this.setHeader("Accept", "text/plain") - this.setHeader("Content-type", "application/json") - } - } + fun plainTextPost(endpoint: String, entity: HttpEntity): HttpPost { + return HttpPost(endpoint).apply { + this.entity = entity + this.setHeader("Content-type", "text/plain") + } + } + + fun jsonPost(endpoint: String, entity: HttpEntity): HttpPost { + return HttpPost(endpoint).apply { + this.entity = entity + this.setHeader("Accept", "application/json") + this.setHeader("Content-type", "application/json") + } + } + + fun jsonPostWithTextPlain(endpoint: String, entity: HttpEntity): HttpPost { + return HttpPost(endpoint).apply { + this.entity = entity + this.setHeader("Accept", "text/plain") + this.setHeader("Content-type", "application/json") + } } + } } \ No newline at end of file