From 8190ccf402c8bcce60503f1e9b98f6eca66987df Mon Sep 17 00:00:00 2001
From: Oleg
Date: Wed, 22 May 2024 18:50:05 +0400
Subject: [PATCH 01/16] Add publishing confirmation to redeliver messages that
were not confirmed by broker.
The implementation does not handle cases when message in the middle was not confirmed. Unfortunately, there is not much we can do here
---
.github/workflows/integration-tests.yml | 26 ++
.../connection/ConnectionManager.java | 364 ++++++++++++++++--
.../configuration/RabbitMQConfiguration.kt | 10 +-
.../NotificationEventBatchSender.kt | 1 +
.../connection/TestConnectionManager.kt | 129 ++++++-
5 files changed, 490 insertions(+), 40 deletions(-)
create mode 100644 .github/workflows/integration-tests.yml
diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml
new file mode 100644
index 000000000..7d3da3c08
--- /dev/null
+++ b/.github/workflows/integration-tests.yml
@@ -0,0 +1,26 @@
+name: "Run integration tests for common"
+
+on:
+ push:
+ branches:
+ - '*'
+
+jobs:
+ tests:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - name: Set up JDK 'zulu' '11'
+ uses: actions/setup-java@v3
+ with:
+ distribution: 'zulu'
+ java-version: '11'
+ - name: Setup Gradle
+ uses: gradle/gradle-build-action@v2
+ - name: Build with Gradle
+ run: ./gradlew --info integrationTest
+ - uses: actions/upload-artifact@v3
+ if: failure()
+ with:
+ name: integration-test-results
+ path: build/reports/tests/integrationTest/
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
index aea3e5304..039f063ac 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
@@ -30,6 +30,7 @@
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
@@ -56,11 +57,15 @@
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Objects;
import java.util.Collections;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -70,8 +75,11 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
@@ -146,6 +154,10 @@ public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfig
factory.setConnectionTimeout(connectionManagerConfiguration.getConnectionTimeout());
}
+ if (connectionManagerConfiguration.getHeartbeatIntervalSeconds() > ConnectionManagerConfiguration.DEFAULT_HB_INTERVAL_SECONDS) {
+ factory.setRequestedHeartbeat(connectionManagerConfiguration.getHeartbeatIntervalSeconds());
+ }
+
factory.setExceptionHandler(new ExceptionHandler() {
@Override
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
@@ -357,16 +369,31 @@ public void close() {
}
LOGGER.info("Closing connection manager");
+ int closeTimeout = configuration.getConnectionCloseTimeout();
- for (ChannelHolder channelHolder: channelsByPin.values()) {
+ for (Map.Entry entry: channelsByPin.entrySet()) {
+ PinId id = entry.getKey();
+ ChannelHolder channelHolder = entry.getValue();
try {
+ if (channelHolder.hasUnconfirmedMessages()) {
+ if (channelHolder.noConfirmationWillBeReceived()) {
+ LOGGER.warn("Some messages were not confirmed by broken in channel {} and were not republished. Try to republish messages", id);
+ channelHolder.publishUnconfirmedMessages();
+ }
+ try {
+ channelHolder.awaitConfirmations(closeTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (channelHolder.hasUnconfirmedMessages()) {
+ LOGGER.error("RabbitMQ channel for pin {} has unconfirmed messages on close", id);
+ }
channelHolder.channel.abort();
} catch (IOException e) {
- LOGGER.error("Cannot close channel", e);
+ LOGGER.error("Cannot close channel for pin {}", id, e);
}
}
- int closeTimeout = configuration.getConnectionCloseTimeout();
if (connection.isOpen()) {
try {
connection.close(closeTimeout);
@@ -379,13 +406,14 @@ public void close() {
shutdownExecutor(channelChecker, closeTimeout, "channel-checker");
}
- public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws InterruptedException {
+ public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException, InterruptedException {
ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey));
- holder.retryingPublishWithLock(channel -> channel.basicPublish(exchange, routingKey, props, body), configuration);
+ holder.retryingPublishWithLock(configuration, body,
+ (channel, payload) -> channel.basicPublish(exchange, routingKey, props, payload));
}
public String queueDeclare() throws IOException {
- ChannelHolder holder = new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configuration.getPrefetchCount());
+ ChannelHolder holder = new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions());
return holder.mapWithLock(channel -> {
String queue = channel.queueDeclare(
"", // queue name
@@ -472,6 +500,14 @@ boolean isAlive() {
return metrics.getLivenessMonitor().isEnabled();
}
+ private ChannelHolderOptions configurationToOptions() {
+ return new ChannelHolderOptions(
+ configuration.getPrefetchCount(),
+ configuration.getEnablePublisherConfirmation(),
+ configuration.getMaxInflightPublications()
+ );
+ }
+
private void basicCancel(Channel channel, String consumerTag) throws IOException {
channel.basicCancel(consumerTag);
}
@@ -511,14 +547,14 @@ public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCa
private ChannelHolder getOrCreateChannelFor(PinId pinId) {
return channelsByPin.computeIfAbsent(pinId, ignore -> {
LOGGER.trace("Creating channel holder for {}", pinId);
- return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configuration.getPrefetchCount());
+ return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions());
});
}
private ChannelHolder getOrCreateChannelFor(PinId pinId, SubscriptionCallbacks subscriptionCallbacks) {
return channelsByPin.computeIfAbsent(pinId, ignore -> {
LOGGER.trace("Creating channel holder with callbacks for {}", pinId);
- return new ChannelHolder(() -> createChannelWithOptionalRecovery(true), this::waitForConnectionRecovery, configuration.getPrefetchCount(), subscriptionCallbacks);
+ return new ChannelHolder(() -> createChannelWithOptionalRecovery(true), this::waitForConnectionRecovery, configurationToOptions(), subscriptionCallbacks);
});
}
@@ -679,11 +715,35 @@ public String toString() {
}
}
+ private static class ChannelHolderOptions {
+ private final int maxCount;
+ private final boolean enablePublisherConfirmation;
+ private final int maxInflightRequests;
+
+ private ChannelHolderOptions(int maxCount, boolean enablePublisherConfirmation, int maxInflightRequests) {
+ this.maxCount = maxCount;
+ this.enablePublisherConfirmation = enablePublisherConfirmation;
+ this.maxInflightRequests = maxInflightRequests;
+ }
+
+ public int getMaxCount() {
+ return maxCount;
+ }
+
+ public boolean isEnablePublisherConfirmation() {
+ return enablePublisherConfirmation;
+ }
+
+ public int getMaxInflightRequests() {
+ return maxInflightRequests;
+ }
+ }
+
private static class ChannelHolder {
private final Lock lock = new ReentrantLock();
private final Supplier supplier;
private final BiConsumer reconnectionChecker;
- private final int maxCount;
+ private final ChannelHolderOptions options;
private final SubscriptionCallbacks subscriptionCallbacks;
@GuardedBy("lock")
private int pending;
@@ -693,30 +753,47 @@ private static class ChannelHolder {
private Channel channel;
private final Lock subscribingLock = new ReentrantLock();
@GuardedBy("subscribingLock")
- private boolean isSubscribed = false;
+ private boolean isSubscribed;
+ @GuardedBy("lock")
+ private Deque redeliveryQueue = new ArrayDeque<>();
+
+ private final PublisherConfirmationListener publishConfirmationListener;
public ChannelHolder(
Supplier supplier,
BiConsumer reconnectionChecker,
- int maxCount
+ ChannelHolderOptions options
) {
- this.supplier = Objects.requireNonNull(supplier, "'Supplier' parameter");
- this.reconnectionChecker = Objects.requireNonNull(reconnectionChecker, "'Reconnection checker' parameter");
- this.maxCount = maxCount;
- this.subscriptionCallbacks = null;
+ this(supplier, reconnectionChecker, options, null);
}
public ChannelHolder(
Supplier supplier,
BiConsumer reconnectionChecker,
- int maxCount,
+ ChannelHolderOptions options,
SubscriptionCallbacks subscriptionCallbacks
) {
this.supplier = Objects.requireNonNull(supplier, "'Supplier' parameter");
this.reconnectionChecker = Objects.requireNonNull(reconnectionChecker, "'Reconnection checker' parameter");
- this.maxCount = maxCount;
+ this.options = options;
this.subscriptionCallbacks = subscriptionCallbacks;
+ publishConfirmationListener = new PublisherConfirmationListener(
+ options.isEnablePublisherConfirmation(),
+ options.getMaxInflightRequests()
+ );
+ }
+
+ public boolean hasUnconfirmedMessages() {
+ return publishConfirmationListener.hasUnconfirmedMessages();
+ }
+
+ public boolean noConfirmationWillBeReceived() {
+ return publishConfirmationListener.isNoConfirmationWillBeReceived();
+ }
+
+ public boolean awaitConfirmations(long timeout, TimeUnit timeUnit) throws InterruptedException {
+ return publishConfirmationListener.awaitConfirmations(timeout, timeUnit);
}
public void withLock(ChannelConsumer consumer) throws IOException {
@@ -741,25 +818,53 @@ public void withLock(boolean waitForRecovery, ChannelConsumer consumer) throws I
}
}
- public void retryingPublishWithLock(ChannelConsumer consumer, ConnectionManagerConfiguration configuration) throws InterruptedException {
+ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration, byte[] payload, ChannelPublisher publisher) throws IOException, InterruptedException {
lock.lock();
try {
Iterator iterator = configuration.createRetryingDelaySequence().iterator();
Channel tempChannel = getChannel(true);
+ // add current message to the end to unify logic for sending current and redelivered messages
+ redeliveryQueue.addLast(new PublicationHolder(publisher, payload));
while (true) {
- try {
- consumer.consume(tempChannel);
+ if (publishConfirmationListener.isNoConfirmationWillBeReceived()) {
+ LOGGER.warn("Connection was closed on channel. No delivery confirmation will be received. Drain message to redelivery");
+ publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
+ }
+ PublicationHolder currentPayload = redeliveryQueue.pollFirst();
+ if (!redeliveryQueue.isEmpty()) {
+ LOGGER.warn("Redelivery queue size: {}", redeliveryQueue.size());
+ }
+ if (currentPayload == null) {
break;
+ }
+ long msgSeq = tempChannel.getNextPublishSeqNo();
+ try {
+ publishConfirmationListener.put(msgSeq, currentPayload);
+ if (publishConfirmationListener.isNoConfirmationWillBeReceived()) {
+ // will drain message to queue on next iteration
+ publishConfirmationListener.remove(msgSeq);
+ continue;
+ }
+ currentPayload.publish(channel);
+ if (redeliveryQueue.isEmpty()) {
+ break;
+ }
} catch (IOException | ShutdownSignalException e) {
var currentValue = iterator.next();
var recoveryDelay = currentValue.getDelay();
LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e);
TimeUnit.MILLISECONDS.sleep(recoveryDelay);
+ // cleanup after failure
+ publishConfirmationListener.remove(msgSeq);
+ redeliveryQueue.addFirst(currentPayload);
// We should not recover the channel if its connection is closed
// If we do that the channel will be also auto recovered by RabbitMQ client
// during connection recovery, and we will get two new channels instead of one closed.
if (!tempChannel.isOpen() && tempChannel.getConnection().isOpen()) {
+ // once channel is recreated there won't be any confirmation received
+ // so we should redeliver all inflight requests
+ publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
tempChannel = recreateChannel();
}
}
@@ -873,17 +978,17 @@ public T mapWithLock(ChannelMapper mapper) throws IOException {
/**
* Decreases the number of unacked messages.
- * If the number of unacked messages is less than {@link #maxCount}
+ * If the number of unacked messages is less than {@link ChannelHolderOptions#getMaxCount()}
* the onWaterMarkDecreased action will be called.
* The future created in {@link #acquireAndSubmitCheck(Supplier)} method will be canceled
* @param onWaterMarkDecreased
- * the action that will be executed when the number of unacked messages is less than {@link #maxCount} and there is a future to cancel
+ * the action that will be executed when the number of unacked messages is less than {@link ChannelHolderOptions#getMaxCount()} and there is a future to cancel
*/
public void release(Runnable onWaterMarkDecreased) {
lock.lock();
try {
pending--;
- if (pending < maxCount && check != null) {
+ if (pending < options.getMaxCount() && check != null) {
check.cancel(true);
check = null;
onWaterMarkDecreased.run();
@@ -895,11 +1000,11 @@ public void release(Runnable onWaterMarkDecreased) {
/**
* Increases the number of unacked messages.
- * If the number of unacked messages is higher than or equal to {@link #maxCount}
+ * If the number of unacked messages is higher than or equal to {@link ChannelHolderOptions#getMaxCount()}
* the futureSupplier will be invoked to create a task
- * that either will be executed or canceled when number of unacked message will be less that {@link #maxCount}
+ * that either will be executed or canceled when number of unacked message will be less that {@link ChannelHolderOptions#getMaxCount()}
* @param futureSupplier
- * creates a future to track the task that should be executed until the number of unacked message is not less than {@link #maxCount}
+ * creates a future to track the task that should be executed until the number of unacked message is not less than {@link ChannelHolderOptions#getMaxCount()}
*/
public void acquireAndSubmitCheck(Supplier> futureSupplier) {
lock.lock();
@@ -925,29 +1030,224 @@ public boolean isChannelSubscribed(Channel channel) {
public boolean reachedPendingLimit() {
lock.lock();
try {
- return pending >= maxCount;
+ return pending >= options.getMaxCount();
} finally {
lock.unlock();
}
}
- private Channel getChannel() {
+ private Channel getChannel() throws IOException {
return getChannel(true);
}
- private Channel recreateChannel() {
+ private Channel recreateChannel() throws IOException {
channel = supplier.get();
reconnectionChecker.accept(channel, true);
+ setupPublisherConfirmation(channel);
return channel;
}
- private Channel getChannel(boolean waitForRecovery) {
+ private Channel getChannel(boolean waitForRecovery) throws IOException {
if (channel == null) {
channel = supplier.get();
+ setupPublisherConfirmation(channel);
}
reconnectionChecker.accept(channel, waitForRecovery);
return channel;
}
+
+ private void setupPublisherConfirmation(Channel channel) throws IOException {
+ if (!options.isEnablePublisherConfirmation()) {
+ return;
+ }
+ channel.confirmSelect();
+ channel.addShutdownListener(cause -> {
+ publishConfirmationListener.noConfirmationWillBeReceived();
+ });
+ channel.addConfirmListener(publishConfirmationListener);
+ }
+
+ public void publishUnconfirmedMessages() throws IOException {
+ lock.lock();
+ try {
+ Channel channel = getChannel(false);
+ if (!channel.isOpen()) {
+ throw new IllegalStateException("channel is not opened to publish unconfirmed messages");
+ }
+ publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
+ while (!redeliveryQueue.isEmpty()) {
+ PublicationHolder holder = redeliveryQueue.pollFirst();
+ if (holder == null) {
+ break;
+ }
+ holder.publish(channel);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private static class PublicationHolder {
+ private final ChannelPublisher publisher;
+ private final byte[] payload;
+
+ private PublicationHolder(ChannelPublisher publisher, byte[] payload) {
+ this.publisher = publisher;
+ this.payload = payload;
+ }
+
+ public void publish(Channel channel) throws IOException {
+ publisher.publish(channel, payload);
+ }
+ }
+
+ private static class PublisherConfirmationListener implements ConfirmListener {
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Condition hasSpaceToWriteCondition = lock.writeLock().newCondition();
+ private final Condition allMessagesConfirmed = lock.writeLock().newCondition();
+ @GuardedBy("lock")
+ private final NavigableMap inflightRequests = new TreeMap<>();
+ private final int maxInflightRequests;
+ private final boolean enablePublisherConfirmation;
+ private final boolean hasLimit;
+ private volatile boolean noConfirmationWillBeReceived;
+
+ private PublisherConfirmationListener(boolean enablePublisherConfirmation, int maxInflightRequests) {
+ if (maxInflightRequests <= 0 && maxInflightRequests != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS) {
+ throw new IllegalArgumentException("invalid maxInflightRequests: " + maxInflightRequests);
+ }
+ hasLimit = maxInflightRequests != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS;
+ this.maxInflightRequests = maxInflightRequests;
+ this.enablePublisherConfirmation = enablePublisherConfirmation;
+ }
+
+ public void put(long deliveryTag, PublicationHolder payload) throws InterruptedException {
+ if (!enablePublisherConfirmation) {
+ return;
+ }
+ lock.writeLock().lock();
+ try {
+ // there is only one thread at a time that tries to publish message
+ // so it is safe to check the limit only once
+ if (hasLimit && inflightRequests.size() >= maxInflightRequests) {
+ LOGGER.warn("blocking because inflight requests size is above limit {} for publication channel", maxInflightRequests);
+ hasSpaceToWriteCondition.await();
+ LOGGER.info("unblocking because inflight requests size is below limit {} for publication channel", maxInflightRequests);
+ }
+ inflightRequests.put(deliveryTag, payload);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void remove(long deliveryTag) {
+ removeInflightRequests(deliveryTag, false);
+ }
+
+ @Override
+ public void handleAck(long deliveryTag, boolean multiple) {
+ LOGGER.trace("Delivery ack received for tag {} (multiple:{})", deliveryTag, multiple);
+ removeInflightRequests(deliveryTag, multiple);
+ }
+
+ @Override
+ public void handleNack(long deliveryTag, boolean multiple) {
+ LOGGER.warn("Delivery nack received for tag {} (multiple:{})", deliveryTag, multiple);
+ // we cannot do match with nack because this is an internal error in rabbitmq
+ // we can try to republish the message but there is no guarantee that the message will be accepted
+ removeInflightRequests(deliveryTag, multiple);
+ }
+
+ private void removeInflightRequests(long deliveryTag, boolean multiple) {
+ if (!enablePublisherConfirmation) {
+ return;
+ }
+ lock.writeLock().lock();
+ try {
+ if (multiple) {
+ inflightRequests.headMap(deliveryTag, true).clear();
+ } else {
+ inflightRequests.remove(deliveryTag);
+ }
+ if (inflightRequests.isEmpty()) {
+ allMessagesConfirmed.signalAll();
+ }
+ hasSpaceToWriteCondition.signalAll();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public boolean hasUnconfirmedMessages() {
+ lock.readLock().lock();
+ try {
+ return !inflightRequests.isEmpty();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Adds unconfirmed messages to provided queue.
+ * Messages will be added to the beginning of the queue.
+ * As result, queue will have the oldest messages in the start and newest in the end.
+ *
+ * {@link #inflightRequests} map will be cleared.
+ *
+ * {@link #noConfirmationWillBeReceived} will be reset
+ * @param redelivery queue to transfer messages
+ */
+ public void transferUnconfirmedTo(Deque redelivery) {
+ lock.writeLock().lock();
+ try {
+ for (PublicationHolder payload : inflightRequests.descendingMap().values()) {
+ redelivery.addFirst(payload);
+ }
+ inflightRequests.clear();
+ hasSpaceToWriteCondition.signalAll();
+ allMessagesConfirmed.signalAll();
+ noConfirmationWillBeReceived = false;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Indicates that no confirmation will be received for inflight requests.
+ * Method {@link #transferUnconfirmedTo(Deque)} should be called to reset the flag
+ * and obtain messages for redeliver
+ * @return true if channel was closed and no confirmation will be received
+ */
+ public boolean isNoConfirmationWillBeReceived() {
+ lock.readLock().lock();
+ try {
+ return noConfirmationWillBeReceived;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void noConfirmationWillBeReceived() {
+ lock.writeLock().lock();
+ try {
+ noConfirmationWillBeReceived = true;
+ // we need to unlock possible locked publisher so it can check that nothing will be confirmed
+ // and retry the publication
+ hasSpaceToWriteCondition.signalAll();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public boolean awaitConfirmations(long timeout, TimeUnit timeUnit) throws InterruptedException {
+ lock.writeLock().lock();
+ try {
+ return inflightRequests.isEmpty() || allMessagesConfirmed.await(timeout, timeUnit);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
}
private interface ChannelMapper {
@@ -957,4 +1257,8 @@ private interface ChannelMapper {
private interface ChannelConsumer {
void consume(Channel channel) throws IOException;
}
+
+ private interface ChannelPublisher {
+ void publish(Channel channel, byte[] payload) throws IOException;
+ }
}
\ No newline at end of file
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
index 6c147e5c0..f6ff7a974 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
@@ -42,7 +42,10 @@ data class ConnectionManagerConfiguration(
val retryTimeDeviationPercent: Int = 10,
val messageRecursionLimit: Int = 100,
val workingThreads: Int = 1,
- val confirmationTimeout: Duration = Duration.ofMinutes(5)
+ val confirmationTimeout: Duration = Duration.ofMinutes(5),
+ val enablePublisherConfirmation: Boolean = false,
+ val maxInflightPublications: Int = NO_LIMIT_INFLIGHT_REQUESTS,
+ val heartbeatIntervalSeconds: Int = DEFAULT_HB_INTERVAL_SECONDS,
) : Configuration() {
init {
check(maxRecoveryAttempts > 0) { "expected 'maxRecoveryAttempts' greater than 0 but was $maxRecoveryAttempts" }
@@ -64,6 +67,11 @@ data class ConnectionManagerConfiguration(
))
}
}
+
+ companion object {
+ const val NO_LIMIT_INFLIGHT_REQUESTS: Int = -1
+ const val DEFAULT_HB_INTERVAL_SECONDS: Int = 0
+ }
}
data class RetryingDelay(val tryNumber: Int, val delay: Int) {
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt
index 0b4cd41eb..a81fe662a 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt
@@ -34,6 +34,7 @@ class NotificationEventBatchSender(
throw UnsupportedOperationException("Method is deprecated, please use constructor")
}
+ @Throws(IOException::class)
override fun send(message: EventBatch) {
try {
connectionManager.basicPublish(exchange, EMPTY_ROUTING_KEY, null, message.toByteArray())
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
index 7448ef191..0eca6f625 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
@@ -31,17 +31,13 @@ import com.exactpro.th2.common.util.getChannelsInfo
import com.exactpro.th2.common.util.getQueuesInfo
import com.exactpro.th2.common.util.getSubscribedChannelsCount
import com.exactpro.th2.common.util.putMessageInQueue
+import com.github.dockerjava.api.model.Capability
import com.rabbitmq.client.BuiltinExchangeType
-import java.time.Duration
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.concurrent.thread
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.Delivery
import mu.KotlinLogging
import org.junit.jupiter.api.AfterAll
+import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
@@ -49,13 +45,101 @@ import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
+import org.slf4j.LoggerFactory
+import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.RabbitMQContainer
+import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.utility.MountableFile
import java.io.IOException
+import java.time.Duration
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.concurrent.thread
import kotlin.test.assertFailsWith
@IntegrationTest
class TestConnectionManager {
+
+ @Test
+ fun `connection manager redelivers unconfirmed messages`() {
+ val routingKey = "routingKey1"
+ val queueName = "queue1"
+ val exchange = "test-exchange1"
+ rabbit
+ .let { rabbit ->
+ declareQueue(rabbit, queueName)
+ declareFanoutExchangeWithBinding(rabbit, exchange, queueName)
+ LOGGER.info { "Started with port ${rabbit.amqpPort}" }
+ val messagesCount = 10
+ val countDown = CountDownLatch(messagesCount)
+ createConnectionManager(
+ rabbit, ConnectionManagerConfiguration(
+ subscriberName = "test",
+ prefetchCount = DEFAULT_PREFETCH_COUNT,
+ confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+ enablePublisherConfirmation = true,
+ maxInflightPublications = 5,
+ heartbeatIntervalSeconds = 1,
+ minConnectionRecoveryTimeout = 2000,
+ maxConnectionRecoveryTimeout = 2000,
+ // to avoid unexpected delays before recovery
+ retryTimeDeviationPercent = 0,
+ )
+ ).use { manager ->
+ val receivedMessages = linkedSetOf()
+ manager.basicConsume(queueName, { _, delivery, ack ->
+ val message = delivery.body.toString(Charsets.UTF_8)
+ LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" }
+ if (receivedMessages.add(message)) {
+ // decrement only unique messages
+ countDown.countDown()
+ } else {
+ LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" }
+ }
+ ack.confirm()
+ }) {
+ LOGGER.info { "Canceled $it" }
+ }
+
+
+ repeat(messagesCount) { index ->
+ if (index == 1) {
+ // delay should allow ack for the first message be received
+ Thread.sleep(100)
+ // Man pages:
+ // https://man7.org/linux/man-pages/man8/tc-netem.8.html
+ // https://man7.org/linux/man-pages/man8/ifconfig.8.html
+ //
+ // Here we try to emulate network outage to cause missing publication confirmations.
+ //
+ // In real life we will probably get duplicates in this case because
+ // rabbitmq does not provide exactly-once semantic.
+ // So, we will have to deal with it on the consumer side
+ rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down")
+ } else if (index == 4) {
+ // More than 2 HB will be missed
+ // This is enough for rabbitmq server to understand the connection is lost
+ Thread.sleep(4_000)
+ // enabling network interface back
+ rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
+ }
+ manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
+ }
+
+ countDown.assertComplete("Not all messages were received: $receivedMessages")
+ assertEquals(
+ (0 until messagesCount).map {
+ "Hello $it"
+ },
+ receivedMessages.toList(),
+ "messages received in unexpected order",
+ )
+ }
+ }
+ }
+
@Test
fun `connection manager reports unacked messages when confirmation timeout elapsed`() {
val routingKey = "routingKey1"
@@ -408,9 +492,10 @@ class TestConnectionManager {
fun `connection manager receives a messages after container restart`() {
val queueName = "queue5"
val amqpPort = 5672
- val container = object : RabbitMQContainer(RABBITMQ_IMAGE_NAME) {
- init { super.addFixedExposedPort(amqpPort, amqpPort) }
- }
+ val container = RabbitMQContainer(RABBITMQ_IMAGE_NAME)
+ .apply {
+ portBindings = listOf("$amqpPort:$amqpPort")
+ }
container
.use {
@@ -987,15 +1072,41 @@ class TestConnectionManager {
)
)
+ @AfterEach
+ fun cleanupRabbitMq() {
+ // cleanup is done to prevent queue name collision during test
+ rabbit.apply {
+ executeInContainerWithLogging("rabbitmqctl", "stop_app")
+ executeInContainerWithLogging("rabbitmqctl", "reset")
+ executeInContainerWithLogging("rabbitmqctl", "start_app")
+ }
+ }
+
companion object {
private val LOGGER = KotlinLogging.logger { }
private lateinit var rabbit: RabbitMQContainer
+ @JvmStatic
+ fun GenericContainer<*>.executeInContainerWithLogging(vararg command: String, exceptionOnExecutionError: Boolean = true) {
+ execInContainer(*command).also {
+ LOGGER.info { "Command: ${command.joinToString(separator = " ")}; out: ${it.stdout}; err: ${it.stderr}; exit code: ${it.exitCode}" }
+ if (exceptionOnExecutionError && it.exitCode != 0) {
+ throw IllegalStateException("Command ${command.joinToString()} exited with error code: ${it.exitCode}")
+ }
+ }
+ }
+
@BeforeAll
@JvmStatic
fun initRabbit() {
rabbit = RabbitMQContainer(RABBITMQ_IMAGE_NAME)
+ .withLogConsumer(Slf4jLogConsumer(LoggerFactory.getLogger("rabbitmq")))
+ .withCreateContainerCmdModifier {
+ it.hostConfig
+ // required to use tc tool to emulate network problems
+ ?.withCapAdd(Capability.NET_ADMIN)
+ }
rabbit.start()
}
From 4186701d111f7d5276068a8aa2602d2015bd2ff4 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Sun, 26 May 2024 19:37:39 +0400
Subject: [PATCH 02/16] Fix grpc tests to run in CI
---
.../schema/grpc/router/impl/DefaultGrpcRouterTest.kt | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
index 943d7aa9b..eb611c386 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
@@ -825,7 +825,7 @@ internal class DefaultGrpcRouterTest {
ExceptionMetadata(
"UNAVAILABLE: io exception",
ExceptionMetadata(
- "Connection refused: localhost/127.0.0.1:8080",
+ "Connection refused: localhost/",
ExceptionMetadata(
"Connection refused"
)
@@ -851,7 +851,7 @@ internal class DefaultGrpcRouterTest {
ExceptionMetadata(
"UNAVAILABLE: io exception",
ExceptionMetadata(
- "Connection refused: localhost/127.0.0.1:8080",
+ "Connection refused: localhost/",
ExceptionMetadata(
"Connection refused"
)
@@ -880,7 +880,7 @@ internal class DefaultGrpcRouterTest {
ExceptionMetadata(
"UNAVAILABLE: io exception",
ExceptionMetadata(
- "Connection refused: localhost/127.0.0.1:8080",
+ "Connection refused: localhost/",
ExceptionMetadata(
"Connection refused"
)
@@ -928,9 +928,9 @@ internal class DefaultGrpcRouterTest {
exceptionMetadata: ExceptionMetadata,
path: List = emptyList()
) {
- assertEquals(
- exceptionMetadata.message,
- exception.message,
+ val expectedMessage = exceptionMetadata.message
+ assertTrue(
+ exception.message == expectedMessage || exception.message?.startsWith(expectedMessage ?: "null") == true,
"Message for exception: $exception, path: ${path.printAsStackTrace()}"
)
exceptionMetadata.suspended?.let { suspendMetadataList ->
From 09f49bdd5b25386cf5c023100c7e20be3d543738 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Mon, 27 May 2024 15:27:25 +0400
Subject: [PATCH 03/16] Manual publication test
---
build.gradle | 8 +-
.../connection/ConnectionManualBenchmark.kt | 151 ++++++++++++++++++
2 files changed, 158 insertions(+), 1 deletion(-)
create mode 100644 src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
diff --git a/build.gradle b/build.gradle
index b1b5c0993..7e01b2129 100644
--- a/build.gradle
+++ b/build.gradle
@@ -206,4 +206,10 @@ tasks.named('extractIncludeProto') {enabled = false }
tasks.named('extractIncludeTestFixturesProto') {enabled = false }
compileTestJava.dependsOn.add('generateTestProto')
-processTestResources.dependsOn.add('generateTestProto')
\ No newline at end of file
+processTestResources.dependsOn.add('generateTestProto')
+
+tasks.register("publicationManualBench", JavaExec.class) {
+ mainClass.set('com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManualBenchmark')
+ classpath(sourceSets.test.runtimeClasspath)
+ dependsOn('testClasses')
+}
\ No newline at end of file
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
new file mode 100644
index 000000000..cfcb22a46
--- /dev/null
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2024 Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection
+
+import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration
+import mu.KotlinLogging
+import org.testcontainers.containers.RabbitMQContainer
+import java.time.Duration
+import java.util.concurrent.ThreadLocalRandom
+
+object ConnectionManualBenchmark {
+ private val LOGGER = KotlinLogging.logger {}
+ @JvmStatic
+ fun main(args: Array) {
+ RabbitMQContainer(RABBITMQ_IMAGE_NAME).use { container ->
+ val payload: ByteArray = ByteArray(256 * 1024).also {
+ ThreadLocalRandom.current().nextBytes(it)
+ }
+ container.start()
+ with(container) {
+ val queueName = "test-queue"
+ val exchangeName = "test-exchange"
+ execInContainer("rabbitmqadmin", "declare", "queue", "name=$queueName", "durable=false")
+ execInContainer("rabbitmqadmin", "declare", "exchange", "name=$exchangeName", "type=fanout")
+ execInContainer("rabbitmqadmin", "declare", "binding", "source=$exchangeName", "destination_type=queue", "destination=$queueName")
+
+ val consumer = createConnectionManager(
+ container,
+ ConnectionManagerConfiguration(
+ prefetchCount = 1000,
+ confirmationTimeout = Duration.ofSeconds(5),
+ enablePublisherConfirmation = true,
+ maxInflightPublications = 100,
+ )
+ )
+
+ val connectionManagerWithConfirmation = {
+ createConnectionManager(
+ container,
+ ConnectionManagerConfiguration(
+ prefetchCount = 100,
+ confirmationTimeout = Duration.ofSeconds(5),
+ enablePublisherConfirmation = true,
+ maxInflightPublications = 200,
+ )
+ )
+ }
+
+ val connectionManagerWithoutConfirmation = {
+ createConnectionManager(
+ container,
+ ConnectionManagerConfiguration(
+ prefetchCount = 100,
+ confirmationTimeout = Duration.ofSeconds(5),
+ enablePublisherConfirmation = false,
+ maxInflightPublications = -1,
+ )
+ )
+ }
+
+ consumer.use {
+ consumer.basicConsume(
+ queueName,
+ { _, _, ack -> ack.confirm() },
+ { LOGGER.warn { "Canceled" } },
+ )
+ val times = 5
+ val withConf = ArrayList(times)
+ val withoutConf = ArrayList(times)
+ repeat(times) {
+ // Avg for no confirmation: PT19.468S
+ withoutConf += measure("not confirmation", connectionManagerWithoutConfirmation, payload)
+
+ // Avg for confirmation: PT20.758S
+ withConf += measure("confirmation", connectionManagerWithConfirmation, payload)
+ }
+
+ fun List.avg(): Duration {
+ return map { it.toMillis() }.average().let { Duration.ofMillis(it.toLong()) }
+ }
+
+ LOGGER.info { "Avg for confirmation: ${withConf.avg()}" }
+ LOGGER.info { "Avg for no confirmation: ${withoutConf.avg()}" }
+ }
+ }
+ }
+ }
+
+ private fun measure(name: String, manager: () -> ConnectionManager, payload: ByteArray): Duration {
+ LOGGER.info("Measuring $name")
+ val start: Long
+ val sent: Long
+ manager().use { mgr ->
+ repeat(100) {
+ mgr.basicPublish(
+ "test-exchange",
+ "routing",
+ null,
+ payload,
+ )
+ }
+ LOGGER.info("Wait after warmup for $name")
+ Thread.sleep(1000)
+ LOGGER.info("Start measuring for $name")
+ start = System.currentTimeMillis()
+ repeat(100_000) {
+ mgr.basicPublish(
+ "test-exchange",
+ "routing",
+ null,
+ payload,
+ )
+ }
+ sent = System.currentTimeMillis()
+ }
+ val end = System.currentTimeMillis()
+ LOGGER.info("Sent $name in ${Duration.ofMillis(sent - start)}")
+ val duration = Duration.ofMillis(end - start)
+ LOGGER.info("Executed $name in $duration")
+ return duration
+ }
+
+ private fun createConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) =
+ ConnectionManager(
+ "test-connection",
+ RabbitMQConfiguration(
+ host = container.host,
+ vHost = "",
+ port = container.amqpPort,
+ username = container.adminUsername,
+ password = container.adminPassword,
+ ),
+ configuration
+ )
+}
\ No newline at end of file
From c2ca4bf01691de73bba8c59ddb76181ff80cb7b4 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Mon, 27 May 2024 16:09:23 +0400
Subject: [PATCH 04/16] Delay publication removal until all older publications
are confirmed
---
.../connection/ConnectionManager.java | 38 ++++++++++++++++++-
1 file changed, 36 insertions(+), 2 deletions(-)
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
index 039f063ac..961404fe2 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
@@ -1091,12 +1091,25 @@ public void publishUnconfirmedMessages() throws IOException {
private static class PublicationHolder {
private final ChannelPublisher publisher;
private final byte[] payload;
+ private volatile boolean confirmed;
private PublicationHolder(ChannelPublisher publisher, byte[] payload) {
this.publisher = publisher;
this.payload = payload;
}
+ boolean isConfirmed() {
+ return confirmed;
+ }
+
+ void confirmed() {
+ confirmed = true;
+ }
+
+ void reset() {
+ confirmed = false;
+ }
+
public void publish(Channel channel) throws IOException {
publisher.publish(channel, payload);
}
@@ -1165,15 +1178,35 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) {
}
lock.writeLock().lock();
try {
+ int initialSize = inflightRequests.size();
if (multiple) {
inflightRequests.headMap(deliveryTag, true).clear();
} else {
- inflightRequests.remove(deliveryTag);
+ var head = inflightRequests.headMap(deliveryTag, true);
+ // received the confirmation for oldest publication
+ // check all earlier confirmation that were confirmed but not removed
+ if (head.size() == 1) {
+ head.clear();
+ Iterator> tailIterator =
+ inflightRequests.tailMap(deliveryTag, false).entrySet().iterator();
+ while (tailIterator.hasNext()) {
+ if (!tailIterator.next().getValue().isConfirmed()) {
+ break;
+ }
+ tailIterator.remove();
+ }
+ } else if (!head.isEmpty()) {
+ // this is not the oldest publication
+ // mark as confirm but wait for oldest to be confirmed
+ head.lastEntry().getValue().confirmed();
+ }
}
if (inflightRequests.isEmpty()) {
allMessagesConfirmed.signalAll();
}
- hasSpaceToWriteCondition.signalAll();
+ if (inflightRequests.size() != initialSize) {
+ hasSpaceToWriteCondition.signalAll();
+ }
} finally {
lock.writeLock().unlock();
}
@@ -1202,6 +1235,7 @@ public void transferUnconfirmedTo(Deque redelivery) {
lock.writeLock().lock();
try {
for (PublicationHolder payload : inflightRequests.descendingMap().values()) {
+ payload.reset();
redelivery.addFirst(payload);
}
inflightRequests.clear();
From c6d698534b4e94e058c400c4cf37f54ddbed9408 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Mon, 27 May 2024 17:19:29 +0400
Subject: [PATCH 05/16] Change handling of unordered acks
---
.../connection/ConnectionManager.java | 23 +++++++++++--------
1 file changed, 14 insertions(+), 9 deletions(-)
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
index 961404fe2..dce147ca7 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
@@ -1182,23 +1182,28 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) {
if (multiple) {
inflightRequests.headMap(deliveryTag, true).clear();
} else {
- var head = inflightRequests.headMap(deliveryTag, true);
- // received the confirmation for oldest publication
- // check all earlier confirmation that were confirmed but not removed
- if (head.size() == 1) {
- head.clear();
+ long oldestPublication = Objects.requireNonNullElse(inflightRequests.firstKey(), deliveryTag);
+ if (oldestPublication == deliveryTag) {
+ // received the confirmation for oldest publication
+ // check all earlier confirmation that were confirmed but not removed
Iterator> tailIterator =
- inflightRequests.tailMap(deliveryTag, false).entrySet().iterator();
+ inflightRequests.tailMap(deliveryTag, true).entrySet().iterator();
while (tailIterator.hasNext()) {
- if (!tailIterator.next().getValue().isConfirmed()) {
+ Map.Entry next = tailIterator.next();
+ long key = next.getKey();
+ PublicationHolder holder = next.getValue();
+ if (key > deliveryTag && !holder.isConfirmed()) {
break;
}
tailIterator.remove();
}
- } else if (!head.isEmpty()) {
+ } else {
// this is not the oldest publication
// mark as confirm but wait for oldest to be confirmed
- head.lastEntry().getValue().confirmed();
+ var holder = inflightRequests.get(deliveryTag);
+ if (holder != null) {
+ holder.confirmed();
+ }
}
}
if (inflightRequests.isEmpty()) {
From 6e7e8a4ef7261597c88bb6140e5d38a46a355888 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Tue, 28 May 2024 16:26:20 +0400
Subject: [PATCH 06/16] Add jfr options
---
build.gradle | 2 ++
1 file changed, 2 insertions(+)
diff --git a/build.gradle b/build.gradle
index 7e01b2129..5b3cd75b0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -212,4 +212,6 @@ tasks.register("publicationManualBench", JavaExec.class) {
mainClass.set('com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManualBenchmark')
classpath(sourceSets.test.runtimeClasspath)
dependsOn('testClasses')
+
+ jvmArgs('-XX:StartFlightRecording=duration=60s,settings=profile,filename=publishing-profile-record.jfr')
}
\ No newline at end of file
From 7828a5094a69802ae1ba08a6ec67a89b9bcefe65 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Tue, 28 May 2024 16:33:52 +0400
Subject: [PATCH 07/16] Correct default value for inflight requests
---
.../impl/rabbitmq/configuration/RabbitMQConfiguration.kt | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
index f6ff7a974..9396edec3 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
@@ -44,7 +44,8 @@ data class ConnectionManagerConfiguration(
val workingThreads: Int = 1,
val confirmationTimeout: Duration = Duration.ofMinutes(5),
val enablePublisherConfirmation: Boolean = false,
- val maxInflightPublications: Int = NO_LIMIT_INFLIGHT_REQUESTS,
+ // Default value is taken based on measurement done in ConnectionManualBenchmark class
+ val maxInflightPublications: Int = 200,
val heartbeatIntervalSeconds: Int = DEFAULT_HB_INTERVAL_SECONDS,
) : Configuration() {
init {
From a61c25ed912d5513ef1cfe7ad513317a9a9fd500 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Tue, 28 May 2024 16:34:04 +0400
Subject: [PATCH 08/16] Add readme description
---
README.md | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/README.md b/README.md
index 5847e4aff..4510450a9 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# th2 common library (Java) (5.12.0)
+# th2 common library (Java) (5.13.0)
## Usage
@@ -511,6 +511,15 @@ dependencies {
## Release notes
+### 5.13.0-dev
+
++ Added functionality for publisher confirmations to mitigate network issues for message producers.
++ New parameters are added to connection manager configuration:
+ + enablePublisherConfirmation - enables publisher confirmation. `false` by default.
+ + maxInflightPublications - the max number of unconfirmed published messages per channel. `200`, by default.
+ + heartbeatIntervalSeconds - rabbitmq connection heartbeat interval in seconds.
+ `0` by default (that means the default interval will be set by the internal library used to communicate with RabbitMQ).
+
### 5.12.0-dev
+ Updated kubernetes-client: `6.12.1`
From 693b93f638578bff822f5390b7f13fd3b0ec73c0 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Tue, 28 May 2024 17:52:56 +0400
Subject: [PATCH 09/16] Try to increase graceful shutdown timeout
---
.../th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
index eb611c386..b4da67ad3 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
@@ -962,7 +962,7 @@ internal class DefaultGrpcRouterTest {
private fun ExecutorService.shutdownGracefully() {
shutdown()
- if (!awaitTermination(1, TimeUnit.SECONDS)) {
+ if (!awaitTermination(30, TimeUnit.SECONDS)) {
shutdownNow()
error("'Executor' can't be stopped")
}
From 5d0ab4db57906473dd64fced26d3fb3ca2efec38 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Thu, 30 May 2024 10:33:28 +0400
Subject: [PATCH 10/16] Upate action versions in integration test workflow
---
.github/workflows/integration-tests.yml | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml
index 7d3da3c08..c46e756f5 100644
--- a/.github/workflows/integration-tests.yml
+++ b/.github/workflows/integration-tests.yml
@@ -11,15 +11,15 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Set up JDK 'zulu' '11'
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
distribution: 'zulu'
java-version: '11'
- name: Setup Gradle
- uses: gradle/gradle-build-action@v2
+ uses: gradle/actions/setup-gradle@v3
- name: Build with Gradle
run: ./gradlew --info integrationTest
- - uses: actions/upload-artifact@v3
+ - uses: actions/upload-artifact@v4
if: failure()
with:
name: integration-test-results
From c3acdfe682d0ee87b8aa123b7cd47c9312133bb0 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Thu, 30 May 2024 11:45:07 +0400
Subject: [PATCH 11/16] Correct baton class: guarantee that one thread cannot
put and get the same permit
---
.../grpc/router/impl/DefaultGrpcRouterTest.kt | 65 +++++++++++++++++--
1 file changed, 58 insertions(+), 7 deletions(-)
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
index b4da67ad3..17de2cd62 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
@@ -38,9 +38,14 @@ import io.grpc.stub.StreamObserver
import mu.KotlinLogging
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.extension.ExtensionContext
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.mock
import org.mockito.kotlin.timeout
@@ -50,7 +55,6 @@ import org.mockito.kotlin.verifyNoMoreInteractions
import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder
import java.time.Duration
import java.time.Instant
-import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
@@ -58,6 +62,8 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.locks.ReentrantLock
+import kotlin.concurrent.withLock
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
@@ -65,8 +71,24 @@ import kotlin.test.assertTrue
private const val CANCEL_REASON = "test request is canceled"
+@ExtendWith(DefaultGrpcRouterTest.ExecutionListener::class)
@IntegrationTest
internal class DefaultGrpcRouterTest {
+ /**
+ * Listener adds additional logging to help understanding from the stdout where test starts and finishes
+ */
+ internal class ExecutionListener : BeforeTestExecutionCallback, AfterTestExecutionCallback {
+ private val logger = KotlinLogging.logger { }
+ override fun beforeTestExecution(ctx: ExtensionContext) {
+ logger.info { "Execution for test '${ctx.testMethod.map { it.name }.orElse("unknown")}' started" }
+ }
+
+ override fun afterTestExecution(ctx: ExtensionContext) {
+ logger.info { "Execution for test '${ctx.testMethod.map { it.name }.orElse("unknown")}' is finished" }
+ }
+
+ }
+
@IntegrationTest
abstract inner class AbstractGrpcRouterTest {
private val grpcRouterClient = DefaultGrpcRouter()
@@ -415,6 +437,7 @@ internal class DefaultGrpcRouterTest {
)
}
+ @Disabled("this test isn't relevant for async request")
@Test
override fun `interrupt thread during retry request`() {
// this test isn't relevant for async request
@@ -751,6 +774,7 @@ internal class DefaultGrpcRouterTest {
)
}
+ @Disabled("this test isn't relevant for async request")
@Test
override fun `interrupt thread during retry request`() {
// this test isn't relevant for async request
@@ -915,7 +939,7 @@ internal class DefaultGrpcRouterTest {
shutdownNow()
} else {
shutdown()
- if (!awaitTermination(5, TimeUnit.SECONDS)) {
+ if (!awaitTermination(60, TimeUnit.SECONDS)) {
shutdownNow()
error("'Server' can't be closed")
}
@@ -974,10 +998,25 @@ internal class DefaultGrpcRouterTest {
val suspended: List? = null
)
+ /**
+ * Baton class can help to synchronize two threads (only **two**).
+ *
+ * Baton class was migrated from using queue with size 1 to lock and conditions for synchronization.
+ *
+ * The implementation with queue did not provide guarantees that the same thread won't get the permit and put it back
+ * while another thread was still waiting for a free space in the queue.
+ *
+ * Using lock and conditions guarantees that the permit won't be given unless somebody is waiting for that permit.
+ * And vise-versa, nobody can get a permit unless somebody tries to put the permit
+ */
internal class Baton(
private val name: String
) {
- private val queue = ArrayBlockingQueue(1).apply { put(Any()) }
+ @Volatile
+ private var permits = 0
+ private val lock = ReentrantLock()
+ private val givenCondition = lock.newCondition()
+ private val getCondition = lock.newCondition()
fun giveAndGet(giveComment: String = "", getComment: String = "") {
give(giveComment)
@@ -986,13 +1025,25 @@ internal class DefaultGrpcRouterTest {
fun give(comment: String = "") {
K_LOGGER.info { "'$name' baton is giving by [${Thread.currentThread().name}] - $comment" }
- queue.put(Any())
+ lock.withLock {
+ if (permits == 0) {
+ getCondition.await()
+ }
+ permits += 1
+ givenCondition.signal()
+ }
K_LOGGER.info { "'$name' baton is given by [${Thread.currentThread().name}] - $comment" }
}
fun get(comment: String = "") {
K_LOGGER.info { "'$name' baton is getting by [${Thread.currentThread().name}] - $comment" }
- queue.poll()
+ lock.withLock {
+ getCondition.signal()
+ permits -= 1
+ if (permits < 0) {
+ givenCondition.await()
+ }
+ }
K_LOGGER.info { "'$name' baton is got by [${Thread.currentThread().name}] - $comment" }
}
}
@@ -1009,8 +1060,8 @@ internal class DefaultGrpcRouterTest {
}.build())
responseBaton?.let {
- Thread.sleep(1_000)
it.give("response sent")
+ Thread.sleep(1_000)
}
if (complete) {
@@ -1027,8 +1078,8 @@ internal class DefaultGrpcRouterTest {
}
responseBaton?.let {
- Thread.sleep(1_000)
it.give("response sent")
+ Thread.sleep(1_000)
}
if (complete) {
From 8c205221ce1f9e18213a34279ced940209ebc074 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Sun, 2 Jun 2024 17:51:48 +0400
Subject: [PATCH 12/16] Migrate inflight limit from messages count to payload
size in bytes
---
README.md | 2 +-
.../connection/ConnectionManager.java | 88 +++++++++++++------
.../configuration/RabbitMQConfiguration.kt | 4 +-
.../connection/ConnectionManualBenchmark.kt | 6 +-
.../connection/TestConnectionManager.kt | 29 ++++--
5 files changed, 86 insertions(+), 43 deletions(-)
diff --git a/README.md b/README.md
index 4510450a9..c3b35bfb9 100644
--- a/README.md
+++ b/README.md
@@ -516,7 +516,7 @@ dependencies {
+ Added functionality for publisher confirmations to mitigate network issues for message producers.
+ New parameters are added to connection manager configuration:
+ enablePublisherConfirmation - enables publisher confirmation. `false` by default.
- + maxInflightPublications - the max number of unconfirmed published messages per channel. `200`, by default.
+ + maxInflightPublicationsBytes - the max number of unconfirmed published messages per channel. `52428800` (50 MB), by default.
+ heartbeatIntervalSeconds - rabbitmq connection heartbeat interval in seconds.
`0` by default (that means the default interval will be set by the internal library used to communicate with RabbitMQ).
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
index dce147ca7..01a3113c4 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
@@ -504,7 +504,7 @@ private ChannelHolderOptions configurationToOptions() {
return new ChannelHolderOptions(
configuration.getPrefetchCount(),
configuration.getEnablePublisherConfirmation(),
- configuration.getMaxInflightPublications()
+ configuration.getMaxInflightPublicationsBytes()
);
}
@@ -718,12 +718,12 @@ public String toString() {
private static class ChannelHolderOptions {
private final int maxCount;
private final boolean enablePublisherConfirmation;
- private final int maxInflightRequests;
+ private final int maxInflightRequestsBytes;
- private ChannelHolderOptions(int maxCount, boolean enablePublisherConfirmation, int maxInflightRequests) {
+ private ChannelHolderOptions(int maxCount, boolean enablePublisherConfirmation, int maxInflightRequestsBytes) {
this.maxCount = maxCount;
this.enablePublisherConfirmation = enablePublisherConfirmation;
- this.maxInflightRequests = maxInflightRequests;
+ this.maxInflightRequestsBytes = maxInflightRequestsBytes;
}
public int getMaxCount() {
@@ -734,8 +734,8 @@ public boolean isEnablePublisherConfirmation() {
return enablePublisherConfirmation;
}
- public int getMaxInflightRequests() {
- return maxInflightRequests;
+ public int getMaxInflightRequestsBytes() {
+ return maxInflightRequestsBytes;
}
}
@@ -780,7 +780,7 @@ public ChannelHolder(
this.subscriptionCallbacks = subscriptionCallbacks;
publishConfirmationListener = new PublisherConfirmationListener(
options.isEnablePublisherConfirmation(),
- options.getMaxInflightRequests()
+ options.getMaxInflightRequestsBytes()
);
}
@@ -842,7 +842,6 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
publishConfirmationListener.put(msgSeq, currentPayload);
if (publishConfirmationListener.isNoConfirmationWillBeReceived()) {
// will drain message to queue on next iteration
- publishConfirmationListener.remove(msgSeq);
continue;
}
currentPayload.publish(channel);
@@ -854,9 +853,6 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
var recoveryDelay = currentValue.getDelay();
LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e);
TimeUnit.MILLISECONDS.sleep(recoveryDelay);
- // cleanup after failure
- publishConfirmationListener.remove(msgSeq);
- redeliveryQueue.addFirst(currentPayload);
// We should not recover the channel if its connection is closed
// If we do that the channel will be also auto recovered by RabbitMQ client
@@ -866,6 +862,10 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
// so we should redeliver all inflight requests
publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
tempChannel = recreateChannel();
+ } else {
+ // cleanup after failure
+ publishConfirmationListener.remove(msgSeq);
+ redeliveryQueue.addFirst(currentPayload);
}
}
}
@@ -1098,6 +1098,10 @@ private PublicationHolder(ChannelPublisher publisher, byte[] payload) {
this.payload = payload;
}
+ int size() {
+ return payload.length;
+ }
+
boolean isConfirmed() {
return confirmed;
}
@@ -1121,17 +1125,20 @@ private static class PublisherConfirmationListener implements ConfirmListener {
private final Condition allMessagesConfirmed = lock.writeLock().newCondition();
@GuardedBy("lock")
private final NavigableMap inflightRequests = new TreeMap<>();
- private final int maxInflightRequests;
+ private final int maxInflightRequestsBytes;
private final boolean enablePublisherConfirmation;
private final boolean hasLimit;
- private volatile boolean noConfirmationWillBeReceived;
+ @GuardedBy("lock")
+ private boolean noConfirmationWillBeReceived;
+ @GuardedBy("lock")
+ private int inflightBytes;
- private PublisherConfirmationListener(boolean enablePublisherConfirmation, int maxInflightRequests) {
- if (maxInflightRequests <= 0 && maxInflightRequests != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS) {
- throw new IllegalArgumentException("invalid maxInflightRequests: " + maxInflightRequests);
+ private PublisherConfirmationListener(boolean enablePublisherConfirmation, int maxInflightRequestsBytes) {
+ if (maxInflightRequestsBytes <= 0 && maxInflightRequestsBytes != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS) {
+ throw new IllegalArgumentException("invalid maxInflightRequests: " + maxInflightRequestsBytes);
}
- hasLimit = maxInflightRequests != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS;
- this.maxInflightRequests = maxInflightRequests;
+ hasLimit = maxInflightRequestsBytes != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS;
+ this.maxInflightRequestsBytes = maxInflightRequestsBytes;
this.enablePublisherConfirmation = enablePublisherConfirmation;
}
@@ -1141,14 +1148,26 @@ public void put(long deliveryTag, PublicationHolder payload) throws InterruptedE
}
lock.writeLock().lock();
try {
- // there is only one thread at a time that tries to publish message
- // so it is safe to check the limit only once
- if (hasLimit && inflightRequests.size() >= maxInflightRequests) {
- LOGGER.warn("blocking because inflight requests size is above limit {} for publication channel", maxInflightRequests);
- hasSpaceToWriteCondition.await();
- LOGGER.info("unblocking because inflight requests size is below limit {} for publication channel", maxInflightRequests);
+ int payloadSize = payload.size();
+ if (hasLimit) {
+ int newSize = inflightBytes + payloadSize;
+ if (newSize > maxInflightRequestsBytes) {
+ LOGGER.warn("blocking because {} inflight requests bytes size is above limit {} bytes for publication channel",
+ newSize, maxInflightRequestsBytes);
+ do {
+ hasSpaceToWriteCondition.await();
+ newSize = inflightBytes + payloadSize;
+ } while (newSize > maxInflightRequestsBytes && !noConfirmationWillBeReceived);
+ if (noConfirmationWillBeReceived) {
+ LOGGER.warn("unblocking because no confirmation will be received and inflight request size will not change");
+ } else {
+ LOGGER.info("unblocking because {} inflight requests bytes size is below limit {} bytes for publication channel",
+ newSize, maxInflightRequestsBytes);
+ }
+ }
}
inflightRequests.put(deliveryTag, payload);
+ inflightBytes += payloadSize;
} finally {
lock.writeLock().unlock();
}
@@ -1162,6 +1181,7 @@ public void remove(long deliveryTag) {
public void handleAck(long deliveryTag, boolean multiple) {
LOGGER.trace("Delivery ack received for tag {} (multiple:{})", deliveryTag, multiple);
removeInflightRequests(deliveryTag, multiple);
+ LOGGER.trace("Delivery ack processed for tag {} (multiple:{})", deliveryTag, multiple);
}
@Override
@@ -1170,6 +1190,7 @@ public void handleNack(long deliveryTag, boolean multiple) {
// we cannot do match with nack because this is an internal error in rabbitmq
// we can try to republish the message but there is no guarantee that the message will be accepted
removeInflightRequests(deliveryTag, multiple);
+ LOGGER.warn("Delivery nack processed for tag {} (multiple:{})", deliveryTag, multiple);
}
private void removeInflightRequests(long deliveryTag, boolean multiple) {
@@ -1178,9 +1199,13 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) {
}
lock.writeLock().lock();
try {
- int initialSize = inflightRequests.size();
+ int currentSize = inflightBytes;
if (multiple) {
- inflightRequests.headMap(deliveryTag, true).clear();
+ Map headMap = inflightRequests.headMap(deliveryTag, true);
+ for (Map.Entry entry : headMap.entrySet()) {
+ currentSize -= entry.getValue().size();
+ }
+ headMap.clear();
} else {
long oldestPublication = Objects.requireNonNullElse(inflightRequests.firstKey(), deliveryTag);
if (oldestPublication == deliveryTag) {
@@ -1195,6 +1220,7 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) {
if (key > deliveryTag && !holder.isConfirmed()) {
break;
}
+ currentSize -= holder.size();
tailIterator.remove();
}
} else {
@@ -1206,12 +1232,14 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) {
}
}
}
+ if (inflightBytes != currentSize) {
+ inflightBytes = currentSize;
+ hasSpaceToWriteCondition.signalAll();
+ }
if (inflightRequests.isEmpty()) {
+ inflightBytes = 0;
allMessagesConfirmed.signalAll();
}
- if (inflightRequests.size() != initialSize) {
- hasSpaceToWriteCondition.signalAll();
- }
} finally {
lock.writeLock().unlock();
}
@@ -1244,6 +1272,7 @@ public void transferUnconfirmedTo(Deque redelivery) {
redelivery.addFirst(payload);
}
inflightRequests.clear();
+ inflightBytes = 0;
hasSpaceToWriteCondition.signalAll();
allMessagesConfirmed.signalAll();
noConfirmationWillBeReceived = false;
@@ -1270,6 +1299,7 @@ public boolean isNoConfirmationWillBeReceived() {
public void noConfirmationWillBeReceived() {
lock.writeLock().lock();
try {
+ LOGGER.warn("Publication listener was notified that no confirmations will be received");
noConfirmationWillBeReceived = true;
// we need to unlock possible locked publisher so it can check that nothing will be confirmed
// and retry the publication
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
index 9396edec3..74db30a23 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
@@ -44,8 +44,8 @@ data class ConnectionManagerConfiguration(
val workingThreads: Int = 1,
val confirmationTimeout: Duration = Duration.ofMinutes(5),
val enablePublisherConfirmation: Boolean = false,
- // Default value is taken based on measurement done in ConnectionManualBenchmark class
- val maxInflightPublications: Int = 200,
+ // Default value 50MB is taken based on measurement done in ConnectionManualBenchmark class
+ val maxInflightPublicationsBytes: Int = 50 * 1024 * 1024,
val heartbeatIntervalSeconds: Int = DEFAULT_HB_INTERVAL_SECONDS,
) : Configuration() {
init {
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
index cfcb22a46..2f8fa25a6 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
@@ -46,7 +46,7 @@ object ConnectionManualBenchmark {
prefetchCount = 1000,
confirmationTimeout = Duration.ofSeconds(5),
enablePublisherConfirmation = true,
- maxInflightPublications = 100,
+ maxInflightPublicationsBytes = 1024 * 1024 * 25,
)
)
@@ -57,7 +57,7 @@ object ConnectionManualBenchmark {
prefetchCount = 100,
confirmationTimeout = Duration.ofSeconds(5),
enablePublisherConfirmation = true,
- maxInflightPublications = 200,
+ maxInflightPublicationsBytes = 1024 * 1024 * 1,
)
)
}
@@ -69,7 +69,7 @@ object ConnectionManualBenchmark {
prefetchCount = 100,
confirmationTimeout = Duration.ofSeconds(5),
enablePublisherConfirmation = false,
- maxInflightPublications = -1,
+ maxInflightPublicationsBytes = -1,
)
)
}
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
index 0eca6f625..da536e51b 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
@@ -53,6 +53,7 @@ import org.testcontainers.utility.MountableFile
import java.io.IOException
import java.time.Duration
import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
@@ -74,13 +75,14 @@ class TestConnectionManager {
LOGGER.info { "Started with port ${rabbit.amqpPort}" }
val messagesCount = 10
val countDown = CountDownLatch(messagesCount)
+ val messageSizeBytes = 7
createConnectionManager(
rabbit, ConnectionManagerConfiguration(
subscriberName = "test",
prefetchCount = DEFAULT_PREFETCH_COUNT,
confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
enablePublisherConfirmation = true,
- maxInflightPublications = 5,
+ maxInflightPublicationsBytes = 5 * messageSizeBytes,
heartbeatIntervalSeconds = 1,
minConnectionRecoveryTimeout = 2000,
maxConnectionRecoveryTimeout = 2000,
@@ -104,6 +106,7 @@ class TestConnectionManager {
}
+ var future: CompletableFuture<*>? = null
repeat(messagesCount) { index ->
if (index == 1) {
// delay should allow ack for the first message be received
@@ -119,16 +122,22 @@ class TestConnectionManager {
// So, we will have to deal with it on the consumer side
rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down")
} else if (index == 4) {
- // More than 2 HB will be missed
- // This is enough for rabbitmq server to understand the connection is lost
- Thread.sleep(4_000)
- // enabling network interface back
- rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
+ future = CompletableFuture.supplyAsync {
+ // Interface is unblock in separate thread to emulate more realistic scenario
+
+ // More than 2 HB will be missed
+ // This is enough for rabbitmq server to understand the connection is lost
+ Thread.sleep(3_000)
+ // enabling network interface back
+ rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
+ }
}
manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
}
- countDown.assertComplete("Not all messages were received: $receivedMessages")
+ future?.get(30, TimeUnit.SECONDS)
+
+ countDown.assertComplete { "Not all messages were received: $receivedMessages" }
assertEquals(
(0 until messagesCount).map {
"Hello $it"
@@ -959,12 +968,16 @@ class TestConnectionManager {
}
private fun CountDownLatch.assertComplete(message: String) {
+ assertComplete { message }
+ }
+
+ private fun CountDownLatch.assertComplete(messageSupplier: () -> String) {
assertTrue(
await(
1L,
TimeUnit.SECONDS
)
- ) { "$message, actual count: $count" }
+ ) { "${messageSupplier()}, actual count: $count" }
}
private fun assertTarget(target: T, timeout: Long = 1_000, message: String, func: () -> T) {
From a832f9cd6360c37c9c4fdd0c3781872d54f970ba Mon Sep 17 00:00:00 2001
From: Oleg
Date: Sun, 2 Jun 2024 18:45:31 +0400
Subject: [PATCH 13/16] Fix payload lost during retry attempt
---
.../connection/ConnectionManager.java | 26 ++++++++++++----
.../connection/TestConnectionManager.kt | 30 +++++++++++--------
2 files changed, 39 insertions(+), 17 deletions(-)
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
index 01a3113c4..1cd0c1086 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
@@ -853,6 +853,9 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
var recoveryDelay = currentValue.getDelay();
LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e);
TimeUnit.MILLISECONDS.sleep(recoveryDelay);
+ // cleanup after failure
+ publishConfirmationListener.remove(msgSeq);
+ redeliveryQueue.addFirst(currentPayload);
// We should not recover the channel if its connection is closed
// If we do that the channel will be also auto recovered by RabbitMQ client
@@ -862,10 +865,6 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
// so we should redeliver all inflight requests
publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
tempChannel = recreateChannel();
- } else {
- // cleanup after failure
- publishConfirmationListener.remove(msgSeq);
- redeliveryQueue.addFirst(currentPayload);
}
}
}
@@ -1174,7 +1173,24 @@ public void put(long deliveryTag, PublicationHolder payload) throws InterruptedE
}
public void remove(long deliveryTag) {
- removeInflightRequests(deliveryTag, false);
+ if (!enablePublisherConfirmation) {
+ return;
+ }
+ lock.writeLock().lock();
+ try {
+ PublicationHolder holder = inflightRequests.remove(deliveryTag);
+ if (holder == null) {
+ return;
+ }
+ inflightBytes -= holder.size();
+ hasSpaceToWriteCondition.signalAll();
+ if (inflightRequests.isEmpty()) {
+ inflightBytes = 0;
+ allMessagesConfirmed.signalAll();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
index da536e51b..da1ecfc3b 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
@@ -282,7 +282,8 @@ class TestConnectionManager {
.let {
declareQueue(rabbit, queueName)
LOGGER.info { "Started with port ${it.amqpPort}" }
- val counter = AtomicInteger(0)
+ val counter = AtomicInteger()
+ val downLatch = CountDownLatch(1)
createConnectionManager(
it,
ConnectionManagerConfiguration(
@@ -298,6 +299,7 @@ class TestConnectionManager {
try {
monitor = connectionManager.basicConsume(queueName, { _, delivery, _ ->
counter.incrementAndGet()
+ downLatch.countDown()
LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" }
}) {
LOGGER.info { "Canceled $it" }
@@ -309,19 +311,18 @@ class TestConnectionManager {
LOGGER.info { "Publication finished!" }
assertEquals(
0,
- counter.get()
+ counter.get(),
) { "Unexpected number of messages received. The first message shouldn't be received" }
- Thread.sleep(200)
LOGGER.info { "Creating the correct exchange..." }
declareFanoutExchangeWithBinding(it, exchange, queueName)
- Thread.sleep(200)
LOGGER.info { "Exchange created!" }
Assertions.assertDoesNotThrow {
connectionManager.basicPublish(exchange, "", null, "Hello2".toByteArray(Charsets.UTF_8))
}
- Thread.sleep(200)
+ downLatch.assertComplete(1L, TimeUnit.SECONDS) { "no messages were received" }
+
assertEquals(
1,
counter.get()
@@ -967,16 +968,21 @@ class TestConnectionManager {
}
}
- private fun CountDownLatch.assertComplete(message: String) {
- assertComplete { message }
+ private fun CountDownLatch.assertComplete(
+ message: String,
+ timeout: Long = 1,
+ timeUnit: TimeUnit = TimeUnit.SECONDS,
+ ) {
+ assertComplete(timeout, timeUnit) { message }
}
- private fun CountDownLatch.assertComplete(messageSupplier: () -> String) {
+ private fun CountDownLatch.assertComplete(
+ timeout: Long = 1,
+ timeUnit: TimeUnit = TimeUnit.SECONDS,
+ messageSupplier: () -> String,
+ ) {
assertTrue(
- await(
- 1L,
- TimeUnit.SECONDS
- )
+ await(timeout, timeUnit)
) { "${messageSupplier()}, actual count: $count" }
}
From b859c5927c504d8314c118b6cd35aa4d81bf41c7 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Mon, 3 Jun 2024 18:01:55 +0400
Subject: [PATCH 14/16] Correct work with confirmation flag. Use correct
channel variable when publishing payload
---
.../impl/rabbitmq/connection/ConnectionManager.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
index 1cd0c1086..f3444e72d 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
@@ -380,9 +380,12 @@ public void close() {
LOGGER.warn("Some messages were not confirmed by broken in channel {} and were not republished. Try to republish messages", id);
channelHolder.publishUnconfirmedMessages();
}
+ LOGGER.info("Waiting for messages confirmation in channel {}", id);
try {
channelHolder.awaitConfirmations(closeTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
+ LOGGER.warn("Waiting for messages confirmation in channel {} was interrupted", id);
+ Thread.currentThread().interrupt();
}
}
if (channelHolder.hasUnconfirmedMessages()) {
@@ -844,7 +847,7 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
// will drain message to queue on next iteration
continue;
}
- currentPayload.publish(channel);
+ currentPayload.publish(tempChannel);
if (redeliveryQueue.isEmpty()) {
break;
}
@@ -1076,9 +1079,6 @@ public void publishUnconfirmedMessages() throws IOException {
publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
while (!redeliveryQueue.isEmpty()) {
PublicationHolder holder = redeliveryQueue.pollFirst();
- if (holder == null) {
- break;
- }
holder.publish(channel);
}
} finally {
@@ -1289,9 +1289,9 @@ public void transferUnconfirmedTo(Deque redelivery) {
}
inflightRequests.clear();
inflightBytes = 0;
+ noConfirmationWillBeReceived = false;
hasSpaceToWriteCondition.signalAll();
allMessagesConfirmed.signalAll();
- noConfirmationWillBeReceived = false;
} finally {
lock.writeLock().unlock();
}
From 45412f747921e15e1a6b2fcbfe3ef63d4dc33f0c Mon Sep 17 00:00:00 2001
From: Oleg Smirnov
Date: Mon, 3 Jun 2024 21:44:12 +0400
Subject: [PATCH 15/16] Update version in gradle.properties
---
gradle.properties | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/gradle.properties b/gradle.properties
index f83ac548b..bc2d53f5a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-release_version=5.12.0
+release_version=5.13.0
kotlin_version=1.8.22
description='th2 common library (Java)'
vcs_url=https://github.com/th2-net/th2-common-j
-kapt.include.compile.classpath=false
\ No newline at end of file
+kapt.include.compile.classpath=false
From 84a094b7d05310a8ac45d69019367416c464e737 Mon Sep 17 00:00:00 2001
From: Oleg
Date: Wed, 5 Jun 2024 14:18:52 +0400
Subject: [PATCH 16/16] Add test case for republishing on close. Correct close
method to allow republisihing
---
build.gradle | 1 +
.../connection/ConnectionManager.java | 19 ++--
.../connection/TestConnectionManager.kt | 103 +++++++++++++++++-
3 files changed, 114 insertions(+), 9 deletions(-)
diff --git a/build.gradle b/build.gradle
index 5b3cd75b0..fce8d0e0f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -166,6 +166,7 @@ dependencies {
testImplementation("org.junit-pioneer:junit-pioneer:2.2.0") {
because("system property tests")
}
+ testImplementation("org.awaitility:awaitility:4.2.1")
testFixturesImplementation "org.jetbrains.kotlin:kotlin-test-junit5:$kotlin_version"
testFixturesImplementation "org.junit.jupiter:junit-jupiter:$junitVersion"
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
index f3444e72d..e4e070891 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
@@ -73,8 +73,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -89,7 +89,7 @@ public class ConnectionManager implements AutoCloseable {
private final Connection connection;
private final Map channelsByPin = new ConcurrentHashMap<>();
- private final AtomicBoolean connectionIsClosed = new AtomicBoolean(false);
+ private final AtomicReference connectionIsClosed = new AtomicReference<>(State.OPEN);
private final ConnectionManagerConfiguration configuration;
private final String subscriberName;
private final AtomicInteger nextSubscriberId = new AtomicInteger(1);
@@ -118,6 +118,8 @@ public ConnectionManagerConfiguration getConfiguration() {
return configuration;
}
+ private enum State { OPEN, CLOSING, CLOSED }
+
public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfiguration rabbitMQConfiguration, @NotNull ConnectionManagerConfiguration connectionManagerConfiguration) {
Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null");
this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null");
@@ -205,7 +207,7 @@ private void turnOffReadiness(Throwable exception) {
}
});
- factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> !connectionIsClosed.get());
+ factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> connectionIsClosed.get() != State.CLOSED);
factory.setRecoveryDelayHandler(recoveryAttempts -> {
int minTime = connectionManagerConfiguration.getMinConnectionRecoveryTimeout();
@@ -358,12 +360,12 @@ public void handleUnblocked() {
}
public boolean isOpen() {
- return connection.isOpen() && !connectionIsClosed.get();
+ return connection.isOpen() && connectionIsClosed.get() == State.OPEN;
}
@Override
public void close() {
- if (connectionIsClosed.getAndSet(true)) {
+ if (!connectionIsClosed.compareAndSet(State.OPEN, State.CLOSING)) {
LOGGER.info("Connection manager already closed");
return;
}
@@ -397,6 +399,8 @@ public void close() {
}
}
+ connectionIsClosed.set(State.CLOSED);
+
if (connection.isOpen()) {
try {
connection.close(closeTimeout);
@@ -602,7 +606,7 @@ private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitFo
}
}
- if (connectionIsClosed.get()) {
+ if (connectionIsClosed.get() == State.CLOSED) {
throw new IllegalStateException("Connection is already closed");
}
}
@@ -621,7 +625,8 @@ private void waitForRecovery(ShutdownNotifier notifier) {
}
private boolean isConnectionRecovery(ShutdownNotifier notifier) {
- return !(notifier instanceof AutorecoveringChannel) && !notifier.isOpen() && !connectionIsClosed.get();
+ return !(notifier instanceof AutorecoveringChannel) && !notifier.isOpen()
+ && connectionIsClosed.get() != State.CLOSED;
}
/**
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
index da1ecfc3b..d96e1135e 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
@@ -36,6 +36,7 @@ import com.rabbitmq.client.BuiltinExchangeType
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.Delivery
import mu.KotlinLogging
+import org.awaitility.Awaitility
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions
@@ -110,7 +111,10 @@ class TestConnectionManager {
repeat(messagesCount) { index ->
if (index == 1) {
// delay should allow ack for the first message be received
- Thread.sleep(100)
+ Awaitility.await("first message is confirmed")
+ .pollInterval(10, TimeUnit.MILLISECONDS)
+ .atMost(100, TimeUnit.MILLISECONDS)
+ .until { countDown.count == messagesCount - 1L }
// Man pages:
// https://man7.org/linux/man-pages/man8/tc-netem.8.html
// https://man7.org/linux/man-pages/man8/ifconfig.8.html
@@ -127,7 +131,9 @@ class TestConnectionManager {
// More than 2 HB will be missed
// This is enough for rabbitmq server to understand the connection is lost
- Thread.sleep(3_000)
+ Awaitility.await("connection is closed")
+ .atMost(3, TimeUnit.SECONDS)
+ .until { !manager.isOpen }
// enabling network interface back
rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
}
@@ -149,6 +155,99 @@ class TestConnectionManager {
}
}
+ @Test
+ fun `connection manager redelivers unconfirmed messages on close`() {
+ val routingKey = "routingKey1"
+ val queueName = "queue1"
+ val exchange = "test-exchange1"
+ rabbit
+ .let { rabbit ->
+ declareQueue(rabbit, queueName)
+ declareFanoutExchangeWithBinding(rabbit, exchange, queueName)
+ LOGGER.info { "Started with port ${rabbit.amqpPort}" }
+ val messagesCount = 10
+ val countDown = CountDownLatch(messagesCount)
+ val messageSizeBytes = 7
+ createConnectionManager(
+ rabbit, ConnectionManagerConfiguration(
+ subscriberName = "test",
+ prefetchCount = DEFAULT_PREFETCH_COUNT,
+ confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+ minConnectionRecoveryTimeout = 2000,
+ maxConnectionRecoveryTimeout = 2000,
+ // to avoid unexpected delays before recovery
+ retryTimeDeviationPercent = 0,
+ )
+ ).use { manager ->
+ val receivedMessages = linkedSetOf()
+ manager.basicConsume(queueName, { _, delivery, ack ->
+ val message = delivery.body.toString(Charsets.UTF_8)
+ LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" }
+ if (receivedMessages.add(message)) {
+ // decrement only unique messages
+ countDown.countDown()
+ } else {
+ LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" }
+ }
+ ack.confirm()
+ }) {
+ LOGGER.info { "Canceled $it" }
+ }
+
+ createConnectionManager(
+ rabbit, ConnectionManagerConfiguration(
+ prefetchCount = DEFAULT_PREFETCH_COUNT,
+ confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+ enablePublisherConfirmation = true,
+ maxInflightPublicationsBytes = messagesCount * 2 * messageSizeBytes,
+ heartbeatIntervalSeconds = 1,
+ minConnectionRecoveryTimeout = 2000,
+ maxConnectionRecoveryTimeout = 2000,
+ // to avoid unexpected delays before recovery
+ retryTimeDeviationPercent = 0,
+ )
+ ).use { managerWithConfirmation ->
+ repeat(messagesCount) { index ->
+ if (index == 1) {
+ // delay should allow ack for the first message be received
+ Awaitility.await("first message is confirmed")
+ .pollInterval(10, TimeUnit.MILLISECONDS)
+ .atMost(100, TimeUnit.MILLISECONDS)
+ .until { countDown.count == messagesCount - 1L }
+ // looks like if nothing is sent yet through the channel
+ // it will detekt connection lost right away and start waiting for recovery
+ rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down")
+ }
+ managerWithConfirmation.basicPublish(
+ exchange,
+ routingKey,
+ null,
+ "Hello $index".toByteArray(Charsets.UTF_8)
+ )
+ }
+ // ensure connection is closed because of HB timeout
+ Awaitility.await("connection is closed")
+ .atMost(4, TimeUnit.SECONDS)
+ .until { !managerWithConfirmation.isOpen }
+ rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
+ // wait for connection to recover before closing
+ Awaitility.await("connection is recovered")
+ .atMost(4, TimeUnit.SECONDS)
+ .until { managerWithConfirmation.isOpen }
+ }
+
+ countDown.assertComplete { "Not all messages were received: $receivedMessages" }
+ assertEquals(
+ (0 until messagesCount).map {
+ "Hello $it"
+ },
+ receivedMessages.toList(),
+ "messages received in unexpected order",
+ )
+ }
+ }
+ }
+
@Test
fun `connection manager reports unacked messages when confirmation timeout elapsed`() {
val routingKey = "routingKey1"