From 8c205221ce1f9e18213a34279ced940209ebc074 Mon Sep 17 00:00:00 2001 From: Oleg Date: Sun, 2 Jun 2024 17:51:48 +0400 Subject: [PATCH] Migrate inflight limit from messages count to payload size in bytes --- README.md | 2 +- .../connection/ConnectionManager.java | 88 +++++++++++++------ .../configuration/RabbitMQConfiguration.kt | 4 +- .../connection/ConnectionManualBenchmark.kt | 6 +- .../connection/TestConnectionManager.kt | 29 ++++-- 5 files changed, 86 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 4510450a..c3b35bfb 100644 --- a/README.md +++ b/README.md @@ -516,7 +516,7 @@ dependencies { + Added functionality for publisher confirmations to mitigate network issues for message producers. + New parameters are added to connection manager configuration: + enablePublisherConfirmation - enables publisher confirmation. `false` by default. - + maxInflightPublications - the max number of unconfirmed published messages per channel. `200`, by default. + + maxInflightPublicationsBytes - the max number of unconfirmed published messages per channel. `52428800` (50 MB), by default. + heartbeatIntervalSeconds - rabbitmq connection heartbeat interval in seconds. `0` by default (that means the default interval will be set by the internal library used to communicate with RabbitMQ). diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java index dce147ca..01a3113c 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java @@ -504,7 +504,7 @@ private ChannelHolderOptions configurationToOptions() { return new ChannelHolderOptions( configuration.getPrefetchCount(), configuration.getEnablePublisherConfirmation(), - configuration.getMaxInflightPublications() + configuration.getMaxInflightPublicationsBytes() ); } @@ -718,12 +718,12 @@ public String toString() { private static class ChannelHolderOptions { private final int maxCount; private final boolean enablePublisherConfirmation; - private final int maxInflightRequests; + private final int maxInflightRequestsBytes; - private ChannelHolderOptions(int maxCount, boolean enablePublisherConfirmation, int maxInflightRequests) { + private ChannelHolderOptions(int maxCount, boolean enablePublisherConfirmation, int maxInflightRequestsBytes) { this.maxCount = maxCount; this.enablePublisherConfirmation = enablePublisherConfirmation; - this.maxInflightRequests = maxInflightRequests; + this.maxInflightRequestsBytes = maxInflightRequestsBytes; } public int getMaxCount() { @@ -734,8 +734,8 @@ public boolean isEnablePublisherConfirmation() { return enablePublisherConfirmation; } - public int getMaxInflightRequests() { - return maxInflightRequests; + public int getMaxInflightRequestsBytes() { + return maxInflightRequestsBytes; } } @@ -780,7 +780,7 @@ public ChannelHolder( this.subscriptionCallbacks = subscriptionCallbacks; publishConfirmationListener = new PublisherConfirmationListener( options.isEnablePublisherConfirmation(), - options.getMaxInflightRequests() + options.getMaxInflightRequestsBytes() ); } @@ -842,7 +842,6 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration publishConfirmationListener.put(msgSeq, currentPayload); if (publishConfirmationListener.isNoConfirmationWillBeReceived()) { // will drain message to queue on next iteration - publishConfirmationListener.remove(msgSeq); continue; } currentPayload.publish(channel); @@ -854,9 +853,6 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration var recoveryDelay = currentValue.getDelay(); LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e); TimeUnit.MILLISECONDS.sleep(recoveryDelay); - // cleanup after failure - publishConfirmationListener.remove(msgSeq); - redeliveryQueue.addFirst(currentPayload); // We should not recover the channel if its connection is closed // If we do that the channel will be also auto recovered by RabbitMQ client @@ -866,6 +862,10 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration // so we should redeliver all inflight requests publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue); tempChannel = recreateChannel(); + } else { + // cleanup after failure + publishConfirmationListener.remove(msgSeq); + redeliveryQueue.addFirst(currentPayload); } } } @@ -1098,6 +1098,10 @@ private PublicationHolder(ChannelPublisher publisher, byte[] payload) { this.payload = payload; } + int size() { + return payload.length; + } + boolean isConfirmed() { return confirmed; } @@ -1121,17 +1125,20 @@ private static class PublisherConfirmationListener implements ConfirmListener { private final Condition allMessagesConfirmed = lock.writeLock().newCondition(); @GuardedBy("lock") private final NavigableMap inflightRequests = new TreeMap<>(); - private final int maxInflightRequests; + private final int maxInflightRequestsBytes; private final boolean enablePublisherConfirmation; private final boolean hasLimit; - private volatile boolean noConfirmationWillBeReceived; + @GuardedBy("lock") + private boolean noConfirmationWillBeReceived; + @GuardedBy("lock") + private int inflightBytes; - private PublisherConfirmationListener(boolean enablePublisherConfirmation, int maxInflightRequests) { - if (maxInflightRequests <= 0 && maxInflightRequests != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS) { - throw new IllegalArgumentException("invalid maxInflightRequests: " + maxInflightRequests); + private PublisherConfirmationListener(boolean enablePublisherConfirmation, int maxInflightRequestsBytes) { + if (maxInflightRequestsBytes <= 0 && maxInflightRequestsBytes != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS) { + throw new IllegalArgumentException("invalid maxInflightRequests: " + maxInflightRequestsBytes); } - hasLimit = maxInflightRequests != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS; - this.maxInflightRequests = maxInflightRequests; + hasLimit = maxInflightRequestsBytes != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS; + this.maxInflightRequestsBytes = maxInflightRequestsBytes; this.enablePublisherConfirmation = enablePublisherConfirmation; } @@ -1141,14 +1148,26 @@ public void put(long deliveryTag, PublicationHolder payload) throws InterruptedE } lock.writeLock().lock(); try { - // there is only one thread at a time that tries to publish message - // so it is safe to check the limit only once - if (hasLimit && inflightRequests.size() >= maxInflightRequests) { - LOGGER.warn("blocking because inflight requests size is above limit {} for publication channel", maxInflightRequests); - hasSpaceToWriteCondition.await(); - LOGGER.info("unblocking because inflight requests size is below limit {} for publication channel", maxInflightRequests); + int payloadSize = payload.size(); + if (hasLimit) { + int newSize = inflightBytes + payloadSize; + if (newSize > maxInflightRequestsBytes) { + LOGGER.warn("blocking because {} inflight requests bytes size is above limit {} bytes for publication channel", + newSize, maxInflightRequestsBytes); + do { + hasSpaceToWriteCondition.await(); + newSize = inflightBytes + payloadSize; + } while (newSize > maxInflightRequestsBytes && !noConfirmationWillBeReceived); + if (noConfirmationWillBeReceived) { + LOGGER.warn("unblocking because no confirmation will be received and inflight request size will not change"); + } else { + LOGGER.info("unblocking because {} inflight requests bytes size is below limit {} bytes for publication channel", + newSize, maxInflightRequestsBytes); + } + } } inflightRequests.put(deliveryTag, payload); + inflightBytes += payloadSize; } finally { lock.writeLock().unlock(); } @@ -1162,6 +1181,7 @@ public void remove(long deliveryTag) { public void handleAck(long deliveryTag, boolean multiple) { LOGGER.trace("Delivery ack received for tag {} (multiple:{})", deliveryTag, multiple); removeInflightRequests(deliveryTag, multiple); + LOGGER.trace("Delivery ack processed for tag {} (multiple:{})", deliveryTag, multiple); } @Override @@ -1170,6 +1190,7 @@ public void handleNack(long deliveryTag, boolean multiple) { // we cannot do match with nack because this is an internal error in rabbitmq // we can try to republish the message but there is no guarantee that the message will be accepted removeInflightRequests(deliveryTag, multiple); + LOGGER.warn("Delivery nack processed for tag {} (multiple:{})", deliveryTag, multiple); } private void removeInflightRequests(long deliveryTag, boolean multiple) { @@ -1178,9 +1199,13 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) { } lock.writeLock().lock(); try { - int initialSize = inflightRequests.size(); + int currentSize = inflightBytes; if (multiple) { - inflightRequests.headMap(deliveryTag, true).clear(); + Map headMap = inflightRequests.headMap(deliveryTag, true); + for (Map.Entry entry : headMap.entrySet()) { + currentSize -= entry.getValue().size(); + } + headMap.clear(); } else { long oldestPublication = Objects.requireNonNullElse(inflightRequests.firstKey(), deliveryTag); if (oldestPublication == deliveryTag) { @@ -1195,6 +1220,7 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) { if (key > deliveryTag && !holder.isConfirmed()) { break; } + currentSize -= holder.size(); tailIterator.remove(); } } else { @@ -1206,12 +1232,14 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) { } } } + if (inflightBytes != currentSize) { + inflightBytes = currentSize; + hasSpaceToWriteCondition.signalAll(); + } if (inflightRequests.isEmpty()) { + inflightBytes = 0; allMessagesConfirmed.signalAll(); } - if (inflightRequests.size() != initialSize) { - hasSpaceToWriteCondition.signalAll(); - } } finally { lock.writeLock().unlock(); } @@ -1244,6 +1272,7 @@ public void transferUnconfirmedTo(Deque redelivery) { redelivery.addFirst(payload); } inflightRequests.clear(); + inflightBytes = 0; hasSpaceToWriteCondition.signalAll(); allMessagesConfirmed.signalAll(); noConfirmationWillBeReceived = false; @@ -1270,6 +1299,7 @@ public boolean isNoConfirmationWillBeReceived() { public void noConfirmationWillBeReceived() { lock.writeLock().lock(); try { + LOGGER.warn("Publication listener was notified that no confirmations will be received"); noConfirmationWillBeReceived = true; // we need to unlock possible locked publisher so it can check that nothing will be confirmed // and retry the publication diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt index 9396edec..74db30a2 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt @@ -44,8 +44,8 @@ data class ConnectionManagerConfiguration( val workingThreads: Int = 1, val confirmationTimeout: Duration = Duration.ofMinutes(5), val enablePublisherConfirmation: Boolean = false, - // Default value is taken based on measurement done in ConnectionManualBenchmark class - val maxInflightPublications: Int = 200, + // Default value 50MB is taken based on measurement done in ConnectionManualBenchmark class + val maxInflightPublicationsBytes: Int = 50 * 1024 * 1024, val heartbeatIntervalSeconds: Int = DEFAULT_HB_INTERVAL_SECONDS, ) : Configuration() { init { diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt index cfcb22a4..2f8fa25a 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt @@ -46,7 +46,7 @@ object ConnectionManualBenchmark { prefetchCount = 1000, confirmationTimeout = Duration.ofSeconds(5), enablePublisherConfirmation = true, - maxInflightPublications = 100, + maxInflightPublicationsBytes = 1024 * 1024 * 25, ) ) @@ -57,7 +57,7 @@ object ConnectionManualBenchmark { prefetchCount = 100, confirmationTimeout = Duration.ofSeconds(5), enablePublisherConfirmation = true, - maxInflightPublications = 200, + maxInflightPublicationsBytes = 1024 * 1024 * 1, ) ) } @@ -69,7 +69,7 @@ object ConnectionManualBenchmark { prefetchCount = 100, confirmationTimeout = Duration.ofSeconds(5), enablePublisherConfirmation = false, - maxInflightPublications = -1, + maxInflightPublicationsBytes = -1, ) ) } diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt index 0eca6f62..da536e51 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt @@ -53,6 +53,7 @@ import org.testcontainers.utility.MountableFile import java.io.IOException import java.time.Duration import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.CompletableFuture import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger @@ -74,13 +75,14 @@ class TestConnectionManager { LOGGER.info { "Started with port ${rabbit.amqpPort}" } val messagesCount = 10 val countDown = CountDownLatch(messagesCount) + val messageSizeBytes = 7 createConnectionManager( rabbit, ConnectionManagerConfiguration( subscriberName = "test", prefetchCount = DEFAULT_PREFETCH_COUNT, confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, enablePublisherConfirmation = true, - maxInflightPublications = 5, + maxInflightPublicationsBytes = 5 * messageSizeBytes, heartbeatIntervalSeconds = 1, minConnectionRecoveryTimeout = 2000, maxConnectionRecoveryTimeout = 2000, @@ -104,6 +106,7 @@ class TestConnectionManager { } + var future: CompletableFuture<*>? = null repeat(messagesCount) { index -> if (index == 1) { // delay should allow ack for the first message be received @@ -119,16 +122,22 @@ class TestConnectionManager { // So, we will have to deal with it on the consumer side rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down") } else if (index == 4) { - // More than 2 HB will be missed - // This is enough for rabbitmq server to understand the connection is lost - Thread.sleep(4_000) - // enabling network interface back - rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up") + future = CompletableFuture.supplyAsync { + // Interface is unblock in separate thread to emulate more realistic scenario + + // More than 2 HB will be missed + // This is enough for rabbitmq server to understand the connection is lost + Thread.sleep(3_000) + // enabling network interface back + rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up") + } } manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8)) } - countDown.assertComplete("Not all messages were received: $receivedMessages") + future?.get(30, TimeUnit.SECONDS) + + countDown.assertComplete { "Not all messages were received: $receivedMessages" } assertEquals( (0 until messagesCount).map { "Hello $it" @@ -959,12 +968,16 @@ class TestConnectionManager { } private fun CountDownLatch.assertComplete(message: String) { + assertComplete { message } + } + + private fun CountDownLatch.assertComplete(messageSupplier: () -> String) { assertTrue( await( 1L, TimeUnit.SECONDS ) - ) { "$message, actual count: $count" } + ) { "${messageSupplier()}, actual count: $count" } } private fun assertTarget(target: T, timeout: Long = 1_000, message: String, func: () -> T) {