diff --git a/build.gradle b/build.gradle index 5b3cd75b..fce8d0e0 100644 --- a/build.gradle +++ b/build.gradle @@ -166,6 +166,7 @@ dependencies { testImplementation("org.junit-pioneer:junit-pioneer:2.2.0") { because("system property tests") } + testImplementation("org.awaitility:awaitility:4.2.1") testFixturesImplementation "org.jetbrains.kotlin:kotlin-test-junit5:$kotlin_version" testFixturesImplementation "org.junit.jupiter:junit-jupiter:$junitVersion" diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java index f3444e72..e4e07089 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java @@ -73,8 +73,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -89,7 +89,7 @@ public class ConnectionManager implements AutoCloseable { private final Connection connection; private final Map channelsByPin = new ConcurrentHashMap<>(); - private final AtomicBoolean connectionIsClosed = new AtomicBoolean(false); + private final AtomicReference connectionIsClosed = new AtomicReference<>(State.OPEN); private final ConnectionManagerConfiguration configuration; private final String subscriberName; private final AtomicInteger nextSubscriberId = new AtomicInteger(1); @@ -118,6 +118,8 @@ public ConnectionManagerConfiguration getConfiguration() { return configuration; } + private enum State { 
OPEN, CLOSING, CLOSED } + public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfiguration rabbitMQConfiguration, @NotNull ConnectionManagerConfiguration connectionManagerConfiguration) { Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null"); this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null"); @@ -205,7 +207,7 @@ private void turnOffReadiness(Throwable exception) { } }); - factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> !connectionIsClosed.get()); + factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> connectionIsClosed.get() != State.CLOSED); factory.setRecoveryDelayHandler(recoveryAttempts -> { int minTime = connectionManagerConfiguration.getMinConnectionRecoveryTimeout(); @@ -358,12 +360,12 @@ public void handleUnblocked() { } public boolean isOpen() { - return connection.isOpen() && !connectionIsClosed.get(); + return connection.isOpen() && connectionIsClosed.get() == State.OPEN; } @Override public void close() { - if (connectionIsClosed.getAndSet(true)) { + if (!connectionIsClosed.compareAndSet(State.OPEN, State.CLOSING)) { LOGGER.info("Connection manager already closed"); return; } @@ -397,6 +399,8 @@ public void close() { } } + connectionIsClosed.set(State.CLOSED); + if (connection.isOpen()) { try { connection.close(closeTimeout); @@ -602,7 +606,7 @@ private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitFo } } - if (connectionIsClosed.get()) { + if (connectionIsClosed.get() == State.CLOSED) { throw new IllegalStateException("Connection is already closed"); } } @@ -621,7 +625,8 @@ private void waitForRecovery(ShutdownNotifier notifier) { } private boolean isConnectionRecovery(ShutdownNotifier notifier) { - return !(notifier instanceof AutorecoveringChannel) && !notifier.isOpen() && !connectionIsClosed.get(); + return !(notifier instanceof AutorecoveringChannel) && 
!notifier.isOpen() + && connectionIsClosed.get() != State.CLOSED; } /** diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt index da1ecfc3..d96e1135 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt @@ -36,6 +36,7 @@ import com.rabbitmq.client.BuiltinExchangeType import com.rabbitmq.client.CancelCallback import com.rabbitmq.client.Delivery import mu.KotlinLogging +import org.awaitility.Awaitility import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions @@ -110,7 +111,10 @@ class TestConnectionManager { repeat(messagesCount) { index -> if (index == 1) { // delay should allow ack for the first message be received - Thread.sleep(100) + Awaitility.await("first message is confirmed") + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(100, TimeUnit.MILLISECONDS) + .until { countDown.count == messagesCount - 1L } // Man pages: // https://man7.org/linux/man-pages/man8/tc-netem.8.html // https://man7.org/linux/man-pages/man8/ifconfig.8.html @@ -127,7 +131,9 @@ class TestConnectionManager { // More than 2 HB will be missed // This is enough for rabbitmq server to understand the connection is lost - Thread.sleep(3_000) + Awaitility.await("connection is closed") + .atMost(3, TimeUnit.SECONDS) + .until { !manager.isOpen } // enabling network interface back rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up") } @@ -149,6 +155,99 @@ class TestConnectionManager { } } + @Test + fun `connection manager redelivers unconfirmed messages on close`() { + val routingKey = "routingKey1" + val queueName = "queue1" + val exchange = "test-exchange1" + rabbit + .let { rabbit -> 
+ declareQueue(rabbit, queueName) + declareFanoutExchangeWithBinding(rabbit, exchange, queueName) + LOGGER.info { "Started with port ${rabbit.amqpPort}" } + val messagesCount = 10 + val countDown = CountDownLatch(messagesCount) + val messageSizeBytes = 7 + createConnectionManager( + rabbit, ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 2000, + maxConnectionRecoveryTimeout = 2000, + // to avoid unexpected delays before recovery + retryTimeDeviationPercent = 0, + ) + ).use { manager -> + val receivedMessages = linkedSetOf() + manager.basicConsume(queueName, { _, delivery, ack -> + val message = delivery.body.toString(Charsets.UTF_8) + LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" } + if (receivedMessages.add(message)) { + // decrement only unique messages + countDown.countDown() + } else { + LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" } + } + ack.confirm() + }) { + LOGGER.info { "Canceled $it" } + } + + createConnectionManager( + rabbit, ConnectionManagerConfiguration( + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + enablePublisherConfirmation = true, + maxInflightPublicationsBytes = messagesCount * 2 * messageSizeBytes, + heartbeatIntervalSeconds = 1, + minConnectionRecoveryTimeout = 2000, + maxConnectionRecoveryTimeout = 2000, + // to avoid unexpected delays before recovery + retryTimeDeviationPercent = 0, + ) + ).use { managerWithConfirmation -> + repeat(messagesCount) { index -> + if (index == 1) { + // delay should allow ack for the first message to be received + Awaitility.await("first message is confirmed") + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(100, TimeUnit.MILLISECONDS) + .until { countDown.count == messagesCount - 1L } + // looks like if nothing is sent yet through the channel + // it will detect 
connection lost right away and start waiting for recovery + rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down") + } + managerWithConfirmation.basicPublish( + exchange, + routingKey, + null, + "Hello $index".toByteArray(Charsets.UTF_8) + ) + } + // ensure connection is closed because of HB timeout + Awaitility.await("connection is closed") + .atMost(4, TimeUnit.SECONDS) + .until { !managerWithConfirmation.isOpen } + rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up") + // wait for connection to recover before closing + Awaitility.await("connection is recovered") + .atMost(4, TimeUnit.SECONDS) + .until { managerWithConfirmation.isOpen } + } + + countDown.assertComplete { "Not all messages were received: $receivedMessages" } + assertEquals( + (0 until messagesCount).map { + "Hello $it" + }, + receivedMessages.toList(), + "messages received in unexpected order", + ) + } + } + } + @Test fun `connection manager reports unacked messages when confirmation timeout elapsed`() { val routingKey = "routingKey1"