diff --git a/README.md b/README.md index c3e36b5fc..2c34812a0 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2 common library (Java) (5.8.0) +# th2 common library (Java) (5.9.0) ## Usage @@ -93,6 +93,9 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file. * maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with its default value set to 60000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout. +* retryTimeDeviationPercent - specifies random deviation to delay interval duration. Default value is 10 percents. + E.g. if delay interval is 30 seconds and `retryTimeDeviationPercent` is 10 percents the actual duration of interval + will be random value from 27 to 33 seconds. * prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if unlimited, the default value is set to 10. * messageRecursionLimit - an integer number denotes how deep the nested protobuf message might be, set by default 100 @@ -110,6 +113,7 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file. "maxRecoveryAttempts": 5, "minConnectionRecoveryTimeout": 10000, "maxConnectionRecoveryTimeout": 60000, + "retryTimeDeviationPercent": 10, "prefetchCount": 10, "messageRecursionLimit": 100 } @@ -379,6 +383,7 @@ describes gRPC service structure) This kind of router provides the ability for component to send / receive messages via RabbitMQ. Router has several methods to subscribe and publish RabbitMQ messages steam (th2 use batches of messages or events as transport). +Supports recovery of subscriptions cancelled by RabbitMQ due to following errors: "delivery acknowledgement timed out" and "queue not found". #### Choice pin by attributes @@ -502,6 +507,13 @@ dependencies { ## Release notes +### 5.9.0-dev ++ Added retry in case of a RabbitMQ channel or connection error (when possible). ++ Added InterruptedException to basicConsume method signature. ++ Added additional logging for RabbitMQ errors. ++ Fixed connection recovery delay time. ++ Integration tests for RabbitMQ retry scenarios. + ### 5.8.0-dev + Added `NOT_WILDCARD` filter operation, which filter a field which isn't matched by wildcard expression. @@ -972,4 +984,4 @@ dependencies { ### 3.0.1 + metrics related to time measurement of an incoming message handling (Raw / Parsed / Event) migrated to - Prometheus [histogram](https://prometheus.io/docs/concepts/metric_types/#histogram) + Prometheus [histogram](https://prometheus.io/docs/concepts/metric_types/#histogram) \ No newline at end of file diff --git a/build.gradle b/build.gradle index 777ab7eb2..a89a8034c 100644 --- a/build.gradle +++ b/build.gradle @@ -12,7 +12,7 @@ buildscript { } dependencies { - classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlin_version}" + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" } } @@ -21,15 +21,15 @@ plugins { id 'java-library' id 'java-test-fixtures' id 'maven-publish' - id "io.github.gradle-nexus.publish-plugin" version "1.0.0" + id "io.github.gradle-nexus.publish-plugin" version "1.3.0" id 'signing' - id 'org.jetbrains.kotlin.jvm' version "${kotlin_version}" - id 'org.jetbrains.kotlin.kapt' version "${kotlin_version}" - id "org.owasp.dependencycheck" version "8.3.1" - id "me.champeau.jmh" version "0.6.8" + id 'org.jetbrains.kotlin.jvm' version "$kotlin_version" + id 'org.jetbrains.kotlin.kapt' version "$kotlin_version" + id "org.owasp.dependencycheck" version "9.0.9" + id "me.champeau.jmh" version "0.7.2" id "com.gorylenko.gradle-git-properties" version "2.4.1" id 'com.github.jk1.dependency-license-report' version '2.5' - id "de.undercouch.download" version "5.4.0" + id "de.undercouch.download" version "5.6.0" id "com.google.protobuf" version "0.9.4" } @@ -41,8 +41,8 @@ ext { protobufVersion = '3.23.2' // The protoc:3.23.3 https://github.com/protocolbuffers/protobuf/issues/13070 serviceGeneratorVersion = '3.5.1' - cradleVersion = '5.1.1-dev' - junitVersion = '5.10.0' + cradleVersion = '5.1.4-dev' + junitVersion = '5.10.2' genBaseDir = file("${buildDir}/generated/source/proto") } @@ -181,10 +181,10 @@ tasks.register('integrationTest', Test) { dependencies { api platform("com.exactpro.th2:bom:4.5.0") - api('com.exactpro.th2:grpc-common:4.3.0-dev') { + api('com.exactpro.th2:grpc-common:4.4.0-dev') { because('protobuf transport is main now, this dependnecy should be moved to grpc, mq protobuf modules after splitting') } - api("com.exactpro.th2:cradle-core:${cradleVersion}") { + api("com.exactpro.th2:cradle-core:$cradleVersion") { because('cradle is included into common library now, this dependnecy should be moved to a cradle module after splitting') } api('io.netty:netty-buffer') { @@ -195,10 +195,10 @@ dependencies { jmh 'org.openjdk.jmh:jmh-generator-annprocess:0.9' implementation 'com.google.protobuf:protobuf-java-util' - implementation "com.exactpro.th2:grpc-service-generator:${serviceGeneratorVersion}" - implementation "com.exactpro.th2:cradle-cassandra:${cradleVersion}" + implementation "com.exactpro.th2:grpc-service-generator:$serviceGeneratorVersion" + implementation "com.exactpro.th2:cradle-cassandra:$cradleVersion" - def autoValueVersion = '1.10.1' + def autoValueVersion = '1.10.4' implementation "com.google.auto.value:auto-value-annotations:$autoValueVersion" kapt("com.google.auto.value:auto-value:$autoValueVersion") { //FIXME: Updated library because it is fat jar @@ -237,7 +237,7 @@ dependencies { implementation "com.fasterxml.jackson.module:jackson-module-kotlin" implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor' - implementation 'com.fasterxml.uuid:java-uuid-generator:4.0.1' + implementation "com.fasterxml.uuid:java-uuid-generator:5.0.0" implementation 'org.apache.logging.log4j:log4j-slf4j2-impl' implementation 'org.apache.logging.log4j:log4j-core' @@ -247,30 +247,29 @@ dependencies { implementation 'io.prometheus:simpleclient_httpserver' implementation 'io.prometheus:simpleclient_log4j2' - implementation('com.squareup.okio:okio:3.5.0') { + implementation("com.squareup.okio:okio:3.8.0") { because('fix vulnerability in transitive dependency ') } - implementation('com.squareup.okhttp3:okhttp:4.11.0') { + implementation("com.squareup.okhttp3:okhttp:4.12.0") { because('fix vulnerability in transitive dependency ') } - implementation('io.fabric8:kubernetes-client:6.8.0') { + implementation("io.fabric8:kubernetes-client:6.10.0") { exclude group: 'com.fasterxml.jackson.dataformat', module: 'jackson-dataformat-yaml' } - implementation 'io.github.microutils:kotlin-logging:3.0.0' // The last version bases on kotlin 1.6.0 + implementation "io.github.microutils:kotlin-logging:3.0.5" testImplementation 'javax.annotation:javax.annotation-api:1.3.2' - testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" - testImplementation 'org.mockito.kotlin:mockito-kotlin:4.0.0' + testImplementation "org.junit.jupiter:junit-jupiter:$junitVersion" + testImplementation "org.mockito.kotlin:mockito-kotlin:5.2.1" testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5' - testImplementation "org.testcontainers:testcontainers:1.17.1" - testImplementation "org.testcontainers:rabbitmq:1.17.1" - testImplementation("org.junit-pioneer:junit-pioneer:2.1.0") { + testImplementation "org.testcontainers:testcontainers:1.19.6" + testImplementation "org.testcontainers:rabbitmq:1.19.6" + testImplementation("org.junit-pioneer:junit-pioneer:2.2.0") { because("system property tests") } - - testFixturesImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test-junit5', version: kotlin_version - testFixturesImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" + testFixturesImplementation "org.jetbrains.kotlin:kotlin-test-junit5:$kotlin_version" + testFixturesImplementation "org.junit.jupiter:junit-jupiter:$junitVersion" } jar { @@ -294,14 +293,14 @@ sourceSets { protobuf { protoc { - artifact = "com.google.protobuf:protoc:${protobufVersion}" + artifact = "com.google.protobuf:protoc:$protobufVersion" } plugins { grpc { - artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" + artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion" } services { - artifact = "com.exactpro.th2:grpc-service-generator:${serviceGeneratorVersion}:all@jar" + artifact = "com.exactpro.th2:grpc-service-generator:$serviceGeneratorVersion:all@jar" } } generateProtoTasks { @@ -355,4 +354,4 @@ licenseReport { ] excludeOwnGroup = false allowedLicensesFile = new URL("https://raw.githubusercontent.com/th2-net/.github/main/license-compliance/gradle-license-report/allowed-licenses.json") -} +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 77ff7d2c4..5f8be7c82 100644 --- a/gradle.properties +++ b/gradle.properties @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # -release_version=5.8.0 + +release_version=5.9.0 description='th2 common library (Java)' vcs_url=https://github.com/th2-net/th2-common-j -kapt.include.compile.classpath=false +kapt.include.compile.classpath=false \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java index bd42b51e9..d1d82e338 100644 --- a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java +++ b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java @@ -658,7 +658,7 @@ protected PrometheusConfiguration loadPrometheusConfiguration() { } protected ConnectionManager createRabbitMQConnectionManager() { - return new ConnectionManager(getRabbitMqConfiguration(), getConnectionManagerConfiguration(), livenessMonitor::disable); + return new ConnectionManager(getRabbitMqConfiguration(), getConnectionManagerConfiguration()); } protected ConnectionManager getRabbitMqConnectionManager() { diff --git a/src/main/java/com/exactpro/th2/common/schema/grpc/router/AbstractGrpcRouter.java b/src/main/java/com/exactpro/th2/common/schema/grpc/router/AbstractGrpcRouter.java index e4ce18803..3774fb596 100644 --- a/src/main/java/com/exactpro/th2/common/schema/grpc/router/AbstractGrpcRouter.java +++ b/src/main/java/com/exactpro/th2/common/schema/grpc/router/AbstractGrpcRouter.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,8 +15,8 @@ package com.exactpro.th2.common.schema.grpc.router; -import com.exactpro.th2.common.grpc.router.ServerGrpcInterceptor; import com.exactpro.th2.common.grpc.router.MethodDetails; +import com.exactpro.th2.common.grpc.router.ServerGrpcInterceptor; import com.exactpro.th2.common.metrics.CommonMetrics; import com.exactpro.th2.common.schema.grpc.configuration.GrpcConfiguration; import com.exactpro.th2.common.schema.grpc.configuration.GrpcRouterConfiguration; diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java index 40e44c94d..b9fa6cd7b 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java @@ -153,7 +153,7 @@ private void subscribe() { consumerMonitor.updateAndGet(previous -> previous == EMPTY_INITIALIZER ? Suppliers.memoize(this::basicConsume) : previous) - .get(); // initialize subscribtion + .get(); // initialize subscription } catch (Exception e) { throw new IllegalStateException("Can not start listening", e); } @@ -189,6 +189,10 @@ private SubscriberMonitor basicConsume() { return connectionManager.basicConsume(queue, this::handle, this::canceled); } catch (IOException e) { throw new IllegalStateException("Can not subscribe to queue = " + queue, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Interrupted exception while consuming from queue '{}'", queue); + throw new IllegalStateException("Thread was interrupted while consuming", e); } } @@ -232,4 +236,4 @@ private void canceled(String consumerTag) { private static Supplier emptySupplier() { return (Supplier) EMPTY_INITIALIZER; } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java index e1564aff8..e209c4b4c 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java @@ -1,5 +1,6 @@ /* * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -12,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection; import com.exactpro.th2.common.metrics.HealthMetrics; @@ -22,6 +24,7 @@ import com.exactpro.th2.common.schema.message.impl.OnlyOnceConfirmation; import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration; import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RetryingDelay; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.BlockedListener; @@ -32,10 +35,14 @@ import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ExceptionHandler; +import com.rabbitmq.client.Method; import com.rabbitmq.client.Recoverable; import com.rabbitmq.client.RecoveryListener; import com.rabbitmq.client.ShutdownNotifier; +import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.TopologyRecoveryException; +import com.rabbitmq.client.impl.AMQImpl; +import com.rabbitmq.client.impl.recovery.AutorecoveringChannel; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.EqualsBuilder; @@ -43,15 +50,17 @@ import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; import java.io.IOException; -import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -72,23 +81,36 @@ public class ConnectionManager implements AutoCloseable { private final Connection connection; private final Map channelsByPin = new ConcurrentHashMap<>(); - private final AtomicInteger connectionRecoveryAttempts = new AtomicInteger(0); private final AtomicBoolean connectionIsClosed = new AtomicBoolean(false); private final ConnectionManagerConfiguration configuration; private final String subscriberName; private final AtomicInteger nextSubscriberId = new AtomicInteger(1); private final ExecutorService sharedExecutor; - private final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() - .setNameFormat("channel-checker-%d") - .build()); + private final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build() + ); private final HealthMetrics metrics = new HealthMetrics(this); + private final RecoveryListener recoveryListener = new RecoveryListener() { + @Override + public void handleRecovery(Recoverable recoverable) { + metrics.getReadinessMonitor().enable(); + LOGGER.info("Recovery finished. Set RabbitMQ readiness to true"); + metrics.getLivenessMonitor().enable(); + } + + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + LOGGER.warn("Recovery started..."); + } + }; + public ConnectionManagerConfiguration getConfiguration() { return configuration; } - public ConnectionManager(@NotNull RabbitMQConfiguration rabbitMQConfiguration, @NotNull ConnectionManagerConfiguration connectionManagerConfiguration, Runnable onFailedRecoveryConnection) { + public ConnectionManager(@NotNull RabbitMQConfiguration rabbitMQConfiguration, @NotNull ConnectionManagerConfiguration connectionManagerConfiguration) { Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null"); this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null"); @@ -120,7 +142,7 @@ public ConnectionManager(@NotNull RabbitMQConfiguration rabbitMQConfiguration, @ factory.setPassword(password); } - if (connectionManagerConfiguration.getConnectionTimeout() > 0) { + if (connectionManagerConfiguration.getConnectionTimeout() > 0) { factory.setConnectionTimeout(connectionManagerConfiguration.getConnectionTimeout()); } @@ -165,48 +187,31 @@ public void handleTopologyRecoveryException(Connection conn, Channel ch, Topolog turnOffReadiness(exception); } - private void turnOffReadiness(Throwable exception){ + private void turnOffReadiness(Throwable exception) { metrics.getReadinessMonitor().disable(); LOGGER.debug("Set RabbitMQ readiness to false. RabbitMQ error", exception); } }); - factory.setAutomaticRecoveryEnabled(true); - factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> { - if (connectionIsClosed.get()) { - return false; - } - - int tmpCountTriesToRecovery = connectionRecoveryAttempts.get(); - - if (tmpCountTriesToRecovery < connectionManagerConfiguration.getMaxRecoveryAttempts()) { - LOGGER.info("Try to recovery connection to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery + 1); - return true; - } - LOGGER.error("Can not connect to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery); - if (onFailedRecoveryConnection != null) { - onFailedRecoveryConnection.run(); - } else { - // TODO: we should stop the execution of the application. Don't use System.exit!!! - throw new IllegalStateException("Cannot recover connection to RabbitMQ"); - } - return false; - }); + factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> !connectionIsClosed.get()); factory.setRecoveryDelayHandler(recoveryAttempts -> { - int tmpCountTriesToRecovery = connectionRecoveryAttempts.getAndIncrement(); + int minTime = connectionManagerConfiguration.getMinConnectionRecoveryTimeout(); + int maxTime = connectionManagerConfiguration.getMaxConnectionRecoveryTimeout(); + int maxRecoveryAttempts = connectionManagerConfiguration.getMaxRecoveryAttempts(); + int deviationPercent = connectionManagerConfiguration.getRetryTimeDeviationPercent(); + + LOGGER.debug("Try to recovery connection to RabbitMQ. Count tries = {}", recoveryAttempts); + int recoveryDelay = RetryingDelay.getRecoveryDelay(recoveryAttempts, minTime, maxTime, maxRecoveryAttempts, deviationPercent); + if (recoveryAttempts >= maxRecoveryAttempts && metrics.getLivenessMonitor().isEnabled()) { + LOGGER.info("Set RabbitMQ liveness to false. Can't recover connection"); + metrics.getLivenessMonitor().disable(); + } - int recoveryDelay = connectionManagerConfiguration.getMinConnectionRecoveryTimeout() - + (connectionManagerConfiguration.getMaxRecoveryAttempts() > 1 - ? (connectionManagerConfiguration.getMaxConnectionRecoveryTimeout() - connectionManagerConfiguration.getMinConnectionRecoveryTimeout()) - / (connectionManagerConfiguration.getMaxRecoveryAttempts() - 1) - * tmpCountTriesToRecovery - : 0); + LOGGER.info("Recovery delay for '{}' try = {}", recoveryAttempts, recoveryDelay); + return recoveryDelay; + }); - LOGGER.info("Recovery delay for '{}' try = {}", tmpCountTriesToRecovery, recoveryDelay); - return recoveryDelay; - } - ); sharedExecutor = Executors.newFixedThreadPool(configuration.getWorkingThreads(), new ThreadFactoryBuilder() .setNameFormat("rabbitmq-shared-pool-%d") .build()); @@ -215,6 +220,9 @@ private void turnOffReadiness(Throwable exception){ try { connection = factory.newConnection(); LOGGER.info("Created RabbitMQ connection {} [{}]", connection, connection.hashCode()); + addShutdownListenerToConnection(this.connection); + addBlockedListenersToConnection(this.connection); + addRecoveryListenerToConnection(this.connection); metrics.getReadinessMonitor().enable(); LOGGER.debug("Set RabbitMQ readiness to true"); } catch (IOException | TimeoutException e) { @@ -222,34 +230,100 @@ private void turnOffReadiness(Throwable exception){ LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e); throw new IllegalStateException("Failed to create RabbitMQ connection using configuration", e); } + } - this.connection.addBlockedListener(new BlockedListener() { - @Override - public void handleBlocked(String reason) { - LOGGER.warn("RabbitMQ blocked connection: {}", reason); - } - - @Override - public void handleUnblocked() { - LOGGER.warn("RabbitMQ unblocked connection"); + private final static List recoverableErrors = List.of( + "reply-text=NOT_FOUND", + "reply-text=PRECONDITION_FAILED" + ); + + private void addShutdownListenerToChannel(Channel channel, Boolean withRecovery) { + channel.addShutdownListener(cause -> { + LOGGER.debug("Closing the channel: ", cause); + if (!cause.isHardError() && cause.getReference() instanceof Channel) { + Channel channelCause = (Channel) cause.getReference(); + Method reason = cause.getReason(); + if (reason instanceof AMQImpl.Channel.Close) { + var castedReason = (AMQImpl.Channel.Close) reason; + if (castedReason.getReplyCode() != 200) { + StringBuilder errorBuilder = new StringBuilder("RabbitMQ soft error occurred: "); + castedReason.appendArgumentDebugStringTo(errorBuilder); + errorBuilder.append(" on channel "); + errorBuilder.append(channelCause); + String errorString = errorBuilder.toString(); + LOGGER.warn(errorString); + if (withRecovery) { + var pinIdToChannelHolder = getChannelHolderByChannel(channel); + if (pinIdToChannelHolder != null && recoverableErrors.stream().anyMatch(errorString::contains)) { + var holder = pinIdToChannelHolder.getValue(); + if (holder.isChannelSubscribed(channel)) { + var pinId = pinIdToChannelHolder.getKey(); + recoverSubscriptionsOfChannel(pinId, channel, holder); + } + } + } + } + } } }); + } - if (this.connection instanceof Recoverable) { - Recoverable recoverableConnection = (Recoverable) this.connection; - RecoveryListener recoveryListener = new RecoveryListener() { - @Override - public void handleRecovery(Recoverable recoverable) { - LOGGER.debug("Count tries to recovery connection reset to 0"); - connectionRecoveryAttempts.set(0); - metrics.getReadinessMonitor().enable(); - LOGGER.debug("Set RabbitMQ readiness to true"); + private @Nullable Map.Entry getChannelHolderByChannel(Channel channel) { + return channelsByPin + .entrySet() + .stream() + .filter(entry -> channel == entry.getValue().channel) + .findAny() + .orElse(null); + } + + private void recoverSubscriptionsOfChannel(@NotNull final PinId pinId, Channel channel, @NotNull final ChannelHolder holder) { + channelChecker.execute(() -> holder.withLock(() -> { + try { + var subscriptionCallbacks = holder.getCallbacksForRecovery(channel); + + if (subscriptionCallbacks != null) { + + LOGGER.info("Changing channel for holder with pin id: " + pinId); + + var removedHolder = channelsByPin.remove(pinId); + if (removedHolder != holder) throw new IllegalStateException("Channel holder has been replaced"); + + basicConsume(pinId.queue, subscriptionCallbacks.deliverCallback, subscriptionCallbacks.cancelCallback); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("Recovering channel's subscriptions interrupted", e); + } catch (Throwable e) { + // this code executed in executor service and exception thrown here will not be handled anywhere + LOGGER.error("Failed to recovery channel's subscriptions", e); + } + })); + } - @Override - public void handleRecoveryStarted(Recoverable recoverable) { + private void addShutdownListenerToConnection(Connection conn) { + conn.addShutdownListener(cause -> { + LOGGER.debug("Closing the connection: ", cause); + if (cause.isHardError() && cause.getReference() instanceof Connection) { + Connection connectionCause = (Connection) cause.getReference(); + Method reason = cause.getReason(); + if (reason instanceof AMQImpl.Connection.Close) { + var castedReason = (AMQImpl.Connection.Close) reason; + if (castedReason.getReplyCode() != 200) { + StringBuilder errorBuilder = new StringBuilder("RabbitMQ hard error occupied: "); + castedReason.appendArgumentDebugStringTo(errorBuilder); + errorBuilder.append(" on connection "); + errorBuilder.append(connectionCause); + LOGGER.warn(errorBuilder.toString()); + } } - }; + } + }); + } + + private void addRecoveryListenerToConnection(Connection conn) { + if (conn instanceof Recoverable) { + Recoverable recoverableConnection = (Recoverable) conn; recoverableConnection.addRecoveryListener(recoveryListener); LOGGER.debug("Recovery listener was added to connection."); } else { @@ -257,6 +331,20 @@ public void handleRecoveryStarted(Recoverable recoverable) { } } + private void addBlockedListenersToConnection(Connection conn) { + conn.addBlockedListener(new BlockedListener() { + @Override + public void handleBlocked(String reason) { + LOGGER.warn("RabbitMQ blocked connection: {}", reason); + } + + @Override + public void handleUnblocked() { + LOGGER.warn("RabbitMQ unblocked connection"); + } + }); + } + public boolean isOpen() { return connection.isOpen() && !connectionIsClosed.get(); } @@ -270,11 +358,17 @@ public void close() { LOGGER.info("Closing connection manager"); + for (ChannelHolder channelHolder: channelsByPin.values()) { + try { + channelHolder.channel.abort(); + } catch (IOException e) { + LOGGER.error("Cannot close channel", e); + } + } + int closeTimeout = configuration.getConnectionCloseTimeout(); if (connection.isOpen()) { try { - // We close the connection and don't close channels - // because when a channel's connection is closed, so is the channel connection.close(closeTimeout); } catch (IOException e) { LOGGER.error("Cannot close connection", e); @@ -285,14 +379,14 @@ public void close() { shutdownExecutor(channelChecker, closeTimeout, "channel-checker"); } - public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException { - ChannelHolder holder = getChannelFor(PinId.forRoutingKey(exchange, routingKey)); - holder.withLock(channel -> channel.basicPublish(exchange, routingKey, props, body)); + public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws InterruptedException { + ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey)); + holder.retryingPublishWithLock(channel -> channel.basicPublish(exchange, routingKey, props, body), configuration); } public String queueDeclare() throws IOException { ChannelHolder holder = new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configuration.getPrefetchCount()); - return holder.mapWithLock( channel -> { + return holder.mapWithLock(channel -> { String queue = channel.queueDeclare( "", // queue name false, // durable @@ -305,9 +399,10 @@ public String queueDeclare() throws IOException { }); } - public ExclusiveSubscriberMonitor basicConsume(String queue, ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - ChannelHolder holder = getChannelFor(PinId.forQueue(queue)); - String tag = holder.mapWithLock(channel -> + public ExclusiveSubscriberMonitor basicConsume(String queue, ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) throws IOException, InterruptedException { + PinId pinId = PinId.forQueue(queue); + ChannelHolder holder = getOrCreateChannelFor(pinId, new SubscriptionCallbacks(deliverCallback, cancelCallback)); + String tag = holder.retryingConsumeWithLock(channel -> channel.basicConsume(queue, false, subscriberName + "_" + nextSubscriberId.getAndIncrement(), (tagTmp, delivery) -> { try { Envelope envelope = delivery.getEnvelope(); @@ -321,6 +416,9 @@ public void reject() throws IOException { holder.withLock(ch -> { try { basicReject(ch, deliveryTag); + } catch (IOException | ShutdownSignalException e) { + LOGGER.warn("Error during basicReject of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e); + throw e; } finally { holder.release(() -> metrics.getReadinessMonitor().enable()); } @@ -332,6 +430,9 @@ public void confirm() throws IOException { holder.withLock(ch -> { try { basicAck(ch, deliveryTag); + } catch (IOException | ShutdownSignalException e) { + LOGGER.warn("Error during basicAck of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e); + throw e; } finally { holder.release(() -> metrics.getReadinessMonitor().enable()); } @@ -341,7 +442,6 @@ public void confirm() throws IOException { Confirmation confirmation = OnlyOnceConfirmation.wrap("from " + routingKey + " to " + queue, wrappedConfirmation); - holder.withLock(() -> holder.acquireAndSubmitCheck(() -> channelChecker.schedule(() -> { holder.withLock(() -> { @@ -359,7 +459,7 @@ public void confirm() throws IOException { } catch (IOException | RuntimeException e) { LOGGER.error("Cannot handle delivery for tag {}: {}", tagTmp, e.getMessage(), e); } - }, cancelCallback)); + }, cancelCallback), configuration); return new RabbitMqSubscriberMonitor(holder, queue, tag, this::basicCancel); } @@ -377,7 +477,7 @@ private void basicCancel(Channel channel, String consumerTag) throws IOException } public String queueExclusiveDeclareAndBind(String exchange) throws IOException, TimeoutException { - try(Channel channel = createChannel()) { + try (Channel channel = createChannel()) { String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, exchange, EMPTY_ROUTING_KEY); LOGGER.info("Declared the '{}' queue to listen to the '{}'", queue, exchange); @@ -398,13 +498,30 @@ private void shutdownExecutor(ExecutorService executor, int closeTimeout, String } } - private ChannelHolder getChannelFor(PinId pinId) { + private static final class SubscriptionCallbacks { + private final ManualAckDeliveryCallback deliverCallback; + private final CancelCallback cancelCallback; + + public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) { + this.deliverCallback = deliverCallback; + this.cancelCallback = cancelCallback; + } + } + + private ChannelHolder getOrCreateChannelFor(PinId pinId) { return channelsByPin.computeIfAbsent(pinId, ignore -> { LOGGER.trace("Creating channel holder for {}", pinId); return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configuration.getPrefetchCount()); }); } + private ChannelHolder getOrCreateChannelFor(PinId pinId, SubscriptionCallbacks subscriptionCallbacks) { + return channelsByPin.computeIfAbsent(pinId, ignore -> { + LOGGER.trace("Creating channel holder with callbacks for {}", pinId); + return new ChannelHolder(() -> createChannelWithOptionalRecovery(true), this::waitForConnectionRecovery, configuration.getPrefetchCount(), subscriptionCallbacks); + }); + } + private void putChannelFor(PinId pinId, ChannelHolder holder) { ChannelHolder previous = channelsByPin.putIfAbsent(pinId, holder); if (previous != null) { @@ -413,12 +530,17 @@ private void putChannelFor(PinId pinId, ChannelHolder holder) { } private Channel createChannel() { + return createChannelWithOptionalRecovery(false); + } + + private Channel createChannelWithOptionalRecovery(Boolean withRecovery) { waitForConnectionRecovery(connection); try { Channel channel = connection.createChannel(); Objects.requireNonNull(channel, () -> "No channels are available in the connection. Max channel number: " + connection.getChannelMax()); channel.basicQos(configuration.getPrefetchCount()); + addShutdownListenerToChannel(channel, withRecovery); channel.addReturnListener(ret -> LOGGER.warn("Can not router message to exchange '{}', routing key '{}'. Reply code '{}' and text = {}", ret.getExchange(), ret.getRoutingKey(), ret.getReplyCode(), ret.getReplyText())); LOGGER.info("Created new RabbitMQ channel {} via connection {}", channel.getChannelNumber(), connection.hashCode()); @@ -460,7 +582,7 @@ private void waitForRecovery(ShutdownNotifier notifier) { } private boolean isConnectionRecovery(ShutdownNotifier notifier) { - return !notifier.isOpen() && !connectionIsClosed.get(); + return !(notifier instanceof AutorecoveringChannel) && !notifier.isOpen() && !connectionIsClosed.get(); } /** @@ -476,16 +598,12 @@ private static void basicReject(Channel channel, long deliveryTag) throws IOExce } private static class RabbitMqSubscriberMonitor implements ExclusiveSubscriberMonitor { - private final ChannelHolder holder; private final String queue; private final String tag; private final CancelAction action; - public RabbitMqSubscriberMonitor(ChannelHolder holder, - String queue, - String tag, - CancelAction action) { + public RabbitMqSubscriberMonitor(ChannelHolder holder, String queue, String tag, CancelAction action) { this.holder = holder; this.queue = queue; this.tag = tag; @@ -499,7 +617,7 @@ public RabbitMqSubscriberMonitor(ChannelHolder holder, @Override public void unsubscribe() throws IOException { - holder.withLock(false, channel -> action.execute(channel, tag)); + holder.unsubscribeWithLock(tag, action); } } @@ -566,12 +684,16 @@ private static class ChannelHolder { private final Supplier supplier; private final BiConsumer reconnectionChecker; private final int maxCount; + private final SubscriptionCallbacks subscriptionCallbacks; @GuardedBy("lock") private int pending; @GuardedBy("lock") private Future check; - @GuardedBy("lock") + @GuardedBy("lock") // or by `subscribingLock` for `basicConsume` channels private Channel channel; + private final Lock subscribingLock = new ReentrantLock(); + @GuardedBy("subscribingLock") + private boolean isSubscribed = false; public ChannelHolder( Supplier supplier, @@ -581,6 +703,20 @@ public ChannelHolder( this.supplier = Objects.requireNonNull(supplier, "'Supplier' parameter"); this.reconnectionChecker = Objects.requireNonNull(reconnectionChecker, "'Reconnection checker' parameter"); this.maxCount = maxCount; + this.subscriptionCallbacks = null; + } + + public ChannelHolder( + Supplier supplier, + BiConsumer reconnectionChecker, + int maxCount, + SubscriptionCallbacks subscriptionCallbacks + + ) { + this.supplier = Objects.requireNonNull(supplier, "'Supplier' parameter"); + this.reconnectionChecker = Objects.requireNonNull(reconnectionChecker, "'Reconnection checker' parameter"); + this.maxCount = maxCount; + this.subscriptionCallbacks = subscriptionCallbacks; } public void withLock(ChannelConsumer consumer) throws IOException { @@ -605,6 +741,123 @@ public void withLock(boolean waitForRecovery, ChannelConsumer consumer) throws I } } + public void retryingPublishWithLock(ChannelConsumer consumer, ConnectionManagerConfiguration configuration) throws InterruptedException { + lock.lock(); + try { + Iterator iterator = configuration.createRetryingDelaySequence().iterator(); + Channel tempChannel = getChannel(true); + while (true) { + try { + consumer.consume(tempChannel); + break; + } catch (IOException | ShutdownSignalException e) { + var currentValue = iterator.next(); + var recoveryDelay = currentValue.getDelay(); + LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e); + TimeUnit.MILLISECONDS.sleep(recoveryDelay); + + // We should not recover the channel if its connection is closed + // If we do that the channel will be also auto recovered by RabbitMQ client + // during connection recovery, and we will get two new channels instead of one closed. + if (!tempChannel.isOpen() && tempChannel.getConnection().isOpen()) { + tempChannel = recreateChannel(); + } + } + } + } finally { + lock.unlock(); + } + } + + public T retryingConsumeWithLock(ChannelMapper mapper, ConnectionManagerConfiguration configuration) throws InterruptedException, IOException { + lock.lock(); + try { + Iterator iterator = null; + IOException exception; + Channel tempChannel = null; + boolean isChannelClosed = false; + while (true) { + subscribingLock.lock(); + try { + if (tempChannel == null) { + tempChannel = getChannel(); + } else if (isChannelClosed) { + tempChannel = recreateChannel(); + } + + var tag = mapper.map(tempChannel); + isSubscribed = true; + return tag; + } catch (IOException e) { + var reason = tempChannel.getCloseReason(); + isChannelClosed = reason != null; + + // We should not retry in this case because we never will be able to connect to the queue if we + // receive this error. This error happens if we try to subscribe to an exclusive queue that was + // created by another connection. + if (isChannelClosed && reason.getMessage().contains("reply-text=RESOURCE_LOCKED")) { + throw e; + } + exception = e; + } finally { + subscribingLock.unlock(); + } + iterator = handleAndSleep(configuration, iterator, "Retrying consume", exception); + } + } finally { + lock.unlock(); + } + } + + public @Nullable SubscriptionCallbacks getCallbacksForRecovery(Channel channelToRecover) { + if (!isSubscribed) { + // if unsubscribe() method was invoked after channel failure + LOGGER.warn("Channel's consumer was unsubscribed."); + return null; + } + + if (channel != channelToRecover) { + // this can happens if basicConsume() method was invoked by client code after channel failure + LOGGER.warn("Channel already recreated."); + return null; + } + + // recovery should not be called for `basicPublish` channels + if (subscriptionCallbacks == null) throw new IllegalStateException("Channel has no consumer"); + + return subscriptionCallbacks; + } + + public void unsubscribeWithLock(String tag, CancelAction action) throws IOException { + lock.lock(); + try { + subscribingLock.lock(); + try { + action.execute(channel, tag); + isSubscribed = false; + } finally { + subscribingLock.unlock(); + } + } finally { + lock.unlock(); + } + } + + @NotNull + private static Iterator handleAndSleep( + ConnectionManagerConfiguration configuration, + Iterator iterator, + String comment, + Exception e) throws InterruptedException { + iterator = iterator == null ? configuration.createRetryingDelaySequence().iterator() : iterator; + RetryingDelay currentValue = iterator.next(); + int recoveryDelay = currentValue.getDelay(); + + LOGGER.warn("{} #{}, waiting for {} ms, then recreating channel. Reason: {}", comment, currentValue.getTryNumber(), recoveryDelay, e); + TimeUnit.MILLISECONDS.sleep(recoveryDelay); + return iterator; + } + public T mapWithLock(ChannelMapper mapper) throws IOException { lock.lock(); try { @@ -660,6 +913,15 @@ public void acquireAndSubmitCheck(Supplier> futureSupplier) { } } + public boolean isChannelSubscribed(Channel channel) { + subscribingLock.lock(); + try { + return isSubscribed && channel == this.channel; + } finally { + subscribingLock.unlock(); + } + } + public boolean reachedPendingLimit() { lock.lock(); try { @@ -673,6 +935,12 @@ private Channel getChannel() { return getChannel(true); } + private Channel recreateChannel() { + channel = supplier.get(); + reconnectionChecker.accept(channel, true); + return channel; + } + private Channel getChannel(boolean waitForRecovery) { if (channel == null) { channel = supplier.get(); @@ -689,4 +957,4 @@ private interface ChannelMapper { private interface ChannelConsumer { void consume(Channel channel) throws IOException; } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt index a8d2813be..6c147e5c0 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -18,31 +18,88 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration import com.exactpro.th2.common.schema.configuration.Configuration import com.fasterxml.jackson.annotation.JsonProperty import java.time.Duration +import java.util.concurrent.ThreadLocalRandom data class RabbitMQConfiguration( - @JsonProperty(required = true) var host: String, - @JsonProperty(required = true) @get:JsonProperty("vHost") var vHost: String, - @JsonProperty(required = true) var port: Int = 5672, - @JsonProperty(required = true) var username: String, - @JsonProperty(required = true) var password: String, + @JsonProperty(required = true) val host: String, + @JsonProperty(required = true) @get:JsonProperty("vHost") val vHost: String, + @JsonProperty(required = true) val port: Int = 5672, + @JsonProperty(required = true) val username: String, + @JsonProperty(required = true) val password: String, @Deprecated(message = "Please use subscriber name from ConnectionManagerConfiguration") - var subscriberName: String? = null, //FIXME: Remove in future version - var exchangeName: String? = null) : Configuration() + val subscriberName: String? = null, //FIXME: Remove in future version + val exchangeName: String? = null +) : Configuration() data class ConnectionManagerConfiguration( - var subscriberName: String? = null, - var connectionTimeout: Int = -1, - var connectionCloseTimeout: Int = 10000, - var maxRecoveryAttempts: Int = 5, - var minConnectionRecoveryTimeout: Int = 10000, - var maxConnectionRecoveryTimeout: Int = 60000, + val subscriberName: String? = null, + val connectionTimeout: Int = -1, + val connectionCloseTimeout: Int = 10000, + val maxRecoveryAttempts: Int = 5, + val minConnectionRecoveryTimeout: Int = 10000, + val maxConnectionRecoveryTimeout: Int = 60000, val prefetchCount: Int = 10, + val retryTimeDeviationPercent: Int = 10, val messageRecursionLimit: Int = 100, val workingThreads: Int = 1, val confirmationTimeout: Duration = Duration.ofMinutes(5) ) : Configuration() { init { + check(maxRecoveryAttempts > 0) { "expected 'maxRecoveryAttempts' greater than 0 but was $maxRecoveryAttempts" } + check(minConnectionRecoveryTimeout > 0) { "expected 'minConnectionRecoveryTimeout' greater than 0 but was $minConnectionRecoveryTimeout" } + check(maxConnectionRecoveryTimeout >= minConnectionRecoveryTimeout) { "expected 'maxConnectionRecoveryTimeout' ($maxConnectionRecoveryTimeout) no less than 'minConnectionRecoveryTimeout' ($minConnectionRecoveryTimeout)" } + check(retryTimeDeviationPercent in 0..100) { "expected 'retryTimeDeviationPercent' no less than 0 and not greater than 100 but was $retryTimeDeviationPercent" } check(workingThreads > 0) { "expected 'workingThreads' greater than 0 but was $workingThreads" } check(!confirmationTimeout.run { isNegative || isZero }) { "expected 'confirmationTimeout' greater than 0 but was $confirmationTimeout" } } + + fun createRetryingDelaySequence(): Sequence { + return generateSequence(RetryingDelay(0, minConnectionRecoveryTimeout)) { + RetryingDelay(it.tryNumber + 1, RetryingDelay.getRecoveryDelay( + it.tryNumber + 1, + minConnectionRecoveryTimeout, + maxConnectionRecoveryTimeout, + maxRecoveryAttempts, + retryTimeDeviationPercent + )) + } + } +} + +data class RetryingDelay(val tryNumber: Int, val delay: Int) { + companion object { + @JvmStatic + fun getRecoveryDelay( + numberOfTries: Int, + minTime: Int, + maxTime: Int, + maxRecoveryAttempts: Int, + deviationPercent: Int + ): Int { + return if (numberOfTries <= maxRecoveryAttempts) { + getRecoveryDelayWithIncrement(numberOfTries, minTime, maxTime, maxRecoveryAttempts, deviationPercent) + } else { + getRecoveryDelayWithDeviation(maxTime, deviationPercent) + } + } + + private fun getRecoveryDelayWithDeviation(maxTime: Int, deviationPercent: Int): Int { + val recoveryDelay: Int + val deviation = maxTime * deviationPercent / 100 + recoveryDelay = ThreadLocalRandom.current().nextInt(maxTime - deviation, maxTime + deviation + 1) + return recoveryDelay + } + + private fun getRecoveryDelayWithIncrement( + numberOfTries: Int, + minTime: Int, + maxTime: Int, + maxRecoveryAttempts: Int, + deviationPercent: Int + ): Int { + val delay = minTime + (maxTime - minTime) / maxRecoveryAttempts * numberOfTries + val deviation = maxTime * deviationPercent / 100 + return ThreadLocalRandom.current().nextInt(delay - deviation, delay + deviation + 1) + } + } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/ContainerConstants.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/ContainerConstants.kt new file mode 100644 index 000000000..807838118 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/ContainerConstants.kt @@ -0,0 +1,30 @@ +/* + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.common.schema.message + +import org.testcontainers.utility.DockerImageName +import java.time.Duration + +object ContainerConstants { + @JvmField val RABBITMQ_IMAGE_NAME: DockerImageName = DockerImageName.parse("rabbitmq:3.13.0-management-alpine") + const val ROUTING_KEY = "routingKey" + const val QUEUE_NAME = "queue" + const val EXCHANGE = "test-exchange" + + const val DEFAULT_PREFETCH_COUNT = 10 + @JvmField val DEFAULT_CONFIRMATION_TIMEOUT: Duration = Duration.ofSeconds(1) +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt index d432f2ccc..2a0864404 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt @@ -13,10 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.common.schema.message.impl.rabbitmq import com.exactpro.th2.common.annotations.IntegrationTest import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration +import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_CONFIRMATION_TIMEOUT +import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_PREFETCH_COUNT +import com.exactpro.th2.common.schema.message.ContainerConstants.EXCHANGE +import com.exactpro.th2.common.schema.message.ContainerConstants.QUEUE_NAME +import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME +import com.exactpro.th2.common.schema.message.ContainerConstants.ROUTING_KEY import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration @@ -30,7 +37,6 @@ import mu.KotlinLogging import org.junit.jupiter.api.Assertions.assertNull import org.mockito.kotlin.mock import org.testcontainers.containers.RabbitMQContainer -import org.testcontainers.utility.DockerImageName import java.time.Duration import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.TimeUnit @@ -43,7 +49,7 @@ class AbstractRabbitRouterIntegrationTest { @Test fun `receive unconfirmed message after resubscribe`() { - RabbitMQContainer(DockerImageName.parse(RABBITMQ_MANAGEMENT_ALPINE)) + RabbitMQContainer(RABBITMQ_IMAGE_NAME) .withExchange(EXCHANGE, BuiltinExchangeType.DIRECT.type, false, false, true, emptyMap()) .withQueue(QUEUE_NAME, false, true, emptyMap()) .withBinding( @@ -166,9 +172,7 @@ class AbstractRabbitRouterIntegrationTest { prefetchCount = prefetchCount, confirmationTimeout = confirmationTimeout, ), - ) { - K_LOGGER.error { "Fatal connection problem" } - } + ) private fun createRouter(connectionManager: ConnectionManager) = RabbitCustomRouter( "test-custom-tag", @@ -203,14 +207,6 @@ class AbstractRabbitRouterIntegrationTest { companion object { private val K_LOGGER = KotlinLogging.logger { } - private const val RABBITMQ_MANAGEMENT_ALPINE = "rabbitmq:3.11.2-management-alpine" - private const val ROUTING_KEY = "routingKey" - private const val QUEUE_NAME = "queue" - private const val EXCHANGE = "test-exchange" - - private const val DEFAULT_PREFETCH_COUNT = 10 - private val DEFAULT_CONFIRMATION_TIMEOUT: Duration = Duration.ofSeconds(1) - private class Expectation( val message: String, val redelivery: Boolean, diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt index 66396b795..342b58d9a 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,65 +13,68 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection import com.exactpro.th2.common.annotations.IntegrationTest +import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_CONFIRMATION_TIMEOUT +import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_PREFETCH_COUNT +import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME +import com.exactpro.th2.common.schema.message.DeliveryMetadata import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback +import com.exactpro.th2.common.schema.message.SubscriberMonitor import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration +import com.exactpro.th2.common.util.declareFanoutExchangeWithBinding +import com.exactpro.th2.common.util.declareQueue +import com.exactpro.th2.common.util.getChannelsInfo +import com.exactpro.th2.common.util.getQueuesInfo +import com.exactpro.th2.common.util.getSubscribedChannelsCount +import com.exactpro.th2.common.util.putMessageInQueue import com.rabbitmq.client.BuiltinExchangeType +import java.time.Duration +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread import com.rabbitmq.client.CancelCallback +import com.rabbitmq.client.Delivery import mu.KotlinLogging +import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import org.testcontainers.containers.RabbitMQContainer -import org.testcontainers.utility.DockerImageName +import org.testcontainers.utility.MountableFile import java.io.IOException -import java.time.Duration -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit import kotlin.test.assertFailsWith -private val LOGGER = KotlinLogging.logger { } - - @IntegrationTest class TestConnectionManager { - @Test fun `connection manager reports unacked messages when confirmation timeout elapsed`() { - val routingKey = "routingKey" - val queueName = "queue" - val exchange = "test-exchange" - val prefetchCount = 10 - RabbitMQContainer(DockerImageName.parse("rabbitmq:3.8-management-alpine")) - .withExchange(exchange, BuiltinExchangeType.FANOUT.type, false, false, true, emptyMap()) - .withQueue(queueName) - .withBinding(exchange, queueName, emptyMap(), routingKey, "queue") - .use { - it.start() + val routingKey = "routingKey1" + val queueName = "queue1" + val exchange = "test-exchange1" + rabbit + .let { + declareQueue(rabbit, queueName) + declareFanoutExchangeWithBinding(rabbit, exchange, queueName) LOGGER.info { "Started with port ${it.amqpPort}" } - val queue = ArrayBlockingQueue(prefetchCount) - val countDown = CountDownLatch(prefetchCount) - val confirmationTimeout = Duration.ofSeconds(1) - ConnectionManager( - RabbitMQConfiguration( - host = it.host, - vHost = "", - port = it.amqpPort, - username = it.adminUsername, - password = it.adminPassword, - ), - ConnectionManagerConfiguration( + val queue = ArrayBlockingQueue(DEFAULT_PREFETCH_COUNT) + val countDown = CountDownLatch(DEFAULT_PREFETCH_COUNT) + createConnectionManager( + it, ConnectionManagerConfiguration( subscriberName = "test", - prefetchCount = prefetchCount, - confirmationTimeout = confirmationTimeout, - ), - ) { - LOGGER.error { "Fatal connection problem" } - }.use { manager -> + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + ) + ).use { manager -> manager.basicConsume(queueName, { _, delivery, ack -> LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } queue += ack @@ -80,24 +83,19 @@ class TestConnectionManager { LOGGER.info { "Canceled $it" } } - repeat(prefetchCount + 1) { index -> + repeat(DEFAULT_PREFETCH_COUNT + 1) { index -> manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8)) } - assertTrue( - countDown.await( - 1L, - TimeUnit.SECONDS - ) - ) { "Not all messages were received: ${countDown.count}" } + countDown.assertComplete("Not all messages were received") assertTrue(manager.isAlive) { "Manager should still be alive" } assertTrue(manager.isReady) { "Manager should be ready until the confirmation timeout expires" } - Thread.sleep(confirmationTimeout.toMillis() + 100/*just in case*/) // wait for confirmation timeout + Thread.sleep(DEFAULT_CONFIRMATION_TIMEOUT.toMillis() + 100/*just in case*/) // wait for confirmation timeout assertTrue(manager.isAlive) { "Manager should still be alive" } - Assertions.assertFalse(manager.isReady) { "Manager should not be ready" } + assertFalse(manager.isReady) { "Manager should not be ready" } queue.poll().confirm() @@ -107,32 +105,824 @@ class TestConnectionManager { val receivedData = generateSequence { queue.poll(10L, TimeUnit.MILLISECONDS) } .onEach(ManualAckDeliveryCallback.Confirmation::confirm) .count() - Assertions.assertEquals(prefetchCount, receivedData) { "Unexpected number of messages received" } + assertEquals(DEFAULT_PREFETCH_COUNT, receivedData) { "Unexpected number of messages received" } } } } @Test - fun `connection manager exclusive queue test`() { - RabbitMQContainer(DockerImageName.parse("rabbitmq:3.8-management-alpine")) - .use { rabbitMQContainer -> - rabbitMQContainer.start() + fun `connection manager receives a message from a queue that did not exist at the time of subscription`() { + val wrongQueue = "wrong-queue2" + rabbit + .let { rabbitMQContainer -> LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" } + createConnectionManager( + rabbitMQContainer, + ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 100, + maxConnectionRecoveryTimeout = 200, + maxRecoveryAttempts = 5 + ), + ).use { connectionManager -> + var thread: Thread? = null + var monitor: SubscriberMonitor? = null + val consume = CountDownLatch(1) + try { + thread = thread { + monitor = connectionManager.basicConsume(wrongQueue, { _, delivery, ack -> + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } + consume.countDown() + ack.confirm() + }) { + LOGGER.info { "Canceled $it" } + } + } + + assertTarget(true, message = "Thread for consuming isn't started", func = thread::isAlive) + // todo check isReady and isAlive, it should be false at some point +// assertTarget(false, "Readiness probe doesn't fall down", connectionManager::isReady) + + LOGGER.info { "creating the queue..." } + declareQueue(rabbitMQContainer, wrongQueue) + assertTarget(false, message = "Thread for consuming isn't completed", func = thread::isAlive) + + LOGGER.info { + "Adding message to the queue:\n${putMessageInQueue(rabbitMQContainer, wrongQueue)}" + } + LOGGER.info { + "queues list: \n ${ + rabbitMQContainer.execInContainer( + "rabbitmqctl", + "list_queues" + ) + }" + } + + consume.assertComplete("Unexpected number of messages received. The message should be received") + + assertEquals(1, getSubscribedChannelsCount(rabbitMQContainer, wrongQueue)) + assertTrue(connectionManager.isAlive) + assertTrue(connectionManager.isReady) + } finally { + Assertions.assertDoesNotThrow { + monitor!!.unsubscribe() + } + thread?.let { + thread.interrupt() + thread.join(100) + assertFalse(thread.isAlive) + } + } + + } + } + } + + @Test + fun `connection manager sends a message to wrong exchange`() { + val queueName = "queue3" + val exchange = "test-exchange3" + rabbit + .let { + declareQueue(rabbit, queueName) + LOGGER.info { "Started with port ${it.amqpPort}" } + val counter = AtomicInteger(0) + createConnectionManager( + it, + ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 100, + maxConnectionRecoveryTimeout = 200, + maxRecoveryAttempts = 5 + ), + ).use { connectionManager -> + var monitor: SubscriberMonitor? = null + try { + monitor = connectionManager.basicConsume(queueName, { _, delivery, _ -> + counter.incrementAndGet() + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" } + }) { + LOGGER.info { "Canceled $it" } + } + + LOGGER.info { "Starting first publishing..." } + connectionManager.basicPublish(exchange, "", null, "Hello1".toByteArray(Charsets.UTF_8)) + Thread.sleep(200) + LOGGER.info { "Publication finished!" } + assertEquals( + 0, + counter.get() + ) { "Unexpected number of messages received. The first message shouldn't be received" } + Thread.sleep(200) + LOGGER.info { "Creating the correct exchange..." } + declareFanoutExchangeWithBinding(it, exchange, queueName) + Thread.sleep(200) + LOGGER.info { "Exchange created!" } + + Assertions.assertDoesNotThrow { + connectionManager.basicPublish(exchange, "", null, "Hello2".toByteArray(Charsets.UTF_8)) + } + + Thread.sleep(200) + assertEquals( + 1, + counter.get() + ) { "Unexpected number of messages received. The second message should be received" } + } finally { + Assertions.assertNotNull(monitor) + Assertions.assertDoesNotThrow { + monitor!!.unsubscribe() + } + } + + } + } + } + + @Test + fun `connection manager handles ack timeout`() { + val configFilename = "rabbitmq_it.conf" + val queueName = "queue4" + + RabbitMQContainer(RABBITMQ_IMAGE_NAME) + .withRabbitMQConfig(MountableFile.forClasspathResource(configFilename)) + .withQueue(queueName) + .use { + it.start() + LOGGER.info { "Started with port ${it.amqpPort}" } + + createConnectionManager( + it, + ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 100, + maxConnectionRecoveryTimeout = 200, + maxRecoveryAttempts = 5 + ), + ).use { connectionManager -> + val consume = CountDownLatch(3) + + connectionManager.basicConsume(queueName, { _, delivery, _ -> + LOGGER.info { "Received 1 ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" } + consume.countDown() + }) { + LOGGER.info { "Canceled $it" } + } + + LOGGER.info { "Sending first message" } + putMessageInQueue(it, queueName) + assertTarget(3 - 1, message = "Consume first message") { consume.count } + + LOGGER.info { "queues list: \n ${getQueuesInfo(it)}" } + val channels1 = getChannelsInfo(it) + + LOGGER.info { channels1 } + LOGGER.info { "Waiting for ack timeout ..." } + + assertTarget(3 - 2, 63_000, "Consume first message again") { consume.count } + val channels2 = getChannelsInfo(it) + LOGGER.info { channels2 } + + LOGGER.info { "Sending second message" } + putMessageInQueue(it, queueName) + + val queuesListExecResult = getQueuesInfo(it) + LOGGER.info { "queues list: \n $queuesListExecResult" } + + assertEquals(1, getSubscribedChannelsCount(it, queueName)) + { "There is must be single channel after recovery" } + assertNotEquals(channels1, channels2) { "The recovered channel must have another pid" } + + consume.assertComplete("Wrong number of received messages") + assertTrue( + queuesListExecResult.toString().contains("$queueName\t2") + ) { "There should be no messages left in the queue" } + + } + } + } + + @Test + fun `connection manager handles ack timeout with several channels`() { + val configFilename = "rabbitmq_it.conf" + val queueNames = arrayOf("separate_queues1", "separate_queues2", "separate_queues3") + + RabbitMQContainer(RABBITMQ_IMAGE_NAME) + .withRabbitMQConfig(MountableFile.forClasspathResource(configFilename)) + .withQueue(queueNames[0]) + .withQueue(queueNames[1]) + .withQueue(queueNames[2]) + .use { + it.start() + LOGGER.info { "Started with port ${it.amqpPort}" } + val counters = mapOf( + queueNames[0] to AtomicInteger(), // this subscriber won't ack the first delivery + queueNames[1] to AtomicInteger(-1), // this subscriber won't ack two first deliveries + queueNames[2] to AtomicInteger(1) + ) + createConnectionManager( + it, + ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 100, + maxConnectionRecoveryTimeout = 200, + maxRecoveryAttempts = 5 + ), + ).use { connectionManager -> + + fun subscribeOnQueue( + queue: String + ) { + connectionManager.basicConsume(queue, { _, delivery, ack -> + LOGGER.info { "Received from queue $queue ${delivery.body.toString(Charsets.UTF_8)}" } + if (counters[queue]!!.get() > 0) { + ack.confirm() + LOGGER.info { "Confirmed message form $queue" } + } else { + LOGGER.info { "Left this message from $queue unacked" } + } + counters[queue]!!.incrementAndGet() + }, { + LOGGER.info { "Canceled message form queue $queue" } + }) + } + + subscribeOnQueue(queueNames[0]) + subscribeOnQueue(queueNames[1]) + subscribeOnQueue(queueNames[2]) + + LOGGER.info { "Sending the first message batch" } + putMessageInQueue(it, queueNames[0]) + putMessageInQueue(it, queueNames[1]) + putMessageInQueue(it, queueNames[2]) + + LOGGER.info { "queues list: \n ${getQueuesInfo(it)}" } + LOGGER.info { "Sleeping..." } + Thread.sleep(30000) + + LOGGER.info { "Sending the second message batch" } + putMessageInQueue(it, queueNames[0]) + putMessageInQueue(it, queueNames[1]) + putMessageInQueue(it, queueNames[2]) + + LOGGER.info { "Still sleeping. Waiting for PRECONDITION_FAILED..." } + Thread.sleep(32000) + + LOGGER.info { "Sending the third message batch" } + putMessageInQueue(it, queueNames[0]) + putMessageInQueue(it, queueNames[1]) + putMessageInQueue(it, queueNames[2]) + + val queuesListExecResult = getQueuesInfo(it) + LOGGER.info { "queues list: \n $queuesListExecResult" } + + for (queueName in queueNames) { + assertTrue(queuesListExecResult.toString().contains("$queueName\t0")) + { "There should be no messages left in queue $queueName" } + assertEquals(1, getSubscribedChannelsCount(it, queueName)) + } + + // 0 + 1 failed ack + 2 successful ack + 1 ack of requeued message + assertEquals(4, counters[queueNames[0]]!!.get()) + { "Wrong number of received messages from queue ${queueNames[0]}" } + // -1 + 2 failed ack + 2 ack of requeued message + 1 successful ack + assertEquals(4, counters[queueNames[1]]!!.get()) + { "Wrong number of received messages from queue ${queueNames[1]}" } + assertEquals(4, counters[queueNames[2]]!!.get()) + { "Wrong number of received messages from queue ${queueNames[2]}" } + + } + } + } + + @Test + fun `connection manager receives a messages after container restart`() { + val queueName = "queue5" + val amqpPort = 5672 + val container = object : RabbitMQContainer(RABBITMQ_IMAGE_NAME) { + init { super.addFixedExposedPort(amqpPort, amqpPort) } + } + + container + .use { + it.start() + declareQueue(it, queueName) + LOGGER.info { "Started with port ${it.amqpPort}" } + ConnectionManager( + RabbitMQConfiguration( + host = it.host, + vHost = "", + port = amqpPort, + username = it.adminUsername, + password = it.adminPassword, + ), + ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 1000, + maxConnectionRecoveryTimeout = 2000, + connectionTimeout = 1000, + maxRecoveryAttempts = 5 + ), + ).use { connectionManager -> + val consume = CountDownLatch(1) + connectionManager.basicConsume(queueName, { _, delivery, ack -> + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } + consume.countDown() + ack.confirm() + }) { + LOGGER.info { "Canceled $it" } + } + LOGGER.info { "Rabbit address- ${it.host}:${it.amqpPort}" } + + LOGGER.info { "Restarting the container" } + it.stop() + it.start() + Thread.sleep(5_000) + declareQueue(it, queueName) + Thread.sleep(5_000) + + LOGGER.info { "Rabbit address after restart - ${it.host}:${it.amqpPort}" } + LOGGER.info { getQueuesInfo(it) } + + LOGGER.info { "Starting publishing..." } + putMessageInQueue(it, queueName) + assertEquals(1, getSubscribedChannelsCount(it, queueName)) + + LOGGER.info { "Publication finished!" } + LOGGER.info { getQueuesInfo(it) } + + consume.assertComplete("Wrong number of received messages") + assertTrue(getQueuesInfo(it).toString().contains("$queueName\t0")) { + "There should be no messages left in the queue" + } + } + } + } + + @Test + fun `connection manager publish a message and receives it`() { + val queueName = "queue6" + val exchange = "test-exchange6" + val routingKey = "routingKey6" + + rabbit + .let { + LOGGER.info { "Started with port ${it.amqpPort}" } + val counter = AtomicInteger(0) + createConnectionManager( + it, + ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 10000, + maxConnectionRecoveryTimeout = 20000, + connectionTimeout = 10000, + maxRecoveryAttempts = 5 + ), + ).use { connectionManager -> + var monitor: SubscriberMonitor? = null + try { + declareQueue(it, queueName) + declareFanoutExchangeWithBinding(it, exchange, queueName) + + connectionManager.basicPublish(exchange, routingKey, null, "Hello1".toByteArray(Charsets.UTF_8)) + + Thread.sleep(200) + monitor = connectionManager.basicConsume(queueName, { _, delivery, ack -> + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } + counter.incrementAndGet() + ack.confirm() + }) { + LOGGER.info { "Canceled $it" } + } + Thread.sleep(200) + + assertEquals(1, getSubscribedChannelsCount(it, queueName)) + assertEquals(1, counter.get()) { "Wrong number of received messages" } + assertTrue( + getQueuesInfo(it).toString().contains("$queueName\t0") + ) { "There should be no messages left in the queue" } + } finally { + Assertions.assertNotNull(monitor) + Assertions.assertDoesNotThrow { + monitor!!.unsubscribe() + } + } + + } + } + } + + @Test + fun `connection manager handles ack timeout on queue with publishing by the manager`() { + val configFilename = "rabbitmq_it.conf" + val queueName = "queue7" + val exchange = "test-exchange7" + val routingKey = "routingKey7" + + + RabbitMQContainer(RABBITMQ_IMAGE_NAME) + .withRabbitMQConfig(MountableFile.forClasspathResource(configFilename)) + .withExchange(exchange, BuiltinExchangeType.FANOUT.type, false, false, true, emptyMap()) + .withQueue(queueName) + .withBinding(exchange, queueName, emptyMap(), routingKey, "queue") + .use { + it.start() + LOGGER.info { "Started with port ${it.amqpPort}" } + val counter = AtomicInteger(0) + createConnectionManager( + it, + ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 100, + maxConnectionRecoveryTimeout = 200, + maxRecoveryAttempts = 5 + ), + ).use { connectionManager -> + connectionManager.basicConsume(queueName, { _, delivery, ack -> + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} " } + if (counter.get() != 0) { + ack.confirm() + LOGGER.info { "Confirmed!" } + } else { + LOGGER.info { "Left this message unacked" } + } + counter.incrementAndGet() + }) { + LOGGER.info { "Canceled $it" } + } + + + LOGGER.info { "Sending the first message" } + connectionManager.basicPublish(exchange, routingKey, null, "Hello1".toByteArray(Charsets.UTF_8)) + + LOGGER.info { "queues list: \n ${getQueuesInfo(it)}" } + LOGGER.info { "Sleeping..." } + Thread.sleep(33000) + + + LOGGER.info { "Sending the second message" } + connectionManager.basicPublish(exchange, routingKey, null, "Hello2".toByteArray(Charsets.UTF_8)) + + Thread.sleep(30000) + + LOGGER.info { "Sending the third message" } + connectionManager.basicPublish(exchange, routingKey, null, "Hello3".toByteArray(Charsets.UTF_8)) + + val queuesListExecResult = getQueuesInfo(it) + LOGGER.info { "queues list: \n $queuesListExecResult" } + + assertEquals(1, getSubscribedChannelsCount(it, queueName)) + assertEquals(4, counter.get()) { "Wrong number of received messages" } + assertTrue( + queuesListExecResult.toString().contains("$queueName\t0") + ) { "There should be no messages left in the queue" } + + } + } + } + + @Test + fun `thread interruption test`() { + val queueName = "queue8" + rabbit + .let { + LOGGER.info { "Started with port ${it.amqpPort}" } + val counter = AtomicInteger(0) + createConnectionManager( + it, + ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 100, + maxConnectionRecoveryTimeout = 2000, + connectionTimeout = 1000, + maxRecoveryAttempts = 5 + ), + ).use { connectionManager -> + var thread: Thread? = null + try { + thread = thread { + connectionManager.basicConsume(queueName, { _, delivery, ack -> + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } + counter.incrementAndGet() + ack.confirm() + }) { + LOGGER.info { "Canceled $it" } + } + } + + assertTarget(true, message = "Thread for consuming isn't started", func = thread::isAlive) + Thread.sleep(1000) + assertTrue(thread.isAlive) + LOGGER.info { "Interrupting..." } + thread.interrupt() + LOGGER.info { "Interrupted!" } + assertTarget(false, message = "Thread for consuming isn't stopped", func = thread::isAlive) + assertEquals(0, counter.get()) { "Wrong number of received messages" } + assertEquals(0, getSubscribedChannelsCount(it, queueName)) {"There should be no subscribed channels"} + } finally { + thread?.let { + thread.interrupt() + thread.join(100) + assertFalse(thread.isAlive) + } + } + } + } + } + + @Test + fun `connection manager handles subscription cancel`() { + val queueName = "queue9" + rabbit + .let { + LOGGER.info { "Started with port ${it.amqpPort}" } + val counter = AtomicInteger(0) + createConnectionManager( + it, + ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 100, + maxConnectionRecoveryTimeout = 2000, + connectionTimeout = 1000, + maxRecoveryAttempts = 5 + ), + ).use { connectionManager -> + + var thread: Thread? = null + var monitor: SubscriberMonitor? + try { + declareQueue(it, queueName) + + thread = thread { + monitor = connectionManager.basicConsume(queueName, { _, delivery, ack -> + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } + counter.incrementAndGet() + ack.confirm() + }) { + LOGGER.info { "Canceled $it" } + } + + Thread.sleep(2500) + LOGGER.info { "Unsubscribing..." } + monitor!!.unsubscribe() + } + for (i in 1..5) { + putMessageInQueue(it, queueName) + Thread.sleep(1000) + } - createConnectionManager(rabbitMQContainer).use { firstManager -> - createConnectionManager(rabbitMQContainer).use { secondManager -> - val queue = firstManager.queueDeclare() + assertEquals(0, getSubscribedChannelsCount(it, queueName)) {"There should be no subscribed channels"} - assertFailsWith("Another connection can subscribe to the $queue queue") { - secondManager.basicConsume(queue, { _, _, _ -> }, {}) + assertEquals(3, counter.get()) { "Wrong number of received messages" } + assertTrue( + getQueuesInfo(it).toString().contains("$queueName\t2") + ) { "There should be messages in the queue" } + } finally { + Assertions.assertNotNull(thread) + Assertions.assertDoesNotThrow { + thread!!.interrupt() } + assertFalse(thread!!.isAlive) + } + + } + } + } + + @Test + fun `connection manager handles ack timeout (multiple subscribers in parallel)`() { + val configFilename = "rabbitmq_it.conf" + + class Counters { + val messages = AtomicInteger(0) + val redeliveredMessages = AtomicInteger(0) + } + + class ConsumerParams( + val unsubscribe: Boolean, + val expectedReceivedMessages: Int, + val expectedRedeliveredMessages: Int + ) + + class TestParams( + val queueName: String, + val subscriberName: String, + val consumers: List, + val messagesToSend: Int, + val expectedChannelsCount: Int, + val expectedLeftMessages: Int + ) + + val testCases = listOf( + TestParams( + queueName = "queue1", + subscriberName = "subscriber1", + consumers = listOf( + ConsumerParams( + unsubscribe = false, + expectedReceivedMessages = 3, + expectedRedeliveredMessages = 1 + ) + ), + expectedChannelsCount = 1, + messagesToSend = 2, + expectedLeftMessages = 0 + ), + + TestParams( + queueName = "queue2", + subscriberName = "subscriber2", + consumers = listOf( + ConsumerParams( + unsubscribe = true, + expectedReceivedMessages = 2, + expectedRedeliveredMessages = 0 + ) + ), + expectedChannelsCount = 0, + messagesToSend = 2, + expectedLeftMessages = 1 + ) + ) + + RabbitMQContainer(RABBITMQ_IMAGE_NAME) + .withRabbitMQConfig(MountableFile.forClasspathResource(configFilename)) + .apply { testCases.forEach { withQueue(it.queueName) } } + .use { container -> + container.start() + LOGGER.info { "Started with port ${container.amqpPort}" } + + class TestCaseContext( + val connectionManager: ConnectionManager, + val consumersThreads: List, + val consumerCounters: List + ) + + val testCasesContexts: List = testCases.map { params -> + val connectionManager = createConnectionManager( + container, + ConnectionManagerConfiguration( + subscriberName = params.subscriberName, + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 100, + maxConnectionRecoveryTimeout = 200, + maxRecoveryAttempts = 5 + ) + ) + + val consumerCounters: List = List(params.consumers.size) { Counters() } - extracted(firstManager, secondManager, queue, 3) - extracted(firstManager, secondManager, queue, 6) + class DeliverCallback(private val consumerNumber: Int) : ManualAckDeliveryCallback { + override fun handle( + deliveryMetadata: DeliveryMetadata, + delivery: Delivery, + confirmProcessed: ManualAckDeliveryCallback.Confirmation + ) { + val consumerCounter = consumerCounters[consumerNumber] + + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" } + if (consumerCounter.messages.getAndIncrement() == 1) { + LOGGER.info { "Left this message unacked" } + } else { + confirmProcessed.confirm() + LOGGER.info { "Confirmed!" } + } + + if (delivery.envelope.isRedeliver) { + consumerCounter.redeliveredMessages.incrementAndGet() + } + } + } + + val consumersThreads = params.consumers.mapIndexed { index, subscriberParams -> + thread { + val subscriberMonitor = connectionManager.basicConsume(params.queueName, DeliverCallback(index)) { + LOGGER.info { "Canceled $it" } + } + + Thread.sleep(1000) + + if (subscriberParams.unsubscribe) { + LOGGER.info { "Unsubscribing..." } + subscriberMonitor.unsubscribe() + } + } + } + + repeat(params.messagesToSend) { + LOGGER.info { "Sending message ${it + 1} to queue ${params.queueName}" } + putMessageInQueue(container, params.queueName) } + + TestCaseContext(connectionManager, consumersThreads, consumerCounters) } + LOGGER.info { "Sleeping..." } + Thread.sleep(63000) + + val queuesListExecResult = getQueuesInfo(container) + LOGGER.info { "queues list: \n $queuesListExecResult" } + + testCases.forEachIndexed { index, params -> + val context = testCasesContexts[index] + assertEquals(params.expectedChannelsCount, getSubscribedChannelsCount(container, params.queueName)) { + "Wrong number of opened channels (subscriber: `${params.subscriberName}`)" + } + + params.consumers.forEachIndexed { consumerIndex, consumerParams -> + val counters = context.consumerCounters[consumerIndex] + assertEquals(consumerParams.expectedReceivedMessages, counters.messages.get()) { + "Wrong number of received messages (subscriber: `${params.subscriberName}`, consumer index: `${consumerIndex}`)" + } + + assertEquals(consumerParams.expectedRedeliveredMessages, counters.redeliveredMessages.get()) { + "Wrong number of redelivered messages (subscriber: `${params.subscriberName}`, consumer index: `${consumerIndex}`)" + } + } + + assertTrue(queuesListExecResult.toString().contains("${params.queueName}\t${params.expectedLeftMessages}")) { + "There should ${params.expectedLeftMessages} message(s) left in the '${params.queueName}' queue" + } + } + + testCasesContexts.forEach { context -> + context.consumersThreads.forEach { it.interrupt() } + context.connectionManager.close() + } + } + } + + private fun CountDownLatch.assertComplete(message: String) { + assertTrue( + await( + 1L, + TimeUnit.SECONDS + ) + ) { "$message, actual count: $count" } + } + + private fun assertTarget(target: T, timeout: Long = 1_000, message: String, func: () -> T) { + val start = System.currentTimeMillis() + while (System.currentTimeMillis() - start < timeout) { + if (func() == target) { + return + } + Thread.sleep(100) + } + assertEquals(target, func(), message) + } + + private fun createConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) = + ConnectionManager( + RabbitMQConfiguration( + host = container.host, + vHost = "", + port = container.amqpPort, + username = container.adminUsername, + password = container.adminPassword, + ), + configuration + ) + + @Test + fun `connection manager exclusive queue test`() { + RabbitMQContainer(RABBITMQ_IMAGE_NAME).use { rabbitMQContainer -> + rabbitMQContainer.start() + LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" } + + createConnectionManager(rabbitMQContainer).use { firstManager -> + createConnectionManager(rabbitMQContainer).use { secondManager -> + val queue = firstManager.queueDeclare() + + assertFailsWith("Another connection can subscribe to the $queue queue") { + secondManager.basicConsume(queue, { _, _, _ -> }, {}) + } + + extracted(firstManager, secondManager, queue, 3) + extracted(firstManager, secondManager, queue, 6) + } } + } } private fun extracted( @@ -178,7 +968,7 @@ class TestConnectionManager { private fun createConnectionManager( rabbitMQContainer: RabbitMQContainer, prefetchCount: Int = DEFAULT_PREFETCH_COUNT, - confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT + confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT, ) = ConnectionManager( RabbitMQConfiguration( host = rabbitMQContainer.host, @@ -190,14 +980,26 @@ class TestConnectionManager { ConnectionManagerConfiguration( subscriberName = "test", prefetchCount = prefetchCount, - confirmationTimeout = confirmationTimeout, - ), - ) { - LOGGER.error { "Fatal connection problem" } - } + confirmationTimeout = confirmationTimeout + ) + ) companion object { - private const val DEFAULT_PREFETCH_COUNT = 10 - private val DEFAULT_CONFIRMATION_TIMEOUT: Duration = Duration.ofSeconds(1) + private val LOGGER = KotlinLogging.logger { } + + private lateinit var rabbit: RabbitMQContainer + + @BeforeAll + @JvmStatic + fun initRabbit() { + rabbit = RabbitMQContainer(RABBITMQ_IMAGE_NAME) + rabbit.start() + } + + @AfterAll + @JvmStatic + fun closeRabbit() { + rabbit.close() + } } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt index 963825782..b9e953ac5 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt @@ -1,5 +1,6 @@ /* - * Copyright 2022-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -12,11 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.common.schema.message.impl.rabbitmq.group import com.exactpro.th2.common.annotations.IntegrationTest import com.exactpro.th2.common.grpc.MessageGroupBatch import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration +import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_CONFIRMATION_TIMEOUT +import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_PREFETCH_COUNT +import com.exactpro.th2.common.schema.message.ContainerConstants.EXCHANGE +import com.exactpro.th2.common.schema.message.ContainerConstants.QUEUE_NAME +import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME +import com.exactpro.th2.common.schema.message.ContainerConstants.ROUTING_KEY import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration @@ -27,7 +35,6 @@ import mu.KotlinLogging import org.junit.jupiter.api.Test import org.mockito.kotlin.mock import org.testcontainers.containers.RabbitMQContainer -import org.testcontainers.utility.DockerImageName import java.time.Duration import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit @@ -35,10 +42,9 @@ import kotlin.test.assertTrue @IntegrationTest class IntegrationTestRabbitMessageGroupBatchRouter { - @Test fun `subscribe to exclusive queue`() { - RabbitMQContainer(DockerImageName.parse(RABBITMQ_3_8_MANAGEMENT_ALPINE)) + RabbitMQContainer(RABBITMQ_IMAGE_NAME) .use { rabbitMQContainer -> rabbitMQContainer.start() LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" } @@ -65,7 +71,7 @@ class IntegrationTestRabbitMessageGroupBatchRouter { @Test fun `send receive message group batch`() { - RabbitMQContainer(DockerImageName.parse(RABBITMQ_3_8_MANAGEMENT_ALPINE)) + RabbitMQContainer(RABBITMQ_IMAGE_NAME) .withExchange(EXCHANGE, BuiltinExchangeType.DIRECT.type, false, false, true, emptyMap()) .withQueue(QUEUE_NAME) .withBinding(EXCHANGE, QUEUE_NAME, emptyMap(), ROUTING_KEY, "queue") @@ -123,19 +129,9 @@ class IntegrationTestRabbitMessageGroupBatchRouter { prefetchCount = prefetchCount, confirmationTimeout = confirmationTimeout, ), - ) { - LOGGER.error { "Fatal connection problem" } - } + ) companion object { private val LOGGER = KotlinLogging.logger { } - - private const val RABBITMQ_3_8_MANAGEMENT_ALPINE = "rabbitmq:3.8-management-alpine" - private const val ROUTING_KEY = "routingKey" - private const val QUEUE_NAME = "queue" - private const val EXCHANGE = "test-exchange" - - private const val DEFAULT_PREFETCH_COUNT = 10 - private val DEFAULT_CONFIRMATION_TIMEOUT: Duration = Duration.ofSeconds(1) } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt index ffbbd5702..873004c00 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt @@ -13,10 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.common.schema.message.impl.rabbitmq.transport import com.exactpro.th2.common.annotations.IntegrationTest import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration +import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_CONFIRMATION_TIMEOUT +import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_PREFETCH_COUNT +import com.exactpro.th2.common.schema.message.ContainerConstants.EXCHANGE +import com.exactpro.th2.common.schema.message.ContainerConstants.QUEUE_NAME +import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME +import com.exactpro.th2.common.schema.message.ContainerConstants.ROUTING_KEY import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration @@ -27,7 +34,6 @@ import mu.KotlinLogging import org.junit.jupiter.api.Test import org.mockito.kotlin.mock import org.testcontainers.containers.RabbitMQContainer -import org.testcontainers.utility.DockerImageName import java.time.Duration import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit @@ -38,7 +44,7 @@ class TransportGroupBatchRouterIntegrationTest { @Test fun `subscribe to exclusive queue`() { - RabbitMQContainer(DockerImageName.parse(RABBITMQ_3_8_MANAGEMENT_ALPINE)) + RabbitMQContainer(RABBITMQ_IMAGE_NAME) .use { rabbitMQContainer -> rabbitMQContainer.start() LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" } @@ -68,7 +74,7 @@ class TransportGroupBatchRouterIntegrationTest { @Test fun `send receive message group batch`() { - RabbitMQContainer(DockerImageName.parse(RABBITMQ_3_8_MANAGEMENT_ALPINE)) + RabbitMQContainer(RABBITMQ_IMAGE_NAME) .withExchange(EXCHANGE, BuiltinExchangeType.DIRECT.type, false, false, true, emptyMap()) .withQueue(QUEUE_NAME) .withBinding(EXCHANGE, QUEUE_NAME, emptyMap(), ROUTING_KEY, "queue") @@ -129,19 +135,9 @@ class TransportGroupBatchRouterIntegrationTest { prefetchCount = prefetchCount, confirmationTimeout = confirmationTimeout, ), - ) { - LOGGER.error { "Fatal connection problem" } - } + ) companion object { private val LOGGER = KotlinLogging.logger { } - - private const val RABBITMQ_3_8_MANAGEMENT_ALPINE = "rabbitmq:3.8-management-alpine" - private const val ROUTING_KEY = "routingKey" - private const val QUEUE_NAME = "queue" - private const val EXCHANGE = "test-exchange" - - private const val DEFAULT_PREFETCH_COUNT = 10 - private val DEFAULT_CONFIRMATION_TIMEOUT: Duration = Duration.ofSeconds(1) } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt b/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt new file mode 100644 index 000000000..8d5e82e01 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt @@ -0,0 +1,75 @@ +/* + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:JvmName("RabbitTestContainerUtil") + +package com.exactpro.th2.common.util + +import kotlin.random.Random +import org.testcontainers.containers.Container +import org.testcontainers.containers.RabbitMQContainer + +fun declareQueue(rabbit: RabbitMQContainer, queueName: String): Container.ExecResult? = + execCommandWithSplit(rabbit, "rabbitmqadmin declare queue name=$queueName durable=false") + +fun declareFanoutExchangeWithBinding(rabbit: RabbitMQContainer, exchangeName: String, destinationQueue: String) { + execCommandWithSplit(rabbit, "rabbitmqadmin declare exchange name=$exchangeName type=fanout") + execCommandWithSplit( + rabbit, + "rabbitmqadmin declare binding source=$exchangeName destination_type=queue destination=$destinationQueue" + ) +} + +fun putMessageInQueue(rabbit: RabbitMQContainer, queueName: String): Container.ExecResult? = + execCommandWithSplit( + rabbit, + """rabbitmqadmin publish exchange=amq.default routing_key=$queueName payload="hello-${ + Random.nextInt( + 0, + 1000 + ) + }"""" + ) + +fun getQueuesInfo(rabbit: RabbitMQContainer): Container.ExecResult? = + execCommandWithSplit(rabbit, "rabbitmqctl list_queues") + +fun getChannelsInfo(rabbit: RabbitMQContainer): String = + execCommandWithSplit(rabbit, "rabbitmqctl list_consumers").toString() + +fun getSubscribedChannelsCount(rabbitMQContainer: RabbitMQContainer, queue: String): Int = + getChannelsInfo(rabbitMQContainer).countMatches(queue) + +fun restartContainer(rabbit: RabbitMQContainer) { + val tag: String = rabbit.containerId + val snapshotId: String = rabbit.dockerClient + .commitCmd(tag) + .withRepository("temp-rabbit") + .withTag(tag) + .exec() + rabbit.stop() + rabbit.dockerImageName = snapshotId + rabbit.start() +} + +private fun execCommandWithSplit(rabbit: RabbitMQContainer, command: String): Container.ExecResult? = + rabbit.execInContainer(*command.split(" ").toTypedArray()) + +private fun String.countMatches(pattern: String): Int = + substringAfter("active\targuments") + .split(pattern) + .dropLastWhile { s -> s.isEmpty() } + .toTypedArray().size - 1 \ No newline at end of file diff --git a/src/test/resources/rabbitmq_it.conf b/src/test/resources/rabbitmq_it.conf new file mode 100644 index 000000000..2942e21de --- /dev/null +++ b/src/test/resources/rabbitmq_it.conf @@ -0,0 +1,7 @@ +# According to rabbitMQ docs: Values lower than one minute are not supported +# Whether the timeout should be enforced is evaluated periodically, at one minute intervals +# (https://www.rabbitmq.com/consumers.html#acknowledgement-timeout) +# Actually, timeouts less than a minute are applied as expected contrary to the documentation (RabbitMQ 3.12.7). +# Using small timeouts to reduce testing time +consumer_timeout = 1000 +loopback_users.guest = false \ No newline at end of file diff --git a/src/test/resources/test_json_configurations/connection_manager.json b/src/test/resources/test_json_configurations/connection_manager.json index 4711383fb..fe8dee838 100644 --- a/src/test/resources/test_json_configurations/connection_manager.json +++ b/src/test/resources/test_json_configurations/connection_manager.json @@ -5,5 +5,6 @@ "maxRecoveryAttempts": 8, "minConnectionRecoveryTimeout": 8888, "maxConnectionRecoveryTimeout": 88888, - "prefetchCount": 1 + "prefetchCount": 1, + "retryTimeDeviationPercent": 10 } \ No newline at end of file