From 236125c7f90d18697f40b700c59c74ebb10a005c Mon Sep 17 00:00:00 2001
From: Oleg Smirnov
Date: Thu, 6 Jun 2024 10:28:07 +0400
Subject: [PATCH] =?UTF-8?q?Add=20publishing=20confirmation=20to=20redelive?=
=?UTF-8?q?r=20messages=20that=20were=20not=20confi=E2=80=A6=20(#306)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.github/workflows/integration-tests.yml | 26 +
README.md | 11 +-
build.gradle | 11 +-
gradle.properties | 4 +-
.../connection/ConnectionManager.java | 468 ++++++++++++++++--
.../configuration/RabbitMQConfiguration.kt | 11 +-
.../NotificationEventBatchSender.kt | 1 +
.../grpc/router/impl/DefaultGrpcRouterTest.kt | 79 ++-
.../connection/ConnectionManualBenchmark.kt | 151 ++++++
.../connection/TestConnectionManager.kt | 269 +++++++++-
10 files changed, 955 insertions(+), 76 deletions(-)
create mode 100644 .github/workflows/integration-tests.yml
create mode 100644 src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml
new file mode 100644
index 000000000..c46e756f5
--- /dev/null
+++ b/.github/workflows/integration-tests.yml
@@ -0,0 +1,26 @@
+name: "Run integration tests for common"
+
+on:
+ push:
+ branches:
+ - '*'
+
+jobs:
+ tests:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - name: Set up JDK 'zulu' '11'
+ uses: actions/setup-java@v4
+ with:
+ distribution: 'zulu'
+ java-version: '11'
+ - name: Setup Gradle
+ uses: gradle/actions/setup-gradle@v3
+ - name: Build with Gradle
+ run: ./gradlew --info integrationTest
+ - uses: actions/upload-artifact@v4
+ if: failure()
+ with:
+ name: integration-test-results
+ path: build/reports/tests/integrationTest/
diff --git a/README.md b/README.md
index 5847e4aff..c3b35bfb9 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# th2 common library (Java) (5.12.0)
+# th2 common library (Java) (5.13.0)
## Usage
@@ -511,6 +511,15 @@ dependencies {
## Release notes
+### 5.13.0-dev
+
++ Added functionality for publisher confirmations to mitigate network issues for message producers.
++ New parameters are added to connection manager configuration:
+ + enablePublisherConfirmation - enables publisher confirmation. `false` by default.
+ + maxInflightPublicationsBytes - the max number of unconfirmed published messages per channel. `52428800` (50 MB), by default.
+ + heartbeatIntervalSeconds - rabbitmq connection heartbeat interval in seconds.
+ `0` by default (that means the default interval will be set by the internal library used to communicate with RabbitMQ).
+
### 5.12.0-dev
+ Updated kubernetes-client: `6.12.1`
diff --git a/build.gradle b/build.gradle
index b1b5c0993..fce8d0e0f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -166,6 +166,7 @@ dependencies {
testImplementation("org.junit-pioneer:junit-pioneer:2.2.0") {
because("system property tests")
}
+ testImplementation("org.awaitility:awaitility:4.2.1")
testFixturesImplementation "org.jetbrains.kotlin:kotlin-test-junit5:$kotlin_version"
testFixturesImplementation "org.junit.jupiter:junit-jupiter:$junitVersion"
@@ -206,4 +207,12 @@ tasks.named('extractIncludeProto') {enabled = false }
tasks.named('extractIncludeTestFixturesProto') {enabled = false }
compileTestJava.dependsOn.add('generateTestProto')
-processTestResources.dependsOn.add('generateTestProto')
\ No newline at end of file
+processTestResources.dependsOn.add('generateTestProto')
+
+tasks.register("publicationManualBench", JavaExec.class) {
+ mainClass.set('com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManualBenchmark')
+ classpath(sourceSets.test.runtimeClasspath)
+ dependsOn('testClasses')
+
+ jvmArgs('-XX:StartFlightRecording=duration=60s,settings=profile,filename=publishing-profile-record.jfr')
+}
\ No newline at end of file
diff --git a/gradle.properties b/gradle.properties
index f83ac548b..bc2d53f5a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-release_version=5.12.0
+release_version=5.13.0
kotlin_version=1.8.22
description='th2 common library (Java)'
vcs_url=https://github.com/th2-net/th2-common-j
-kapt.include.compile.classpath=false
\ No newline at end of file
+kapt.include.compile.classpath=false
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
index aea3e5304..e4e070891 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
@@ -30,6 +30,7 @@
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
@@ -56,11 +57,15 @@
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Objects;
import java.util.Collections;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -68,10 +73,13 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
@@ -81,7 +89,7 @@ public class ConnectionManager implements AutoCloseable {
private final Connection connection;
private final Map channelsByPin = new ConcurrentHashMap<>();
- private final AtomicBoolean connectionIsClosed = new AtomicBoolean(false);
+ private final AtomicReference connectionIsClosed = new AtomicReference<>(State.OPEN);
private final ConnectionManagerConfiguration configuration;
private final String subscriberName;
private final AtomicInteger nextSubscriberId = new AtomicInteger(1);
@@ -110,6 +118,8 @@ public ConnectionManagerConfiguration getConfiguration() {
return configuration;
}
+ private enum State { OPEN, CLOSING, CLOSED }
+
public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfiguration rabbitMQConfiguration, @NotNull ConnectionManagerConfiguration connectionManagerConfiguration) {
Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null");
this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null");
@@ -146,6 +156,10 @@ public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfig
factory.setConnectionTimeout(connectionManagerConfiguration.getConnectionTimeout());
}
+ if (connectionManagerConfiguration.getHeartbeatIntervalSeconds() > ConnectionManagerConfiguration.DEFAULT_HB_INTERVAL_SECONDS) {
+ factory.setRequestedHeartbeat(connectionManagerConfiguration.getHeartbeatIntervalSeconds());
+ }
+
factory.setExceptionHandler(new ExceptionHandler() {
@Override
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
@@ -193,7 +207,7 @@ private void turnOffReadiness(Throwable exception) {
}
});
- factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> !connectionIsClosed.get());
+ factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> connectionIsClosed.get() != State.CLOSED);
factory.setRecoveryDelayHandler(recoveryAttempts -> {
int minTime = connectionManagerConfiguration.getMinConnectionRecoveryTimeout();
@@ -346,27 +360,47 @@ public void handleUnblocked() {
}
public boolean isOpen() {
- return connection.isOpen() && !connectionIsClosed.get();
+ return connection.isOpen() && connectionIsClosed.get() == State.OPEN;
}
@Override
public void close() {
- if (connectionIsClosed.getAndSet(true)) {
+ if (!connectionIsClosed.compareAndSet(State.OPEN, State.CLOSING)) {
LOGGER.info("Connection manager already closed");
return;
}
LOGGER.info("Closing connection manager");
+ int closeTimeout = configuration.getConnectionCloseTimeout();
- for (ChannelHolder channelHolder: channelsByPin.values()) {
+ for (Map.Entry entry: channelsByPin.entrySet()) {
+ PinId id = entry.getKey();
+ ChannelHolder channelHolder = entry.getValue();
try {
+ if (channelHolder.hasUnconfirmedMessages()) {
+ if (channelHolder.noConfirmationWillBeReceived()) {
+ LOGGER.warn("Some messages were not confirmed by broken in channel {} and were not republished. Try to republish messages", id);
+ channelHolder.publishUnconfirmedMessages();
+ }
+ LOGGER.info("Waiting for messages confirmation in channel {}", id);
+ try {
+ channelHolder.awaitConfirmations(closeTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignored) {
+ LOGGER.warn("Waiting for messages confirmation in channel {} was interrupted", id);
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (channelHolder.hasUnconfirmedMessages()) {
+ LOGGER.error("RabbitMQ channel for pin {} has unconfirmed messages on close", id);
+ }
channelHolder.channel.abort();
} catch (IOException e) {
- LOGGER.error("Cannot close channel", e);
+ LOGGER.error("Cannot close channel for pin {}", id, e);
}
}
- int closeTimeout = configuration.getConnectionCloseTimeout();
+ connectionIsClosed.set(State.CLOSED);
+
if (connection.isOpen()) {
try {
connection.close(closeTimeout);
@@ -379,13 +413,14 @@ public void close() {
shutdownExecutor(channelChecker, closeTimeout, "channel-checker");
}
- public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws InterruptedException {
+ public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException, InterruptedException {
ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey));
- holder.retryingPublishWithLock(channel -> channel.basicPublish(exchange, routingKey, props, body), configuration);
+ holder.retryingPublishWithLock(configuration, body,
+ (channel, payload) -> channel.basicPublish(exchange, routingKey, props, payload));
}
public String queueDeclare() throws IOException {
- ChannelHolder holder = new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configuration.getPrefetchCount());
+ ChannelHolder holder = new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions());
return holder.mapWithLock(channel -> {
String queue = channel.queueDeclare(
"", // queue name
@@ -472,6 +507,14 @@ boolean isAlive() {
return metrics.getLivenessMonitor().isEnabled();
}
+ private ChannelHolderOptions configurationToOptions() {
+ return new ChannelHolderOptions(
+ configuration.getPrefetchCount(),
+ configuration.getEnablePublisherConfirmation(),
+ configuration.getMaxInflightPublicationsBytes()
+ );
+ }
+
private void basicCancel(Channel channel, String consumerTag) throws IOException {
channel.basicCancel(consumerTag);
}
@@ -511,14 +554,14 @@ public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCa
private ChannelHolder getOrCreateChannelFor(PinId pinId) {
return channelsByPin.computeIfAbsent(pinId, ignore -> {
LOGGER.trace("Creating channel holder for {}", pinId);
- return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configuration.getPrefetchCount());
+ return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions());
});
}
private ChannelHolder getOrCreateChannelFor(PinId pinId, SubscriptionCallbacks subscriptionCallbacks) {
return channelsByPin.computeIfAbsent(pinId, ignore -> {
LOGGER.trace("Creating channel holder with callbacks for {}", pinId);
- return new ChannelHolder(() -> createChannelWithOptionalRecovery(true), this::waitForConnectionRecovery, configuration.getPrefetchCount(), subscriptionCallbacks);
+ return new ChannelHolder(() -> createChannelWithOptionalRecovery(true), this::waitForConnectionRecovery, configurationToOptions(), subscriptionCallbacks);
});
}
@@ -563,7 +606,7 @@ private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitFo
}
}
- if (connectionIsClosed.get()) {
+ if (connectionIsClosed.get() == State.CLOSED) {
throw new IllegalStateException("Connection is already closed");
}
}
@@ -582,7 +625,8 @@ private void waitForRecovery(ShutdownNotifier notifier) {
}
private boolean isConnectionRecovery(ShutdownNotifier notifier) {
- return !(notifier instanceof AutorecoveringChannel) && !notifier.isOpen() && !connectionIsClosed.get();
+ return !(notifier instanceof AutorecoveringChannel) && !notifier.isOpen()
+ && connectionIsClosed.get() != State.CLOSED;
}
/**
@@ -679,11 +723,35 @@ public String toString() {
}
}
+ private static class ChannelHolderOptions {
+ private final int maxCount;
+ private final boolean enablePublisherConfirmation;
+ private final int maxInflightRequestsBytes;
+
+ private ChannelHolderOptions(int maxCount, boolean enablePublisherConfirmation, int maxInflightRequestsBytes) {
+ this.maxCount = maxCount;
+ this.enablePublisherConfirmation = enablePublisherConfirmation;
+ this.maxInflightRequestsBytes = maxInflightRequestsBytes;
+ }
+
+ public int getMaxCount() {
+ return maxCount;
+ }
+
+ public boolean isEnablePublisherConfirmation() {
+ return enablePublisherConfirmation;
+ }
+
+ public int getMaxInflightRequestsBytes() {
+ return maxInflightRequestsBytes;
+ }
+ }
+
private static class ChannelHolder {
private final Lock lock = new ReentrantLock();
private final Supplier supplier;
private final BiConsumer reconnectionChecker;
- private final int maxCount;
+ private final ChannelHolderOptions options;
private final SubscriptionCallbacks subscriptionCallbacks;
@GuardedBy("lock")
private int pending;
@@ -693,30 +761,47 @@ private static class ChannelHolder {
private Channel channel;
private final Lock subscribingLock = new ReentrantLock();
@GuardedBy("subscribingLock")
- private boolean isSubscribed = false;
+ private boolean isSubscribed;
+ @GuardedBy("lock")
+ private Deque redeliveryQueue = new ArrayDeque<>();
+
+ private final PublisherConfirmationListener publishConfirmationListener;
public ChannelHolder(
Supplier supplier,
BiConsumer reconnectionChecker,
- int maxCount
+ ChannelHolderOptions options
) {
- this.supplier = Objects.requireNonNull(supplier, "'Supplier' parameter");
- this.reconnectionChecker = Objects.requireNonNull(reconnectionChecker, "'Reconnection checker' parameter");
- this.maxCount = maxCount;
- this.subscriptionCallbacks = null;
+ this(supplier, reconnectionChecker, options, null);
}
public ChannelHolder(
Supplier supplier,
BiConsumer reconnectionChecker,
- int maxCount,
+ ChannelHolderOptions options,
SubscriptionCallbacks subscriptionCallbacks
) {
this.supplier = Objects.requireNonNull(supplier, "'Supplier' parameter");
this.reconnectionChecker = Objects.requireNonNull(reconnectionChecker, "'Reconnection checker' parameter");
- this.maxCount = maxCount;
+ this.options = options;
this.subscriptionCallbacks = subscriptionCallbacks;
+ publishConfirmationListener = new PublisherConfirmationListener(
+ options.isEnablePublisherConfirmation(),
+ options.getMaxInflightRequestsBytes()
+ );
+ }
+
+ public boolean hasUnconfirmedMessages() {
+ return publishConfirmationListener.hasUnconfirmedMessages();
+ }
+
+ public boolean noConfirmationWillBeReceived() {
+ return publishConfirmationListener.isNoConfirmationWillBeReceived();
+ }
+
+ public boolean awaitConfirmations(long timeout, TimeUnit timeUnit) throws InterruptedException {
+ return publishConfirmationListener.awaitConfirmations(timeout, timeUnit);
}
public void withLock(ChannelConsumer consumer) throws IOException {
@@ -741,25 +826,52 @@ public void withLock(boolean waitForRecovery, ChannelConsumer consumer) throws I
}
}
- public void retryingPublishWithLock(ChannelConsumer consumer, ConnectionManagerConfiguration configuration) throws InterruptedException {
+ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration, byte[] payload, ChannelPublisher publisher) throws IOException, InterruptedException {
lock.lock();
try {
Iterator iterator = configuration.createRetryingDelaySequence().iterator();
Channel tempChannel = getChannel(true);
+ // add current message to the end to unify logic for sending current and redelivered messages
+ redeliveryQueue.addLast(new PublicationHolder(publisher, payload));
while (true) {
- try {
- consumer.consume(tempChannel);
+ if (publishConfirmationListener.isNoConfirmationWillBeReceived()) {
+ LOGGER.warn("Connection was closed on channel. No delivery confirmation will be received. Drain message to redelivery");
+ publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
+ }
+ PublicationHolder currentPayload = redeliveryQueue.pollFirst();
+ if (!redeliveryQueue.isEmpty()) {
+ LOGGER.warn("Redelivery queue size: {}", redeliveryQueue.size());
+ }
+ if (currentPayload == null) {
break;
+ }
+ long msgSeq = tempChannel.getNextPublishSeqNo();
+ try {
+ publishConfirmationListener.put(msgSeq, currentPayload);
+ if (publishConfirmationListener.isNoConfirmationWillBeReceived()) {
+ // will drain message to queue on next iteration
+ continue;
+ }
+ currentPayload.publish(tempChannel);
+ if (redeliveryQueue.isEmpty()) {
+ break;
+ }
} catch (IOException | ShutdownSignalException e) {
var currentValue = iterator.next();
var recoveryDelay = currentValue.getDelay();
LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e);
TimeUnit.MILLISECONDS.sleep(recoveryDelay);
+ // cleanup after failure
+ publishConfirmationListener.remove(msgSeq);
+ redeliveryQueue.addFirst(currentPayload);
// We should not recover the channel if its connection is closed
// If we do that the channel will be also auto recovered by RabbitMQ client
// during connection recovery, and we will get two new channels instead of one closed.
if (!tempChannel.isOpen() && tempChannel.getConnection().isOpen()) {
+ // once channel is recreated there won't be any confirmation received
+ // so we should redeliver all inflight requests
+ publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
tempChannel = recreateChannel();
}
}
@@ -873,17 +985,17 @@ public T mapWithLock(ChannelMapper mapper) throws IOException {
/**
* Decreases the number of unacked messages.
- * If the number of unacked messages is less than {@link #maxCount}
+ * If the number of unacked messages is less than {@link ChannelHolderOptions#getMaxCount()}
* the onWaterMarkDecreased action will be called.
* The future created in {@link #acquireAndSubmitCheck(Supplier)} method will be canceled
* @param onWaterMarkDecreased
- * the action that will be executed when the number of unacked messages is less than {@link #maxCount} and there is a future to cancel
+ * the action that will be executed when the number of unacked messages is less than {@link ChannelHolderOptions#getMaxCount()} and there is a future to cancel
*/
public void release(Runnable onWaterMarkDecreased) {
lock.lock();
try {
pending--;
- if (pending < maxCount && check != null) {
+ if (pending < options.getMaxCount() && check != null) {
check.cancel(true);
check = null;
onWaterMarkDecreased.run();
@@ -895,11 +1007,11 @@ public void release(Runnable onWaterMarkDecreased) {
/**
* Increases the number of unacked messages.
- * If the number of unacked messages is higher than or equal to {@link #maxCount}
+ * If the number of unacked messages is higher than or equal to {@link ChannelHolderOptions#getMaxCount()}
* the futureSupplier will be invoked to create a task
- * that either will be executed or canceled when number of unacked message will be less that {@link #maxCount}
+ * that either will be executed or canceled when number of unacked message will be less that {@link ChannelHolderOptions#getMaxCount()}
* @param futureSupplier
- * creates a future to track the task that should be executed until the number of unacked message is not less than {@link #maxCount}
+ * creates a future to track the task that should be executed until the number of unacked message is not less than {@link ChannelHolderOptions#getMaxCount()}
*/
public void acquireAndSubmitCheck(Supplier> futureSupplier) {
lock.lock();
@@ -925,29 +1037,307 @@ public boolean isChannelSubscribed(Channel channel) {
public boolean reachedPendingLimit() {
lock.lock();
try {
- return pending >= maxCount;
+ return pending >= options.getMaxCount();
} finally {
lock.unlock();
}
}
- private Channel getChannel() {
+ private Channel getChannel() throws IOException {
return getChannel(true);
}
- private Channel recreateChannel() {
+ private Channel recreateChannel() throws IOException {
channel = supplier.get();
reconnectionChecker.accept(channel, true);
+ setupPublisherConfirmation(channel);
return channel;
}
- private Channel getChannel(boolean waitForRecovery) {
+ private Channel getChannel(boolean waitForRecovery) throws IOException {
if (channel == null) {
channel = supplier.get();
+ setupPublisherConfirmation(channel);
}
reconnectionChecker.accept(channel, waitForRecovery);
return channel;
}
+
+ private void setupPublisherConfirmation(Channel channel) throws IOException {
+ if (!options.isEnablePublisherConfirmation()) {
+ return;
+ }
+ channel.confirmSelect();
+ channel.addShutdownListener(cause -> {
+ publishConfirmationListener.noConfirmationWillBeReceived();
+ });
+ channel.addConfirmListener(publishConfirmationListener);
+ }
+
+ public void publishUnconfirmedMessages() throws IOException {
+ lock.lock();
+ try {
+ Channel channel = getChannel(false);
+ if (!channel.isOpen()) {
+ throw new IllegalStateException("channel is not opened to publish unconfirmed messages");
+ }
+ publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
+ while (!redeliveryQueue.isEmpty()) {
+ PublicationHolder holder = redeliveryQueue.pollFirst();
+ holder.publish(channel);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private static class PublicationHolder {
+ private final ChannelPublisher publisher;
+ private final byte[] payload;
+ private volatile boolean confirmed;
+
+ private PublicationHolder(ChannelPublisher publisher, byte[] payload) {
+ this.publisher = publisher;
+ this.payload = payload;
+ }
+
+ int size() {
+ return payload.length;
+ }
+
+ boolean isConfirmed() {
+ return confirmed;
+ }
+
+ void confirmed() {
+ confirmed = true;
+ }
+
+ void reset() {
+ confirmed = false;
+ }
+
+ public void publish(Channel channel) throws IOException {
+ publisher.publish(channel, payload);
+ }
+ }
+
+ private static class PublisherConfirmationListener implements ConfirmListener {
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Condition hasSpaceToWriteCondition = lock.writeLock().newCondition();
+ private final Condition allMessagesConfirmed = lock.writeLock().newCondition();
+ @GuardedBy("lock")
+ private final NavigableMap inflightRequests = new TreeMap<>();
+ private final int maxInflightRequestsBytes;
+ private final boolean enablePublisherConfirmation;
+ private final boolean hasLimit;
+ @GuardedBy("lock")
+ private boolean noConfirmationWillBeReceived;
+ @GuardedBy("lock")
+ private int inflightBytes;
+
+ private PublisherConfirmationListener(boolean enablePublisherConfirmation, int maxInflightRequestsBytes) {
+ if (maxInflightRequestsBytes <= 0 && maxInflightRequestsBytes != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS) {
+ throw new IllegalArgumentException("invalid maxInflightRequests: " + maxInflightRequestsBytes);
+ }
+ hasLimit = maxInflightRequestsBytes != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS;
+ this.maxInflightRequestsBytes = maxInflightRequestsBytes;
+ this.enablePublisherConfirmation = enablePublisherConfirmation;
+ }
+
+ public void put(long deliveryTag, PublicationHolder payload) throws InterruptedException {
+ if (!enablePublisherConfirmation) {
+ return;
+ }
+ lock.writeLock().lock();
+ try {
+ int payloadSize = payload.size();
+ if (hasLimit) {
+ int newSize = inflightBytes + payloadSize;
+ if (newSize > maxInflightRequestsBytes) {
+ LOGGER.warn("blocking because {} inflight requests bytes size is above limit {} bytes for publication channel",
+ newSize, maxInflightRequestsBytes);
+ do {
+ hasSpaceToWriteCondition.await();
+ newSize = inflightBytes + payloadSize;
+ } while (newSize > maxInflightRequestsBytes && !noConfirmationWillBeReceived);
+ if (noConfirmationWillBeReceived) {
+ LOGGER.warn("unblocking because no confirmation will be received and inflight request size will not change");
+ } else {
+ LOGGER.info("unblocking because {} inflight requests bytes size is below limit {} bytes for publication channel",
+ newSize, maxInflightRequestsBytes);
+ }
+ }
+ }
+ inflightRequests.put(deliveryTag, payload);
+ inflightBytes += payloadSize;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void remove(long deliveryTag) {
+ if (!enablePublisherConfirmation) {
+ return;
+ }
+ lock.writeLock().lock();
+ try {
+ PublicationHolder holder = inflightRequests.remove(deliveryTag);
+ if (holder == null) {
+ return;
+ }
+ inflightBytes -= holder.size();
+ hasSpaceToWriteCondition.signalAll();
+ if (inflightRequests.isEmpty()) {
+ inflightBytes = 0;
+ allMessagesConfirmed.signalAll();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void handleAck(long deliveryTag, boolean multiple) {
+ LOGGER.trace("Delivery ack received for tag {} (multiple:{})", deliveryTag, multiple);
+ removeInflightRequests(deliveryTag, multiple);
+ LOGGER.trace("Delivery ack processed for tag {} (multiple:{})", deliveryTag, multiple);
+ }
+
+ @Override
+ public void handleNack(long deliveryTag, boolean multiple) {
+ LOGGER.warn("Delivery nack received for tag {} (multiple:{})", deliveryTag, multiple);
+ // we cannot do match with nack because this is an internal error in rabbitmq
+ // we can try to republish the message but there is no guarantee that the message will be accepted
+ removeInflightRequests(deliveryTag, multiple);
+ LOGGER.warn("Delivery nack processed for tag {} (multiple:{})", deliveryTag, multiple);
+ }
+
+ private void removeInflightRequests(long deliveryTag, boolean multiple) {
+ if (!enablePublisherConfirmation) {
+ return;
+ }
+ lock.writeLock().lock();
+ try {
+ int currentSize = inflightBytes;
+ if (multiple) {
+ Map headMap = inflightRequests.headMap(deliveryTag, true);
+ for (Map.Entry entry : headMap.entrySet()) {
+ currentSize -= entry.getValue().size();
+ }
+ headMap.clear();
+ } else {
+ long oldestPublication = Objects.requireNonNullElse(inflightRequests.firstKey(), deliveryTag);
+ if (oldestPublication == deliveryTag) {
+ // received the confirmation for oldest publication
+ // check all earlier confirmation that were confirmed but not removed
+ Iterator> tailIterator =
+ inflightRequests.tailMap(deliveryTag, true).entrySet().iterator();
+ while (tailIterator.hasNext()) {
+ Map.Entry next = tailIterator.next();
+ long key = next.getKey();
+ PublicationHolder holder = next.getValue();
+ if (key > deliveryTag && !holder.isConfirmed()) {
+ break;
+ }
+ currentSize -= holder.size();
+ tailIterator.remove();
+ }
+ } else {
+ // this is not the oldest publication
+ // mark as confirm but wait for oldest to be confirmed
+ var holder = inflightRequests.get(deliveryTag);
+ if (holder != null) {
+ holder.confirmed();
+ }
+ }
+ }
+ if (inflightBytes != currentSize) {
+ inflightBytes = currentSize;
+ hasSpaceToWriteCondition.signalAll();
+ }
+ if (inflightRequests.isEmpty()) {
+ inflightBytes = 0;
+ allMessagesConfirmed.signalAll();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public boolean hasUnconfirmedMessages() {
+ lock.readLock().lock();
+ try {
+ return !inflightRequests.isEmpty();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Adds unconfirmed messages to provided queue.
+ * Messages will be added to the beginning of the queue.
+ * As result, queue will have the oldest messages in the start and newest in the end.
+ *
+ * {@link #inflightRequests} map will be cleared.
+ *
+ * {@link #noConfirmationWillBeReceived} will be reset
+ * @param redelivery queue to transfer messages
+ */
+ public void transferUnconfirmedTo(Deque redelivery) {
+ lock.writeLock().lock();
+ try {
+ for (PublicationHolder payload : inflightRequests.descendingMap().values()) {
+ payload.reset();
+ redelivery.addFirst(payload);
+ }
+ inflightRequests.clear();
+ inflightBytes = 0;
+ noConfirmationWillBeReceived = false;
+ hasSpaceToWriteCondition.signalAll();
+ allMessagesConfirmed.signalAll();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Indicates that no confirmation will be received for inflight requests.
+ * Method {@link #transferUnconfirmedTo(Deque)} should be called to reset the flag
+ * and obtain messages for redeliver
+ * @return true if channel was closed and no confirmation will be received
+ */
+ public boolean isNoConfirmationWillBeReceived() {
+ lock.readLock().lock();
+ try {
+ return noConfirmationWillBeReceived;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void noConfirmationWillBeReceived() {
+ lock.writeLock().lock();
+ try {
+ LOGGER.warn("Publication listener was notified that no confirmations will be received");
+ noConfirmationWillBeReceived = true;
+ // we need to unlock possible locked publisher so it can check that nothing will be confirmed
+ // and retry the publication
+ hasSpaceToWriteCondition.signalAll();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public boolean awaitConfirmations(long timeout, TimeUnit timeUnit) throws InterruptedException {
+ lock.writeLock().lock();
+ try {
+ return inflightRequests.isEmpty() || allMessagesConfirmed.await(timeout, timeUnit);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
}
private interface ChannelMapper {
@@ -957,4 +1347,8 @@ private interface ChannelMapper {
private interface ChannelConsumer {
void consume(Channel channel) throws IOException;
}
+
+ private interface ChannelPublisher {
+ void publish(Channel channel, byte[] payload) throws IOException;
+ }
}
\ No newline at end of file
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
index 6c147e5c0..74db30a23 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt
@@ -42,7 +42,11 @@ data class ConnectionManagerConfiguration(
val retryTimeDeviationPercent: Int = 10,
val messageRecursionLimit: Int = 100,
val workingThreads: Int = 1,
- val confirmationTimeout: Duration = Duration.ofMinutes(5)
+ val confirmationTimeout: Duration = Duration.ofMinutes(5),
+ val enablePublisherConfirmation: Boolean = false,
+ // Default value 50MB is taken based on measurement done in ConnectionManualBenchmark class
+ val maxInflightPublicationsBytes: Int = 50 * 1024 * 1024,
+ val heartbeatIntervalSeconds: Int = DEFAULT_HB_INTERVAL_SECONDS,
) : Configuration() {
init {
check(maxRecoveryAttempts > 0) { "expected 'maxRecoveryAttempts' greater than 0 but was $maxRecoveryAttempts" }
@@ -64,6 +68,11 @@ data class ConnectionManagerConfiguration(
))
}
}
+
+ companion object {
+ const val NO_LIMIT_INFLIGHT_REQUESTS: Int = -1
+ const val DEFAULT_HB_INTERVAL_SECONDS: Int = 0
+ }
}
data class RetryingDelay(val tryNumber: Int, val delay: Int) {
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt
index 0b4cd41eb..a81fe662a 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt
@@ -34,6 +34,7 @@ class NotificationEventBatchSender(
throw UnsupportedOperationException("Method is deprecated, please use constructor")
}
+ @Throws(IOException::class)
override fun send(message: EventBatch) {
try {
connectionManager.basicPublish(exchange, EMPTY_ROUTING_KEY, null, message.toByteArray())
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
index 943d7aa9b..17de2cd62 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt
@@ -38,9 +38,14 @@ import io.grpc.stub.StreamObserver
import mu.KotlinLogging
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.extension.ExtensionContext
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.mock
import org.mockito.kotlin.timeout
@@ -50,7 +55,6 @@ import org.mockito.kotlin.verifyNoMoreInteractions
import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder
import java.time.Duration
import java.time.Instant
-import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
@@ -58,6 +62,8 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.locks.ReentrantLock
+import kotlin.concurrent.withLock
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
@@ -65,8 +71,24 @@ import kotlin.test.assertTrue
private const val CANCEL_REASON = "test request is canceled"
+@ExtendWith(DefaultGrpcRouterTest.ExecutionListener::class)
@IntegrationTest
internal class DefaultGrpcRouterTest {
+ /**
+ * Listener adds additional logging to help understanding from the stdout where test starts and finishes
+ */
+ internal class ExecutionListener : BeforeTestExecutionCallback, AfterTestExecutionCallback {
+ private val logger = KotlinLogging.logger { }
+ override fun beforeTestExecution(ctx: ExtensionContext) {
+ logger.info { "Execution for test '${ctx.testMethod.map { it.name }.orElse("unknown")}' started" }
+ }
+
+ override fun afterTestExecution(ctx: ExtensionContext) {
+ logger.info { "Execution for test '${ctx.testMethod.map { it.name }.orElse("unknown")}' is finished" }
+ }
+
+ }
+
@IntegrationTest
abstract inner class AbstractGrpcRouterTest {
private val grpcRouterClient = DefaultGrpcRouter()
@@ -415,6 +437,7 @@ internal class DefaultGrpcRouterTest {
)
}
+ @Disabled("this test isn't relevant for async request")
@Test
override fun `interrupt thread during retry request`() {
// this test isn't relevant for async request
@@ -751,6 +774,7 @@ internal class DefaultGrpcRouterTest {
)
}
+ @Disabled("this test isn't relevant for async request")
@Test
override fun `interrupt thread during retry request`() {
// this test isn't relevant for async request
@@ -825,7 +849,7 @@ internal class DefaultGrpcRouterTest {
ExceptionMetadata(
"UNAVAILABLE: io exception",
ExceptionMetadata(
- "Connection refused: localhost/127.0.0.1:8080",
+ "Connection refused: localhost/",
ExceptionMetadata(
"Connection refused"
)
@@ -851,7 +875,7 @@ internal class DefaultGrpcRouterTest {
ExceptionMetadata(
"UNAVAILABLE: io exception",
ExceptionMetadata(
- "Connection refused: localhost/127.0.0.1:8080",
+ "Connection refused: localhost/",
ExceptionMetadata(
"Connection refused"
)
@@ -880,7 +904,7 @@ internal class DefaultGrpcRouterTest {
ExceptionMetadata(
"UNAVAILABLE: io exception",
ExceptionMetadata(
- "Connection refused: localhost/127.0.0.1:8080",
+ "Connection refused: localhost/",
ExceptionMetadata(
"Connection refused"
)
@@ -915,7 +939,7 @@ internal class DefaultGrpcRouterTest {
shutdownNow()
} else {
shutdown()
- if (!awaitTermination(5, TimeUnit.SECONDS)) {
+ if (!awaitTermination(60, TimeUnit.SECONDS)) {
shutdownNow()
error("'Server' can't be closed")
}
@@ -928,9 +952,9 @@ internal class DefaultGrpcRouterTest {
exceptionMetadata: ExceptionMetadata,
path: List = emptyList()
) {
- assertEquals(
- exceptionMetadata.message,
- exception.message,
+ val expectedMessage = exceptionMetadata.message
+ assertTrue(
+ exception.message == expectedMessage || exception.message?.startsWith(expectedMessage ?: "null") == true,
"Message for exception: $exception, path: ${path.printAsStackTrace()}"
)
exceptionMetadata.suspended?.let { suspendMetadataList ->
@@ -962,7 +986,7 @@ internal class DefaultGrpcRouterTest {
private fun ExecutorService.shutdownGracefully() {
shutdown()
- if (!awaitTermination(1, TimeUnit.SECONDS)) {
+ if (!awaitTermination(30, TimeUnit.SECONDS)) {
shutdownNow()
error("'Executor' can't be stopped")
}
@@ -974,10 +998,25 @@ internal class DefaultGrpcRouterTest {
val suspended: List? = null
)
+ /**
+ * Baton class can help to synchronize two threads (only **two**).
+ *
+ * Baton class was migrated from using queue with size 1 to lock and conditions for synchronization.
+ *
+ * The implementation with queue did not provide guarantees that the same thread won't get the permit and put it back
+ * while another thread was still waiting for a free space in the queue.
+ *
+ * Using lock and conditions guarantees that the permit won't be given unless somebody is waiting for that permit.
+ * And vise-versa, nobody can get a permit unless somebody tries to put the permit
+ */
internal class Baton(
private val name: String
) {
- private val queue = ArrayBlockingQueue(1).apply { put(Any()) }
+ @Volatile
+ private var permits = 0
+ private val lock = ReentrantLock()
+ private val givenCondition = lock.newCondition()
+ private val getCondition = lock.newCondition()
fun giveAndGet(giveComment: String = "", getComment: String = "") {
give(giveComment)
@@ -986,13 +1025,25 @@ internal class DefaultGrpcRouterTest {
fun give(comment: String = "") {
K_LOGGER.info { "'$name' baton is giving by [${Thread.currentThread().name}] - $comment" }
- queue.put(Any())
+ lock.withLock {
+ if (permits == 0) {
+ getCondition.await()
+ }
+ permits += 1
+ givenCondition.signal()
+ }
K_LOGGER.info { "'$name' baton is given by [${Thread.currentThread().name}] - $comment" }
}
fun get(comment: String = "") {
K_LOGGER.info { "'$name' baton is getting by [${Thread.currentThread().name}] - $comment" }
- queue.poll()
+ lock.withLock {
+ getCondition.signal()
+ permits -= 1
+ if (permits < 0) {
+ givenCondition.await()
+ }
+ }
K_LOGGER.info { "'$name' baton is got by [${Thread.currentThread().name}] - $comment" }
}
}
@@ -1009,8 +1060,8 @@ internal class DefaultGrpcRouterTest {
}.build())
responseBaton?.let {
- Thread.sleep(1_000)
it.give("response sent")
+ Thread.sleep(1_000)
}
if (complete) {
@@ -1027,8 +1078,8 @@ internal class DefaultGrpcRouterTest {
}
responseBaton?.let {
- Thread.sleep(1_000)
it.give("response sent")
+ Thread.sleep(1_000)
}
if (complete) {
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
new file mode 100644
index 000000000..2f8fa25a6
--- /dev/null
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2024 Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection
+
+import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration
+import mu.KotlinLogging
+import org.testcontainers.containers.RabbitMQContainer
+import java.time.Duration
+import java.util.concurrent.ThreadLocalRandom
+
+object ConnectionManualBenchmark {
+ private val LOGGER = KotlinLogging.logger {}
+ @JvmStatic
+ fun main(args: Array) {
+ RabbitMQContainer(RABBITMQ_IMAGE_NAME).use { container ->
+ val payload: ByteArray = ByteArray(256 * 1024).also {
+ ThreadLocalRandom.current().nextBytes(it)
+ }
+ container.start()
+ with(container) {
+ val queueName = "test-queue"
+ val exchangeName = "test-exchange"
+ execInContainer("rabbitmqadmin", "declare", "queue", "name=$queueName", "durable=false")
+ execInContainer("rabbitmqadmin", "declare", "exchange", "name=$exchangeName", "type=fanout")
+ execInContainer("rabbitmqadmin", "declare", "binding", "source=$exchangeName", "destination_type=queue", "destination=$queueName")
+
+ val consumer = createConnectionManager(
+ container,
+ ConnectionManagerConfiguration(
+ prefetchCount = 1000,
+ confirmationTimeout = Duration.ofSeconds(5),
+ enablePublisherConfirmation = true,
+ maxInflightPublicationsBytes = 1024 * 1024 * 25,
+ )
+ )
+
+ val connectionManagerWithConfirmation = {
+ createConnectionManager(
+ container,
+ ConnectionManagerConfiguration(
+ prefetchCount = 100,
+ confirmationTimeout = Duration.ofSeconds(5),
+ enablePublisherConfirmation = true,
+ maxInflightPublicationsBytes = 1024 * 1024 * 1,
+ )
+ )
+ }
+
+ val connectionManagerWithoutConfirmation = {
+ createConnectionManager(
+ container,
+ ConnectionManagerConfiguration(
+ prefetchCount = 100,
+ confirmationTimeout = Duration.ofSeconds(5),
+ enablePublisherConfirmation = false,
+ maxInflightPublicationsBytes = -1,
+ )
+ )
+ }
+
+ consumer.use {
+ consumer.basicConsume(
+ queueName,
+ { _, _, ack -> ack.confirm() },
+ { LOGGER.warn { "Canceled" } },
+ )
+ val times = 5
+ val withConf = ArrayList(times)
+ val withoutConf = ArrayList(times)
+ repeat(times) {
+ // Avg for no confirmation: PT19.468S
+ withoutConf += measure("not confirmation", connectionManagerWithoutConfirmation, payload)
+
+ // Avg for confirmation: PT20.758S
+ withConf += measure("confirmation", connectionManagerWithConfirmation, payload)
+ }
+
+ fun List.avg(): Duration {
+ return map { it.toMillis() }.average().let { Duration.ofMillis(it.toLong()) }
+ }
+
+ LOGGER.info { "Avg for confirmation: ${withConf.avg()}" }
+ LOGGER.info { "Avg for no confirmation: ${withoutConf.avg()}" }
+ }
+ }
+ }
+ }
+
+ private fun measure(name: String, manager: () -> ConnectionManager, payload: ByteArray): Duration {
+ LOGGER.info("Measuring $name")
+ val start: Long
+ val sent: Long
+ manager().use { mgr ->
+ repeat(100) {
+ mgr.basicPublish(
+ "test-exchange",
+ "routing",
+ null,
+ payload,
+ )
+ }
+ LOGGER.info("Wait after warmup for $name")
+ Thread.sleep(1000)
+ LOGGER.info("Start measuring for $name")
+ start = System.currentTimeMillis()
+ repeat(100_000) {
+ mgr.basicPublish(
+ "test-exchange",
+ "routing",
+ null,
+ payload,
+ )
+ }
+ sent = System.currentTimeMillis()
+ }
+ val end = System.currentTimeMillis()
+ LOGGER.info("Sent $name in ${Duration.ofMillis(sent - start)}")
+ val duration = Duration.ofMillis(end - start)
+ LOGGER.info("Executed $name in $duration")
+ return duration
+ }
+
+ private fun createConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) =
+ ConnectionManager(
+ "test-connection",
+ RabbitMQConfiguration(
+ host = container.host,
+ vHost = "",
+ port = container.amqpPort,
+ username = container.adminUsername,
+ password = container.adminPassword,
+ ),
+ configuration
+ )
+}
\ No newline at end of file
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
index 7448ef191..d96e1135e 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
@@ -31,17 +31,14 @@ import com.exactpro.th2.common.util.getChannelsInfo
import com.exactpro.th2.common.util.getQueuesInfo
import com.exactpro.th2.common.util.getSubscribedChannelsCount
import com.exactpro.th2.common.util.putMessageInQueue
+import com.github.dockerjava.api.model.Capability
import com.rabbitmq.client.BuiltinExchangeType
-import java.time.Duration
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.concurrent.thread
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.Delivery
import mu.KotlinLogging
+import org.awaitility.Awaitility
import org.junit.jupiter.api.AfterAll
+import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
@@ -49,13 +46,208 @@ import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
+import org.slf4j.LoggerFactory
+import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.RabbitMQContainer
+import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.utility.MountableFile
import java.io.IOException
+import java.time.Duration
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.concurrent.thread
import kotlin.test.assertFailsWith
@IntegrationTest
class TestConnectionManager {
+
+ @Test
+ fun `connection manager redelivers unconfirmed messages`() {
+ val routingKey = "routingKey1"
+ val queueName = "queue1"
+ val exchange = "test-exchange1"
+ rabbit
+ .let { rabbit ->
+ declareQueue(rabbit, queueName)
+ declareFanoutExchangeWithBinding(rabbit, exchange, queueName)
+ LOGGER.info { "Started with port ${rabbit.amqpPort}" }
+ val messagesCount = 10
+ val countDown = CountDownLatch(messagesCount)
+ val messageSizeBytes = 7
+ createConnectionManager(
+ rabbit, ConnectionManagerConfiguration(
+ subscriberName = "test",
+ prefetchCount = DEFAULT_PREFETCH_COUNT,
+ confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+ enablePublisherConfirmation = true,
+ maxInflightPublicationsBytes = 5 * messageSizeBytes,
+ heartbeatIntervalSeconds = 1,
+ minConnectionRecoveryTimeout = 2000,
+ maxConnectionRecoveryTimeout = 2000,
+ // to avoid unexpected delays before recovery
+ retryTimeDeviationPercent = 0,
+ )
+ ).use { manager ->
+ val receivedMessages = linkedSetOf()
+ manager.basicConsume(queueName, { _, delivery, ack ->
+ val message = delivery.body.toString(Charsets.UTF_8)
+ LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" }
+ if (receivedMessages.add(message)) {
+ // decrement only unique messages
+ countDown.countDown()
+ } else {
+ LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" }
+ }
+ ack.confirm()
+ }) {
+ LOGGER.info { "Canceled $it" }
+ }
+
+
+ var future: CompletableFuture<*>? = null
+ repeat(messagesCount) { index ->
+ if (index == 1) {
+ // delay should allow ack for the first message be received
+ Awaitility.await("first message is confirmed")
+ .pollInterval(10, TimeUnit.MILLISECONDS)
+ .atMost(100, TimeUnit.MILLISECONDS)
+ .until { countDown.count == messagesCount - 1L }
+ // Man pages:
+ // https://man7.org/linux/man-pages/man8/tc-netem.8.html
+ // https://man7.org/linux/man-pages/man8/ifconfig.8.html
+ //
+ // Here we try to emulate network outage to cause missing publication confirmations.
+ //
+ // In real life we will probably get duplicates in this case because
+ // rabbitmq does not provide exactly-once semantic.
+ // So, we will have to deal with it on the consumer side
+ rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down")
+ } else if (index == 4) {
+ future = CompletableFuture.supplyAsync {
+ // Interface is unblock in separate thread to emulate more realistic scenario
+
+ // More than 2 HB will be missed
+ // This is enough for rabbitmq server to understand the connection is lost
+ Awaitility.await("connection is closed")
+ .atMost(3, TimeUnit.SECONDS)
+ .until { !manager.isOpen }
+ // enabling network interface back
+ rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
+ }
+ }
+ manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
+ }
+
+ future?.get(30, TimeUnit.SECONDS)
+
+ countDown.assertComplete { "Not all messages were received: $receivedMessages" }
+ assertEquals(
+ (0 until messagesCount).map {
+ "Hello $it"
+ },
+ receivedMessages.toList(),
+ "messages received in unexpected order",
+ )
+ }
+ }
+ }
+
+ @Test
+ fun `connection manager redelivers unconfirmed messages on close`() {
+ val routingKey = "routingKey1"
+ val queueName = "queue1"
+ val exchange = "test-exchange1"
+ rabbit
+ .let { rabbit ->
+ declareQueue(rabbit, queueName)
+ declareFanoutExchangeWithBinding(rabbit, exchange, queueName)
+ LOGGER.info { "Started with port ${rabbit.amqpPort}" }
+ val messagesCount = 10
+ val countDown = CountDownLatch(messagesCount)
+ val messageSizeBytes = 7
+ createConnectionManager(
+ rabbit, ConnectionManagerConfiguration(
+ subscriberName = "test",
+ prefetchCount = DEFAULT_PREFETCH_COUNT,
+ confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+ minConnectionRecoveryTimeout = 2000,
+ maxConnectionRecoveryTimeout = 2000,
+ // to avoid unexpected delays before recovery
+ retryTimeDeviationPercent = 0,
+ )
+ ).use { manager ->
+ val receivedMessages = linkedSetOf()
+ manager.basicConsume(queueName, { _, delivery, ack ->
+ val message = delivery.body.toString(Charsets.UTF_8)
+ LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" }
+ if (receivedMessages.add(message)) {
+ // decrement only unique messages
+ countDown.countDown()
+ } else {
+ LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" }
+ }
+ ack.confirm()
+ }) {
+ LOGGER.info { "Canceled $it" }
+ }
+
+ createConnectionManager(
+ rabbit, ConnectionManagerConfiguration(
+ prefetchCount = DEFAULT_PREFETCH_COUNT,
+ confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+ enablePublisherConfirmation = true,
+ maxInflightPublicationsBytes = messagesCount * 2 * messageSizeBytes,
+ heartbeatIntervalSeconds = 1,
+ minConnectionRecoveryTimeout = 2000,
+ maxConnectionRecoveryTimeout = 2000,
+ // to avoid unexpected delays before recovery
+ retryTimeDeviationPercent = 0,
+ )
+ ).use { managerWithConfirmation ->
+ repeat(messagesCount) { index ->
+ if (index == 1) {
+ // delay should allow ack for the first message be received
+ Awaitility.await("first message is confirmed")
+ .pollInterval(10, TimeUnit.MILLISECONDS)
+ .atMost(100, TimeUnit.MILLISECONDS)
+ .until { countDown.count == messagesCount - 1L }
+ // looks like if nothing is sent yet through the channel
+ // it will detekt connection lost right away and start waiting for recovery
+ rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down")
+ }
+ managerWithConfirmation.basicPublish(
+ exchange,
+ routingKey,
+ null,
+ "Hello $index".toByteArray(Charsets.UTF_8)
+ )
+ }
+ // ensure connection is closed because of HB timeout
+ Awaitility.await("connection is closed")
+ .atMost(4, TimeUnit.SECONDS)
+ .until { !managerWithConfirmation.isOpen }
+ rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
+ // wait for connection to recover before closing
+ Awaitility.await("connection is recovered")
+ .atMost(4, TimeUnit.SECONDS)
+ .until { managerWithConfirmation.isOpen }
+ }
+
+ countDown.assertComplete { "Not all messages were received: $receivedMessages" }
+ assertEquals(
+ (0 until messagesCount).map {
+ "Hello $it"
+ },
+ receivedMessages.toList(),
+ "messages received in unexpected order",
+ )
+ }
+ }
+ }
+
@Test
fun `connection manager reports unacked messages when confirmation timeout elapsed`() {
val routingKey = "routingKey1"
@@ -189,7 +381,8 @@ class TestConnectionManager {
.let {
declareQueue(rabbit, queueName)
LOGGER.info { "Started with port ${it.amqpPort}" }
- val counter = AtomicInteger(0)
+ val counter = AtomicInteger()
+ val downLatch = CountDownLatch(1)
createConnectionManager(
it,
ConnectionManagerConfiguration(
@@ -205,6 +398,7 @@ class TestConnectionManager {
try {
monitor = connectionManager.basicConsume(queueName, { _, delivery, _ ->
counter.incrementAndGet()
+ downLatch.countDown()
LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" }
}) {
LOGGER.info { "Canceled $it" }
@@ -216,19 +410,18 @@ class TestConnectionManager {
LOGGER.info { "Publication finished!" }
assertEquals(
0,
- counter.get()
+ counter.get(),
) { "Unexpected number of messages received. The first message shouldn't be received" }
- Thread.sleep(200)
LOGGER.info { "Creating the correct exchange..." }
declareFanoutExchangeWithBinding(it, exchange, queueName)
- Thread.sleep(200)
LOGGER.info { "Exchange created!" }
Assertions.assertDoesNotThrow {
connectionManager.basicPublish(exchange, "", null, "Hello2".toByteArray(Charsets.UTF_8))
}
- Thread.sleep(200)
+ downLatch.assertComplete(1L, TimeUnit.SECONDS) { "no messages were received" }
+
assertEquals(
1,
counter.get()
@@ -408,9 +601,10 @@ class TestConnectionManager {
fun `connection manager receives a messages after container restart`() {
val queueName = "queue5"
val amqpPort = 5672
- val container = object : RabbitMQContainer(RABBITMQ_IMAGE_NAME) {
- init { super.addFixedExposedPort(amqpPort, amqpPort) }
- }
+ val container = RabbitMQContainer(RABBITMQ_IMAGE_NAME)
+ .apply {
+ portBindings = listOf("$amqpPort:$amqpPort")
+ }
container
.use {
@@ -873,13 +1067,22 @@ class TestConnectionManager {
}
}
- private fun CountDownLatch.assertComplete(message: String) {
+ private fun CountDownLatch.assertComplete(
+ message: String,
+ timeout: Long = 1,
+ timeUnit: TimeUnit = TimeUnit.SECONDS,
+ ) {
+ assertComplete(timeout, timeUnit) { message }
+ }
+
+ private fun CountDownLatch.assertComplete(
+ timeout: Long = 1,
+ timeUnit: TimeUnit = TimeUnit.SECONDS,
+ messageSupplier: () -> String,
+ ) {
assertTrue(
- await(
- 1L,
- TimeUnit.SECONDS
- )
- ) { "$message, actual count: $count" }
+ await(timeout, timeUnit)
+ ) { "${messageSupplier()}, actual count: $count" }
}
private fun assertTarget(target: T, timeout: Long = 1_000, message: String, func: () -> T) {
@@ -987,15 +1190,41 @@ class TestConnectionManager {
)
)
+ @AfterEach
+ fun cleanupRabbitMq() {
+ // cleanup is done to prevent queue name collision during test
+ rabbit.apply {
+ executeInContainerWithLogging("rabbitmqctl", "stop_app")
+ executeInContainerWithLogging("rabbitmqctl", "reset")
+ executeInContainerWithLogging("rabbitmqctl", "start_app")
+ }
+ }
+
companion object {
private val LOGGER = KotlinLogging.logger { }
private lateinit var rabbit: RabbitMQContainer
+ @JvmStatic
+ fun GenericContainer<*>.executeInContainerWithLogging(vararg command: String, exceptionOnExecutionError: Boolean = true) {
+ execInContainer(*command).also {
+ LOGGER.info { "Command: ${command.joinToString(separator = " ")}; out: ${it.stdout}; err: ${it.stderr}; exit code: ${it.exitCode}" }
+ if (exceptionOnExecutionError && it.exitCode != 0) {
+ throw IllegalStateException("Command ${command.joinToString()} exited with error code: ${it.exitCode}")
+ }
+ }
+ }
+
@BeforeAll
@JvmStatic
fun initRabbit() {
rabbit = RabbitMQContainer(RABBITMQ_IMAGE_NAME)
+ .withLogConsumer(Slf4jLogConsumer(LoggerFactory.getLogger("rabbitmq")))
+ .withCreateContainerCmdModifier {
+ it.hostConfig
+ // required to use tc tool to emulate network problems
+ ?.withCapAdd(Capability.NET_ADMIN)
+ }
rabbit.start()
}