From 08fd3ca7d37bca5fd79e9f20b4c950d73097b8a1 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Thu, 5 May 2022 08:35:35 +0000 Subject: [PATCH 1/3] [TS-981] Update project version and return backward compatibility --- README.md | 22 ++++++++-- gradle.properties | 2 +- .../com/exactpro/th2/conn/amqp/BoxMain.kt | 42 +++++++++++++++++-- .../th2/conn/amqp/client/AmqpClient.kt | 2 +- .../conn/amqp/configuration/Configuration.kt | 8 ++-- .../conn/amqp/connservice/ConnServiceImpl.kt | 2 +- 6 files changed, 66 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 58bf516..6101614 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,6 @@ The project contains the implementation of an AMQP connection **th2-conn-amqp**. > jks&trustStorePassword=<trustStorePassword>&keyStorePath=<keyStorePath>. > jks&keyStorePassword=<keyStorePassword>&saslMechanisms=EXTERNAL -* username = \ -* password = \ * sendQueue = \ * receiveQueue = \ @@ -50,6 +48,17 @@ spec: # factorylookup: # sendQueue: # receiveQueue: + +# Deprecated and will be deleted in the next major version. +# Instead of sessions there is a way to use parameters with sessionAlias for single connection +# But you can use only one of these ways + sessionAlias: session-alias + parameters: + # initialContextFactory: + # factorylookup: + # sendQueue: + # receiveQueue: + pins: - name: in_raw connection-type: mq @@ -60,4 +69,11 @@ spec: - name: to_send connection-type: mq attributes: ["send", "parsed", "subscribe"] -``` \ No newline at end of file +``` + +# Release notes +## 1.2.0 ++ Add possibility of multiple connections + + Add list of sessions with connection parameters + + Backward compatibility remains + diff --git a/gradle.properties b/gradle.properties index 01d8aea..8082041 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ kotlin.code.style=official -release_version=1.1.0 +release_version=1.2.0 kotlin_version=1.5.31 description = 'AMQP Client' diff --git a/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt b/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt index 34880d5..80f58f4 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt @@ -27,6 +27,7 @@ import com.exactpro.th2.common.metrics.readiness import com.exactpro.th2.common.schema.factory.CommonFactory import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.conn.amqp.configuration.Configuration +import com.exactpro.th2.conn.amqp.configuration.ConnParameters import com.exactpro.th2.conn.amqp.connservice.ConnService import com.exactpro.th2.conn.amqp.connservice.ConnServiceImpl import mu.KotlinLogging @@ -69,11 +70,46 @@ fun main(args: Array) { .build() ) + if (configuration.parameters != null) { + var reason: String? = null + if (configuration.sessionAlias == null) { + reason = "In old connection parameters version {sessionAlias} or {parameters} couldn't be null" + } else { + if (configuration.sessions != null) { + reason = "Configuration couldn't contain old and new connection parameters version. it must be {sessions} or {parameters}" + } else { + configuration.sessions = ArrayList() + configuration.sessions?.plusAssign( + ConnParameters( + sessionAlias = configuration.sessionAlias, + initialContextFactory = configuration.parameters.initialContextFactory, + factorylookup = configuration.parameters.factorylookup, + sendQueue = configuration.parameters.sendQueue, + receiveQueue = configuration.parameters.receiveQueue + ) + ) + } + } + if (reason != null) { + eventRouter.safeSend( + Event.start().endTimestamp() + .status(Event.Status.FAILED) + .type("Error") + .name(reason), + rootEvent.id + ) + throw java.lang.IllegalArgumentException(reason) + } + } + val rawRouter: MessageRouter = factory.messageRouterRawBatch val aliasToService = HashMap() val executor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor() - configuration.sessions.forEach { connParameters -> + configuration.sessions?.forEach { connParameters -> run { + if (connParameters.sessionAlias == null) { + throw IllegalArgumentException("Connection parameter {sessionAlias} can't be blank") + } val publisher = MessagePublisher( connParameters.sessionAlias, configuration.drainIntervalMills, @@ -92,8 +128,9 @@ fun main(args: Array) { ) resources += service aliasToService[connParameters.sessionAlias] = service + service.start() } - } + }?: throw java.lang.IllegalArgumentException("Connection parameters can't be blank. Use {sessions} or {parameters + sessioAlias}") rawRouter.subscribeAll { _, rawBatch -> rawBatch.messagesList.forEach { msg -> msg.runCatching { @@ -133,7 +170,6 @@ fun main(args: Array) { } } } - aliasToService.forEach { (_, service) -> service.start() } readiness = true awaitShutdown(lock, condition) diff --git a/src/main/kotlin/com/exactpro/th2/conn/amqp/client/AmqpClient.kt b/src/main/kotlin/com/exactpro/th2/conn/amqp/client/AmqpClient.kt index ca800bf..a775a7e 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/amqp/client/AmqpClient.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/amqp/client/AmqpClient.kt @@ -31,7 +31,7 @@ import javax.jms.Message import javax.jms.MessageListener import javax.naming.InitialContext -typealias Config = Map +typealias Config = Map @NotThreadSafe class AmqpClient(config: Config, val errorReporter: (Throwable) -> Unit) : IClient { diff --git a/src/main/kotlin/com/exactpro/th2/conn/amqp/configuration/Configuration.kt b/src/main/kotlin/com/exactpro/th2/conn/amqp/configuration/Configuration.kt index 86ccb1f..c528a19 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/amqp/configuration/Configuration.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/amqp/configuration/Configuration.kt @@ -20,13 +20,15 @@ class Configuration( val enableMessageSendingEvent: Boolean = true, val drainIntervalMills: Long = 1000L, val rootEventName: String = "ConnAmqp", - val sessions: List + var sessions: MutableList?, + val sessionAlias: String?, + val parameters: ConnParameters? ) class ConnParameters( - val sessionAlias: String, + val sessionAlias: String?, val initialContextFactory: String, val factorylookup: String, val sendQueue: String, val receiveQueue: String -) +) \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/amqp/connservice/ConnServiceImpl.kt b/src/main/kotlin/com/exactpro/th2/conn/amqp/connservice/ConnServiceImpl.kt index 4abb379..de8aba7 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/amqp/connservice/ConnServiceImpl.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/amqp/connservice/ConnServiceImpl.kt @@ -59,7 +59,7 @@ class ConnServiceImpl( client.stop() } companion object { - private fun ConnParameters.toMap(): Map = mutableMapOf( + private fun ConnParameters.toMap(): Map = mutableMapOf( Context.INITIAL_CONTEXT_FACTORY to initialContextFactory, "connectionfactory.factorylookup" to factorylookup, "queue.sendQueue" to sendQueue, From 7fd80e7b9938e0e130fb9e9265a41e86cc05f0b0 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Thu, 5 May 2022 12:25:11 +0000 Subject: [PATCH 2/3] [TS-981] Fix configuration for backward compatibility --- .../com/exactpro/th2/conn/amqp/BoxMain.kt | 81 +++++++++++-------- .../th2/conn/amqp/client/AmqpClient.kt | 2 +- .../conn/amqp/configuration/Configuration.kt | 10 ++- .../conn/amqp/connservice/ConnServiceImpl.kt | 2 +- 4 files changed, 58 insertions(+), 37 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt b/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt index 80f58f4..0cae2d4 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt @@ -70,37 +70,7 @@ fun main(args: Array) { .build() ) - if (configuration.parameters != null) { - var reason: String? = null - if (configuration.sessionAlias == null) { - reason = "In old connection parameters version {sessionAlias} or {parameters} couldn't be null" - } else { - if (configuration.sessions != null) { - reason = "Configuration couldn't contain old and new connection parameters version. it must be {sessions} or {parameters}" - } else { - configuration.sessions = ArrayList() - configuration.sessions?.plusAssign( - ConnParameters( - sessionAlias = configuration.sessionAlias, - initialContextFactory = configuration.parameters.initialContextFactory, - factorylookup = configuration.parameters.factorylookup, - sendQueue = configuration.parameters.sendQueue, - receiveQueue = configuration.parameters.receiveQueue - ) - ) - } - } - if (reason != null) { - eventRouter.safeSend( - Event.start().endTimestamp() - .status(Event.Status.FAILED) - .type("Error") - .name(reason), - rootEvent.id - ) - throw java.lang.IllegalArgumentException(reason) - } - } + configureConnectionParameters(configuration, eventRouter, rootEvent) val rawRouter: MessageRouter = factory.messageRouterRawBatch val aliasToService = HashMap() @@ -130,14 +100,14 @@ fun main(args: Array) { aliasToService[connParameters.sessionAlias] = service service.start() } - }?: throw java.lang.IllegalArgumentException("Connection parameters can't be blank. Use {sessions} or {parameters + sessioAlias}") + } ?: throw java.lang.IllegalArgumentException("Connection parameters can't be blank. Use {sessions} or {parameters + sessioAlias}") rawRouter.subscribeAll { _, rawBatch -> rawBatch.messagesList.forEach { msg -> msg.runCatching { val alias = msg.metadata.id.connectionId.sessionAlias aliasToService.getOrElse( alias, - {throw IllegalArgumentException("Can't find service by alias {$alias}")} + { throw IllegalArgumentException("Can't find service by alias {$alias}") } ).send(this) }.onFailure { eventRouter.safeSend( @@ -179,6 +149,51 @@ fun main(args: Array) { } } +private fun configureConnectionParameters( + configuration: Configuration, + eventRouter: MessageRouter, + rootEvent: Event +) { + when { + configuration.parameters != null && configuration.sessionAlias == null -> { + sendError( + eventRouter, + "The connection {parameters} configuration requires {sessionAlias}. Please specify the option or use the {sessions} configuration instead", + rootEvent + ) + } + configuration.sessions != null && configuration.parameters != null -> + sendError( + eventRouter, + "Configuration can't contain both connection version. It must be {sessions} or {parameters}", + rootEvent + ) + + configuration.parameters != null && configuration.sessions == null && configuration.sessionAlias != null -> { + (configuration.sessions as ArrayList).plusAssign( + ConnParameters( + sessionAlias = configuration.sessionAlias, + initialContextFactory = configuration.parameters.initialContextFactory, + factorylookup = configuration.parameters.factorylookup, + sendQueue = configuration.parameters.sendQueue, + receiveQueue = configuration.parameters.receiveQueue + ) + ) + } + } +} + +private fun sendError(eventRouter: MessageRouter, reason: String?, rootEvent: Event) { + eventRouter.safeSend( + Event.start().endTimestamp() + .status(Event.Status.FAILED) + .type("Error") + .name(reason), + rootEvent.id + ) + throw java.lang.IllegalArgumentException(reason) +} + private fun MessageRouter.safeSend(event: Event, parentId: String?) { runCatching { send(EventBatch.newBuilder().addEvents(event.toProtoEvent(parentId)).build()) } .onFailure { LOGGER.error(it) { "Cannot send event with id ${event.id}" } } diff --git a/src/main/kotlin/com/exactpro/th2/conn/amqp/client/AmqpClient.kt b/src/main/kotlin/com/exactpro/th2/conn/amqp/client/AmqpClient.kt index a775a7e..ca800bf 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/amqp/client/AmqpClient.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/amqp/client/AmqpClient.kt @@ -31,7 +31,7 @@ import javax.jms.Message import javax.jms.MessageListener import javax.naming.InitialContext -typealias Config = Map +typealias Config = Map @NotThreadSafe class AmqpClient(config: Config, val errorReporter: (Throwable) -> Unit) : IClient { diff --git a/src/main/kotlin/com/exactpro/th2/conn/amqp/configuration/Configuration.kt b/src/main/kotlin/com/exactpro/th2/conn/amqp/configuration/Configuration.kt index c528a19..4758f50 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/amqp/configuration/Configuration.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/amqp/configuration/Configuration.kt @@ -22,11 +22,17 @@ class Configuration( val rootEventName: String = "ConnAmqp", var sessions: MutableList?, val sessionAlias: String?, - val parameters: ConnParameters? + val parameters: SingleConnectionConnParameters? ) class ConnParameters( - val sessionAlias: String?, + val sessionAlias: String, + val initialContextFactory: String, + val factorylookup: String, + val sendQueue: String, + val receiveQueue: String +) +class SingleConnectionConnParameters( val initialContextFactory: String, val factorylookup: String, val sendQueue: String, diff --git a/src/main/kotlin/com/exactpro/th2/conn/amqp/connservice/ConnServiceImpl.kt b/src/main/kotlin/com/exactpro/th2/conn/amqp/connservice/ConnServiceImpl.kt index de8aba7..4abb379 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/amqp/connservice/ConnServiceImpl.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/amqp/connservice/ConnServiceImpl.kt @@ -59,7 +59,7 @@ class ConnServiceImpl( client.stop() } companion object { - private fun ConnParameters.toMap(): Map = mutableMapOf( + private fun ConnParameters.toMap(): Map = mutableMapOf( Context.INITIAL_CONTEXT_FACTORY to initialContextFactory, "connectionfactory.factorylookup" to factorylookup, "queue.sendQueue" to sendQueue, From fcce9b958a576ea64c9bd0fe0d7b12b7ba095349 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Thu, 5 May 2022 13:40:00 +0000 Subject: [PATCH 3/3] [TS-981] Update exception usage --- src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt b/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt index 0cae2d4..af638b7 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/amqp/BoxMain.kt @@ -77,9 +77,6 @@ fun main(args: Array) { val executor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor() configuration.sessions?.forEach { connParameters -> run { - if (connParameters.sessionAlias == null) { - throw IllegalArgumentException("Connection parameter {sessionAlias} can't be blank") - } val publisher = MessagePublisher( connParameters.sessionAlias, configuration.drainIntervalMills, @@ -100,14 +97,14 @@ fun main(args: Array) { aliasToService[connParameters.sessionAlias] = service service.start() } - } ?: throw java.lang.IllegalArgumentException("Connection parameters can't be blank. Use {sessions} or {parameters + sessioAlias}") + } ?: error("Connection parameters can't be blank. Use {sessions} or {parameters + sessioAlias}") rawRouter.subscribeAll { _, rawBatch -> rawBatch.messagesList.forEach { msg -> msg.runCatching { val alias = msg.metadata.id.connectionId.sessionAlias aliasToService.getOrElse( alias, - { throw IllegalArgumentException("Can't find service by alias {$alias}") } + { error("Can't find service by alias {$alias}") } ).send(this) }.onFailure { eventRouter.safeSend( @@ -183,7 +180,7 @@ private fun configureConnectionParameters( } } -private fun sendError(eventRouter: MessageRouter, reason: String?, rootEvent: Event) { +private fun sendError(eventRouter: MessageRouter, reason: String, rootEvent: Event) { eventRouter.safeSend( Event.start().endTimestamp() .status(Event.Status.FAILED) @@ -191,7 +188,7 @@ private fun sendError(eventRouter: MessageRouter, reason: String?, r .name(reason), rootEvent.id ) - throw java.lang.IllegalArgumentException(reason) + error(reason) } private fun MessageRouter.safeSend(event: Event, parentId: String?) {