diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java index 8339cec8..7ce276be 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java @@ -73,6 +73,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; @@ -108,6 +109,7 @@ public String getNameSuffix() { private final HealthMetrics consumeMetrics = new HealthMetrics(this, ConnectionType.CONSUME.getNameSuffix()); private final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>(); private final AtomicReference<State> connectionState = new AtomicReference<>(State.OPEN); + private final AtomicBoolean isPublishingBlocked = new AtomicBoolean(false); private final ConnectionManagerConfiguration configuration; private final String subscriberName; private final AtomicInteger nextSubscriberId = new AtomicInteger(1); @@ -245,7 +247,7 @@ private void turnOffReadiness(Throwable exception) { connection = factory.newConnection(connectionName + '-' + connectionType.getNameSuffix()); LOGGER.info("Created RabbitMQ connection {} [{}]", connection, connection.hashCode()); addShutdownListenerToConnection(connection); - addBlockedListenersToConnection(connection); + addBlockedListenersToConnection(connection, connectionType); addRecoveryListenerToConnection(connection, metrics); metrics.getReadinessMonitor().enable(); LOGGER.debug("Set RabbitMQ readiness to true"); @@ -373,18 +375,34 @@ private void addRecoveryListenerToConnection(Connection conn, HealthMetrics metr } } - private void addBlockedListenersToConnection(Connection conn) { - conn.addBlockedListener(new BlockedListener() { - @Override - public void handleBlocked(String reason) { - LOGGER.warn("RabbitMQ blocked connection: {}", reason); - } + private void addBlockedListenersToConnection(Connection conn, ConnectionType type) { + BlockedListener listener = (type == ConnectionType.PUBLISH) ? + new BlockedListener() { + @Override + public void handleBlocked(String reason) { + isPublishingBlocked.set(true); + LOGGER.warn("RabbitMQ blocked publish connection: {}", reason); + } - @Override - public void handleUnblocked() { - LOGGER.warn("RabbitMQ unblocked connection"); - } - }); + @Override + public void handleUnblocked() { + isPublishingBlocked.set(false); + LOGGER.warn("RabbitMQ unblocked publish connection"); + } + } : + new BlockedListener() { + @Override + public void handleBlocked(String reason) { + LOGGER.error("RabbitMQ blocked consumer connection: {}", reason); + } + + @Override + public void handleUnblocked() { + LOGGER.error("RabbitMQ unblocked consumer connection"); + } + }; + + conn.addBlockedListener(listener); } public boolean isOpen() { @@ -449,7 +467,12 @@ private void closeConnection(Connection connection, int timeout) { public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException, InterruptedException { ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey), publishConnection); holder.retryingPublishWithLock(configuration, body, - (channel, payload) -> channel.basicPublish(exchange, routingKey, props, payload)); + (channel, payload) -> { + long start = System.currentTimeMillis(); + channel.basicPublish(exchange, routingKey, props, payload); + long delay = System.currentTimeMillis() - start; + LOGGER.error("delay = {}", delay); + }); } public String queueDeclare() throws IOException { @@ -540,6 +563,10 @@ boolean isAlive() { return publishMetrics.getLivenessMonitor().isEnabled() && consumeMetrics.getLivenessMonitor().isEnabled(); } + boolean isPublishingBlocked() { + return isPublishingBlocked.get(); + } + private ChannelHolderOptions configurationToOptions() { return new ChannelHolderOptions( configuration.getPrefetchCount(), @@ -892,7 +919,7 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration } catch (IOException | ShutdownSignalException e) { var currentValue = iterator.next(); var recoveryDelay = currentValue.getDelay(); - LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e); + LOGGER.error("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e); TimeUnit.MILLISECONDS.sleep(recoveryDelay); // cleanup after failure publishConfirmationListener.remove(msgSeq); @@ -1353,7 +1380,7 @@ public void noConfirmationWillBeReceived() { try { LOGGER.warn("Publication listener was notified that no confirmations will be received"); noConfirmationWillBeReceived = true; - // we need to unlock possible locked publisher so it can check that nothing will be confirmed + // we need to unlock possible locked publisher, so it can check that nothing will be confirmed // and retry the publication hasSpaceToWriteCondition.signalAll(); } finally { diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt index 311db435..c2f5a2cd 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt @@ -1204,6 +1204,7 @@ class TestConnectionManager { LOGGER.info { "Started with port ${rabbit.amqpPort}" } LOGGER.info { "Started with port ${rabbit.amqpPort}" } val messagesCount = 10 + val blockAfter = 3 val countDown = CountDownLatch(messagesCount) val messageSizeBytes = 7 createConnectionManager( @@ -1221,13 +1222,24 @@ class TestConnectionManager { ) ).use { manager -> repeat(messagesCount) { index -> + if (index == blockAfter) { + assertFalse(manager.isPublishingBlocked) + + // blocks all publishers ( https://www.rabbitmq.com/docs/memory ) + rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0") + } + manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8)) LOGGER.info("Published $index") - } - // blocks all publishers ( https://www.rabbitmq.com/docs/memory ) - rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0") - manager.basicPublish(exchange, routingKey, null, "Final message.".toByteArray(Charsets.UTF_8)) // this message initiates publishers blocking + if (index == blockAfter) { + // wait for blocking of publishing connection + Awaitility.await("publishing blocked") + .pollInterval(10L, TimeUnit.MILLISECONDS) + .atMost(100L, TimeUnit.MILLISECONDS) + .until { manager.isPublishingBlocked } + } + } val receivedMessages = linkedSetOf<String>() LOGGER.info { "creating consumer" } @@ -1254,6 +1266,19 @@ class TestConnectionManager { subscribeFuture.cancel(true) } + Awaitility.await("receive messages sent before blocking") + .pollInterval(10L, TimeUnit.MILLISECONDS) + .atMost(100L, TimeUnit.MILLISECONDS) + .until { blockAfter.toLong() == messagesCount - countDown.count } + + Thread.sleep(100) // ensure no more messages received + assertEquals(blockAfter.toLong(), messagesCount - countDown.count) + assertTrue(manager.isPublishingBlocked) + + // unblocks publishers + rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0.4") + assertFalse(manager.isPublishingBlocked) + // delay receiving all messages Awaitility.await("all messages received") .pollInterval(10L, TimeUnit.MILLISECONDS)