From 0be84cfd008c1718beea0cc0828747007932030a Mon Sep 17 00:00:00 2001 From: Oleg Smelov <oleg.smelov@exactpro.com> Date: Wed, 10 Jul 2024 17:11:31 +0400 Subject: [PATCH] ConsumeConnectionManager & PublishConnectionManager --- README.md | 6 +- gradle.properties | 2 +- .../common/schema/event/EventBatchSender.java | 10 +- .../schema/event/EventBatchSubscriber.java | 10 +- .../schema/factory/AbstractCommonFactory.java | 26 +- .../common/schema/message/MessageRouter.java | 23 +- .../impl/rabbitmq/AbstractRabbitSender.java | 15 +- .../rabbitmq/AbstractRabbitSubscriber.java | 13 +- .../connection/ConnectionManager.java | 236 +----- .../schema/message/MessageRouterContext.kt | 10 +- .../context/DefaultMessageRouterContext.kt | 10 +- .../impl/rabbitmq/AbstractRabbitRouter.kt | 11 +- .../connection/ConsumeConnectionManager.kt | 217 ++++++ .../connection/PublishConnectionManager.kt | 59 ++ .../rabbitmq/custom/RabbitCustomRouter.kt | 12 +- .../group/RabbitMessageGroupBatchSender.kt | 8 +- .../RabbitMessageGroupBatchSubscriber.kt | 9 +- .../NotificationEventBatchSender.kt | 7 +- .../NotificationEventBatchSubscriber.kt | 10 +- .../transport/TransportGroupBatchSender.kt | 8 +- .../TransportGroupBatchSubscriber.kt | 9 +- .../AbstractRabbitRouterIntegrationTest.kt | 53 +- .../impl/rabbitmq/AbstractRabbitRouterTest.kt | 17 +- .../connection/ConnectionManualBenchmark.kt | 31 +- .../connection/TestConnectionManager.kt | 696 +++++++++--------- ...IntegrationTestRabbitMessageBatchRouter.kt | 103 +-- .../group/TestRabbitMessageBatchRouter.kt | 29 +- ...ransportGroupBatchRouterIntegrationTest.kt | 111 +-- .../TransportGroupBatchRouterTest.kt | 36 +- .../common/util/RabbitTestContainerUtil.kt | 11 +- 30 files changed, 990 insertions(+), 808 deletions(-) create mode 100644 src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt create mode 100644 src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt diff --git a/README.md b/README.md index a29d9b81c..413fbadba 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2 common library (Java) (5.13.1) +# th2 common library (Java) (5.14.0) ## Usage @@ -513,7 +513,7 @@ dependencies { ### 5.14.0-dev -+ Separate connections for publisher and consumer ++ Separate connections for publisher and consumer (allows to consume while publishing is blocked by RabbitMQ) + Updated cradle `5.4.1-dev` + Updated kubernetes-client: `6.13.1` @@ -611,7 +611,7 @@ dependencies { ### 5.4.1-dev #### Fix -+ `SubscriberMonitor` is returned from `MessageRouter.subscribe` methods is proxy object to manage RabbitMQ subscribtion without internal listener ++ `SubscriberMonitor` is returned from `MessageRouter.subscribe` methods is proxy object to manage RabbitMQ subscription without internal listener ### 5.4.0-dev #### Updated diff --git a/gradle.properties b/gradle.properties index f8a2b4afb..5c8ad9276 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,4 +17,4 @@ release_version=5.14.0 kotlin_version=1.8.22 description='th2 common library (Java)' vcs_url=https://github.com/th2-net/th2-common-j -kapt.include.compile.classpath=false +kapt.include.compile.classpath=true diff --git a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java index 810e5dc8a..21eff2dfa 100644 --- a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java +++ b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import com.exactpro.th2.common.grpc.EventBatch; import com.exactpro.th2.common.message.MessageUtils; import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager; import static com.exactpro.th2.common.metrics.CommonMetrics.TH2_PIN_LABEL; import static com.exactpro.th2.common.schema.event.EventBatchRouter.EVENT_TYPE; @@ -39,13 +39,13 @@ public class EventBatchSender extends AbstractRabbitSender<EventBatch> { .register(); public EventBatchSender( - @NotNull ConnectionManager connectionManager, + @NotNull PublishConnectionManager publishConnectionManager, @NotNull String exchangeName, @NotNull String routingKey, @NotNull String th2Pin, @NotNull String bookName ) { - super(connectionManager, exchangeName, routingKey, th2Pin, EVENT_TYPE, bookName); + super(publishConnectionManager, exchangeName, routingKey, th2Pin, EVENT_TYPE, bookName); } @Override @@ -82,4 +82,4 @@ protected String toShortTraceString(EventBatch value) { protected String toShortDebugString(EventBatch value) { return "EventBatch: parent_event_id = " + value.getParentEventId().getId(); } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java index 78642b061..29d375b2f 100644 --- a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java +++ b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ import com.exactpro.th2.common.schema.message.DeliveryMetadata; import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation; import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager; import com.rabbitmq.client.Delivery; import io.prometheus.client.Counter; import org.jetbrains.annotations.NotNull; @@ -41,12 +41,12 @@ public class EventBatchSubscriber extends AbstractRabbitSubscriber<EventBatch> { .register(); public EventBatchSubscriber( - @NotNull ConnectionManager connectionManager, + @NotNull ConsumeConnectionManager consumeConnectionManager, @NotNull String queue, @NotNull String th2Pin, @NotNull ConfirmationListener<EventBatch> listener ) { - super(connectionManager, queue, th2Pin, EVENT_TYPE, listener); + super(consumeConnectionManager, queue, th2Pin, EVENT_TYPE, listener); } @Override @@ -78,4 +78,4 @@ protected void handle(DeliveryMetadata deliveryMetadata, Delivery delivery, Even .inc(value.getEventsCount()); super.handle(deliveryMetadata, delivery, value, confirmation); } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java index f2595d769..eb36b4374 100644 --- a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java +++ b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java @@ -1,5 +1,6 @@ /* * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -52,7 +53,8 @@ import com.exactpro.th2.common.schema.message.impl.monitor.LogMessageRouterMonitor; import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration; import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager; import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.MessageConverter; import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.RabbitCustomRouter; import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch; @@ -137,10 +139,10 @@ public abstract class AbstractCommonFactory implements AutoCloseable { private final Class<? extends MessageRouter<EventBatch>> eventBatchRouterClass; private final Class<? extends GrpcRouter> grpcRouterClass; private final Class<? extends NotificationRouter<EventBatch>> notificationEventBatchRouterClass; - private final LazyProvider<ConnectionManager> rabbitMqPublishConnectionManager = - lazyAutocloseable("publish-connection-manager", this::createRabbitMQConnectionManager); - private final LazyProvider<ConnectionManager> rabbitMqConsumeConnectionManager = - lazyAutocloseable("consume-connection-manager", this::createRabbitMQConnectionManager); + private final LazyProvider<PublishConnectionManager> rabbitMqPublishConnectionManager = + lazyAutocloseable("publish-connection-manager", this::createRabbitMQPublishConnectionManager); + private final LazyProvider<ConsumeConnectionManager> rabbitMqConsumeConnectionManager = + lazyAutocloseable("consume-connection-manager", this::createRabbitMQConsumeConnectionManager); private final LazyProvider<MessageRouterContext> routerContext = lazy("router-context", this::createMessageRouterContext); private final LazyProvider<MessageRouter<MessageBatch>> messageRouterParsedBatch = @@ -662,15 +664,19 @@ protected PrometheusConfiguration loadPrometheusConfiguration() { return getConfigurationOrLoad(PrometheusConfiguration.class, true); } - protected ConnectionManager createRabbitMQConnectionManager() { - return new ConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration()); + protected PublishConnectionManager createRabbitMQPublishConnectionManager() { + return new PublishConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration()); + } + + protected ConsumeConnectionManager createRabbitMQConsumeConnectionManager() { + return new ConsumeConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration()); } - protected ConnectionManager getRabbitMqPublishConnectionManager() { + protected PublishConnectionManager getRabbitMqPublishConnectionManager() { return rabbitMqPublishConnectionManager.get(); } - protected ConnectionManager getRabbitMqConsumeConnectionManager() { + protected ConsumeConnectionManager getRabbitMqConsumeConnectionManager() { return rabbitMqConsumeConnectionManager.get(); } @@ -756,4 +762,4 @@ protected static void configureLogger(Path... paths) { log4jConfigUtils.configure(listPath, LOG4J2_PROPERTIES_NAME); ExactproMetaInf.logging(); } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java b/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java index db9155034..5099e0a27 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -16,32 +16,14 @@ package com.exactpro.th2.common.schema.message; import com.exactpro.th2.common.grpc.MessageGroupBatch; -import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration; -import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration; -import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; import org.jetbrains.annotations.NotNull; - import java.io.IOException; -import java.util.Objects; /** * Interface for send and receive RabbitMQ messages * @param <T> messages for send and receive */ public interface MessageRouter<T> extends AutoCloseable { - - /** - * Initialization message router - * @param configuration message router configuration - */ - @Deprecated(since = "3.2.2", forRemoval = true) - default void init(@NotNull ConnectionManager connectionManager, @NotNull MessageRouterConfiguration configuration) { - Objects.requireNonNull(connectionManager, "Connection owner can not be null"); - Objects.requireNonNull(configuration, "Configuration cannot be null"); - init(new DefaultMessageRouterContext(connectionManager, connectionManager, MessageRouterMonitor.DEFAULT_MONITOR, configuration, new BoxConfiguration())); - } - default void init(@NotNull MessageRouterContext context, @NotNull MessageRouter<MessageGroupBatch> groupBatchRouter) { init(context); } @@ -137,5 +119,4 @@ default void send(T message) throws IOException { * @throws IOException if can not send message */ void sendAll(T message, String... queueAttr) throws IOException; - -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java index dfa769dce..0f945f1c3 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import com.exactpro.th2.common.schema.message.MessageSender; import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager; import static com.exactpro.th2.common.metrics.CommonMetrics.EXCHANGE_LABEL; import static com.exactpro.th2.common.metrics.CommonMetrics.ROUTING_KEY_LABEL; @@ -54,18 +55,18 @@ public abstract class AbstractRabbitSender<T> implements MessageSender<T> { protected final String bookName; private final AtomicReference<String> routingKey = new AtomicReference<>(); private final AtomicReference<String> exchangeName = new AtomicReference<>(); - private final AtomicReference<ConnectionManager> connectionManager = new AtomicReference<>(); + private final AtomicReference<PublishConnectionManager> publishConnectionManager = new AtomicReference<>(); private final String th2Type; public AbstractRabbitSender( - @NotNull ConnectionManager connectionManager, + @NotNull PublishConnectionManager publishConnectionManager, @NotNull String exchangeName, @NotNull String routingKey, @NotNull String th2Pin, @NotNull String th2Type, @NotNull String bookName ) { - this.connectionManager.set(requireNonNull(connectionManager, "Connection can not be null")); + this.publishConnectionManager.set(requireNonNull(publishConnectionManager, "Connection manager can not be null")); this.exchangeName.set(requireNonNull(exchangeName, "Exchange name can not be null")); this.routingKey.set(requireNonNull(routingKey, "Routing key can not be null")); this.th2Pin = requireNonNull(th2Pin, "TH2 pin can not be null"); @@ -84,7 +85,7 @@ public void send(T value) throws IOException { requireNonNull(value, "Value for send can not be null"); try { - ConnectionManager connection = this.connectionManager.get(); + PublishConnectionManager connectionManager = this.publishConnectionManager.get(); byte[] bytes = valueToBytes(value); MESSAGE_SIZE_PUBLISH_BYTES .labels(th2Pin, th2Type, exchangeName.get(), routingKey.get()) @@ -92,7 +93,7 @@ public void send(T value) throws IOException { MESSAGE_PUBLISH_TOTAL .labels(th2Pin, th2Type, exchangeName.get(), routingKey.get()) .inc(); - connection.basicPublish(exchangeName.get(), routingKey.get(), null, bytes); + connectionManager.basicPublish(exchangeName.get(), routingKey.get(), null, bytes); if (LOGGER.isTraceEnabled()) { LOGGER.trace("Message sent to exchangeName='{}', routing key='{}': '{}'", @@ -111,4 +112,4 @@ public void send(T value) throws IOException { protected abstract String toShortDebugString(T value); protected abstract byte[] valueToBytes(T value); -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java index b9fa6cd7b..23ffbafa2 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java @@ -1,5 +1,6 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -21,7 +22,7 @@ import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation; import com.exactpro.th2.common.schema.message.MessageSubscriber; import com.exactpro.th2.common.schema.message.SubscriberMonitor; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager; import com.google.common.base.Suppliers; import com.google.common.io.BaseEncoding; import com.rabbitmq.client.Delivery; @@ -69,7 +70,7 @@ public abstract class AbstractRabbitSubscriber<T> implements MessageSubscriber { private final boolean manualConfirmation; private final ConfirmationListener<T> listener; private final String queue; - private final ConnectionManager connectionManager; + private final ConsumeConnectionManager consumeConnectionManager; private final AtomicReference<Supplier<SubscriberMonitor>> consumerMonitor = new AtomicReference<>(emptySupplier()); private final AtomicBoolean isAlive = new AtomicBoolean(true); private final String th2Type; @@ -78,13 +79,13 @@ public abstract class AbstractRabbitSubscriber<T> implements MessageSubscriber { protected final String th2Pin; public AbstractRabbitSubscriber( - @NotNull ConnectionManager connectionManager, + @NotNull ConsumeConnectionManager consumeConnectionManager, @NotNull String queue, @NotNull String th2Pin, @NotNull String th2Type, @NotNull ConfirmationListener<T> listener ) { - this.connectionManager = requireNonNull(connectionManager, "Connection can not be null"); + this.consumeConnectionManager = requireNonNull(consumeConnectionManager, "Connection can not be null"); this.queue = requireNonNull(queue, "Queue can not be null"); this.th2Pin = requireNonNull(th2Pin, "th2 pin can not be null"); this.th2Type = requireNonNull(th2Type, "th2 type can not be null"); @@ -186,7 +187,7 @@ private void resubscribe() { private SubscriberMonitor basicConsume() { try { LOGGER.info("Start listening queue name='{}', th2 pin='{}'", queue, th2Pin); - return connectionManager.basicConsume(queue, this::handle, this::canceled); + return consumeConnectionManager.basicConsume(queue, this::handle, this::canceled); } catch (IOException e) { throw new IllegalStateException("Can not subscribe to queue = " + queue, e); } catch (InterruptedException e) { diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java index 5292dc0de..99d416225 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java @@ -17,16 +17,11 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection; import com.exactpro.th2.common.metrics.HealthMetrics; -import com.exactpro.th2.common.schema.message.DeliveryMetadata; -import com.exactpro.th2.common.schema.message.ExclusiveSubscriberMonitor; import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback; -import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation; -import com.exactpro.th2.common.schema.message.impl.OnlyOnceConfirmation; import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration; import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration; import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RetryingDelay; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.BlockedListener; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; @@ -34,7 +29,6 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; -import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ExceptionHandler; import com.rabbitmq.client.Method; import com.rabbitmq.client.Recoverable; @@ -64,7 +58,6 @@ import java.util.Map; import java.util.NavigableMap; import java.util.Objects; -import java.util.Collections; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -73,7 +66,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; @@ -84,23 +76,22 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; -public class ConnectionManager implements AutoCloseable { +public abstract class ConnectionManager implements AutoCloseable { public static final String EMPTY_ROUTING_KEY = ""; private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class); private final Connection connection; - private final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>(); + protected final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>(); private final AtomicReference<State> connectionState = new AtomicReference<>(State.OPEN); - private final AtomicBoolean isPublishingBlocked = new AtomicBoolean(false); private final ConnectionManagerConfiguration configuration; - private final String subscriberName; - private final AtomicInteger nextSubscriberId = new AtomicInteger(1); + protected final String subscriberName; + protected final AtomicInteger nextSubscriberId = new AtomicInteger(1); private final ExecutorService sharedExecutor; - private final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor( + protected final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build() ); - private final HealthMetrics metrics = new HealthMetrics(this); + protected final HealthMetrics metrics = new HealthMetrics(this); private final RecoveryListener recoveryListener = new RecoveryListener() { @Override @@ -293,29 +284,7 @@ private void addShutdownListenerToChannel(Channel channel, Boolean withRecovery) .orElse(null); } - private void recoverSubscriptionsOfChannel(@NotNull final PinId pinId, Channel channel, @NotNull final ChannelHolder holder) { - channelChecker.execute(() -> holder.withLock(() -> { - try { - var subscriptionCallbacks = holder.getCallbacksForRecovery(channel); - - if (subscriptionCallbacks != null) { - - LOGGER.info("Changing channel for holder with pin id: {}", pinId); - - var removedHolder = channelsByPin.remove(pinId); - if (removedHolder != holder) throw new IllegalStateException("Channel holder has been replaced"); - - basicConsume(pinId.queue, subscriptionCallbacks.deliverCallback, subscriptionCallbacks.cancelCallback); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.info("Recovering channel's subscriptions interrupted", e); - } catch (Throwable e) { - // this code executed in executor service and exception thrown here will not be handled anywhere - LOGGER.error("Failed to recovery channel's subscriptions", e); - } - })); - } + protected abstract void recoverSubscriptionsOfChannel(@NotNull final PinId pinId, Channel channel, @NotNull final ChannelHolder holder); private void addShutdownListenerToConnection(Connection conn) { conn.addShutdownListener(cause -> { @@ -347,20 +316,10 @@ private void addRecoveryListenerToConnection(Connection conn) { } } - private void addBlockedListenersToConnection(Connection conn) { - conn.addBlockedListener(new BlockedListener() { - @Override - public void handleBlocked(String reason) { - isPublishingBlocked.set(true); - LOGGER.warn("RabbitMQ blocked connection: {}", reason); - } + protected abstract BlockedListener getBlockedListener(); - @Override - public void handleUnblocked() { - isPublishingBlocked.set(false); - LOGGER.warn("RabbitMQ unblocked connection"); - } - }); + private void addBlockedListenersToConnection(Connection conn) { + conn.addBlockedListener(getBlockedListener()); } public boolean isOpen() { @@ -417,92 +376,6 @@ public void close() { shutdownExecutor(channelChecker, closeTimeout, "channel-checker"); } - public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException, InterruptedException { - ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey)); - holder.retryingPublishWithLock(configuration, body, - (channel, payload) -> channel.basicPublish(exchange, routingKey, props, payload)); - } - - public String queueDeclare() throws IOException { - ChannelHolder holder = new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions()); - return holder.mapWithLock(channel -> { - String queue = channel.queueDeclare( - "", // queue name - false, // durable - true, // exclusive - false, // autoDelete - Collections.emptyMap()).getQueue(); - LOGGER.info("Declared exclusive '{}' queue", queue); - putChannelFor(PinId.forQueue(queue), holder); - return queue; - }); - } - - public ExclusiveSubscriberMonitor basicConsume(String queue, ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) throws IOException, InterruptedException { - PinId pinId = PinId.forQueue(queue); - ChannelHolder holder = getOrCreateChannelFor(pinId, new SubscriptionCallbacks(deliverCallback, cancelCallback)); - String tag = holder.retryingConsumeWithLock(channel -> - channel.basicConsume(queue, false, subscriberName + "_" + nextSubscriberId.getAndIncrement(), (tagTmp, delivery) -> { - try { - Envelope envelope = delivery.getEnvelope(); - long deliveryTag = envelope.getDeliveryTag(); - String routingKey = envelope.getRoutingKey(); - LOGGER.trace("Received delivery {} from queue={} routing_key={}", deliveryTag, queue, routingKey); - - Confirmation wrappedConfirmation = new Confirmation() { - @Override - public void reject() throws IOException { - holder.withLock(ch -> { - try { - basicReject(ch, deliveryTag); - } catch (IOException | ShutdownSignalException e) { - LOGGER.warn("Error during basicReject of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e); - throw e; - } finally { - holder.release(() -> metrics.getReadinessMonitor().enable()); - } - }); - } - - @Override - public void confirm() throws IOException { - holder.withLock(ch -> { - try { - basicAck(ch, deliveryTag); - } catch (IOException | ShutdownSignalException e) { - LOGGER.warn("Error during basicAck of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e); - throw e; - } finally { - holder.release(() -> metrics.getReadinessMonitor().enable()); - } - }); - } - }; - - Confirmation confirmation = OnlyOnceConfirmation.wrap("from " + routingKey + " to " + queue, wrappedConfirmation); - - holder.withLock(() -> holder.acquireAndSubmitCheck(() -> - channelChecker.schedule(() -> { - holder.withLock(() -> { - LOGGER.warn("The confirmation for delivery {} in queue={} routing_key={} was not invoked within the specified delay", - deliveryTag, queue, routingKey); - if (holder.reachedPendingLimit()) { - metrics.getReadinessMonitor().disable(); - } - }); - return false; // to cast to Callable - }, configuration.getConfirmationTimeout().toMillis(), TimeUnit.MILLISECONDS) - )); - boolean redeliver = envelope.isRedeliver(); - deliverCallback.handle(new DeliveryMetadata(tagTmp, redeliver), delivery, confirmation); - } catch (IOException | RuntimeException e) { - LOGGER.error("Cannot handle delivery for tag {}: {}", tagTmp, e.getMessage(), e); - } - }, cancelCallback), configuration); - - return new RabbitMqSubscriberMonitor(holder, queue, tag, this::basicCancel); - } - boolean isReady() { return metrics.getReadinessMonitor().isEnabled(); } @@ -511,11 +384,7 @@ boolean isAlive() { return metrics.getLivenessMonitor().isEnabled(); } - boolean isPublishingBlocked() { - return isPublishingBlocked.get(); - } - - private ChannelHolderOptions configurationToOptions() { + protected ChannelHolderOptions configurationToOptions() { return new ChannelHolderOptions( configuration.getPrefetchCount(), configuration.getEnablePublisherConfirmation(), @@ -523,19 +392,6 @@ private ChannelHolderOptions configurationToOptions() { ); } - private void basicCancel(Channel channel, String consumerTag) throws IOException { - channel.basicCancel(consumerTag); - } - - public String queueExclusiveDeclareAndBind(String exchange) throws IOException, TimeoutException { - try (Channel channel = createChannel()) { - String queue = channel.queueDeclare().getQueue(); - channel.queueBind(queue, exchange, EMPTY_ROUTING_KEY); - LOGGER.info("Declared the '{}' queue to listen to the '{}'", queue, exchange); - return queue; - } - } - private void shutdownExecutor(ExecutorService executor, int closeTimeout, String name) { executor.shutdown(); try { @@ -549,9 +405,9 @@ private void shutdownExecutor(ExecutorService executor, int closeTimeout, String } } - private static final class SubscriptionCallbacks { - private final ManualAckDeliveryCallback deliverCallback; - private final CancelCallback cancelCallback; + protected static final class SubscriptionCallbacks { + protected final ManualAckDeliveryCallback deliverCallback; + protected final CancelCallback cancelCallback; public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) { this.deliverCallback = deliverCallback; @@ -559,28 +415,28 @@ public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCa } } - private ChannelHolder getOrCreateChannelFor(PinId pinId) { + protected ChannelHolder getOrCreateChannelFor(PinId pinId) { return channelsByPin.computeIfAbsent(pinId, ignore -> { LOGGER.trace("Creating channel holder for {}", pinId); return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions()); }); } - private ChannelHolder getOrCreateChannelFor(PinId pinId, SubscriptionCallbacks subscriptionCallbacks) { + protected ChannelHolder getOrCreateChannelFor(PinId pinId, SubscriptionCallbacks subscriptionCallbacks) { return channelsByPin.computeIfAbsent(pinId, ignore -> { LOGGER.trace("Creating channel holder with callbacks for {}", pinId); return new ChannelHolder(() -> createChannelWithOptionalRecovery(true), this::waitForConnectionRecovery, configurationToOptions(), subscriptionCallbacks); }); } - private void putChannelFor(PinId pinId, ChannelHolder holder) { + protected void putChannelFor(PinId pinId, ChannelHolder holder) { ChannelHolder previous = channelsByPin.putIfAbsent(pinId, holder); if (previous != null) { throw new IllegalStateException("Channel holder for the '" + pinId + "' pinId has been already registered"); } } - private Channel createChannel() { + protected Channel createChannel() { return createChannelWithOptionalRecovery(false); } @@ -605,7 +461,7 @@ private void waitForConnectionRecovery(ShutdownNotifier notifier) { waitForConnectionRecovery(notifier, true); } - private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitForRecovery) { + protected void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitForRecovery) { if (isConnectionRecovery(notifier)) { if (waitForRecovery) { waitForRecovery(notifier); @@ -637,50 +493,14 @@ private boolean isConnectionRecovery(ShutdownNotifier notifier) { && connectionState.get() != State.CLOSED; } - /** - * @param channel pass channel witch used for basicConsume, because delivery tags are scoped per channel, - * deliveries must be acknowledged on the same channel they were received on. - */ - private static void basicAck(Channel channel, long deliveryTag) throws IOException { - channel.basicAck(deliveryTag, false); - } - - private static void basicReject(Channel channel, long deliveryTag) throws IOException { - channel.basicReject(deliveryTag, false); - } - - private static class RabbitMqSubscriberMonitor implements ExclusiveSubscriberMonitor { - private final ChannelHolder holder; - private final String queue; - private final String tag; - private final CancelAction action; - - public RabbitMqSubscriberMonitor(ChannelHolder holder, String queue, String tag, CancelAction action) { - this.holder = holder; - this.queue = queue; - this.tag = tag; - this.action = action; - } - - @Override - public @NotNull String getQueue() { - return queue; - } - - @Override - public void unsubscribe() throws IOException { - holder.unsubscribeWithLock(tag, action); - } - } - - private interface CancelAction { + protected interface CancelAction { void execute(Channel channel, String tag) throws IOException; } - private static class PinId { + protected static class PinId { private final String exchange; private final String routingKey; - private final String queue; + protected final String queue; public static PinId forRoutingKey(String exchange, String routingKey) { return new PinId(exchange, routingKey, null); @@ -731,7 +551,7 @@ public String toString() { } } - private static class ChannelHolderOptions { + protected static class ChannelHolderOptions { private final int maxCount; private final boolean enablePublisherConfirmation; private final int maxInflightRequestsBytes; @@ -755,8 +575,8 @@ public int getMaxInflightRequestsBytes() { } } - private static class ChannelHolder { - private final Lock lock = new ReentrantLock(); + protected static class ChannelHolder { + protected final Lock lock = new ReentrantLock(); private final Supplier<Channel> supplier; private final BiConsumer<ShutdownNotifier, Boolean> reconnectionChecker; private final ChannelHolderOptions options; @@ -1346,15 +1166,15 @@ public boolean awaitConfirmations(long timeout, TimeUnit timeUnit) throws Interr } } - private interface ChannelMapper<T> { + protected interface ChannelMapper<T> { T map(Channel channel) throws IOException; } - private interface ChannelConsumer { + protected interface ChannelConsumer { void consume(Channel channel) throws IOException; } - private interface ChannelPublisher { + protected interface ChannelPublisher { void publish(Channel channel, byte[] payload) throws IOException; } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterContext.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterContext.kt index 67f8e68b0..e92285153 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterContext.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterContext.kt @@ -1,5 +1,6 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -17,11 +18,12 @@ package com.exactpro.th2.common.schema.message import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager interface MessageRouterContext { - val publishConnectionManager: ConnectionManager - val consumeConnectionManager: ConnectionManager + val publishConnectionManager: PublishConnectionManager + val consumeConnectionManager: ConsumeConnectionManager val routerMonitor: MessageRouterMonitor val configuration: MessageRouterConfiguration val boxConfiguration: BoxConfiguration diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/context/DefaultMessageRouterContext.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/context/DefaultMessageRouterContext.kt index dc503d422..f94ea7ff2 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/context/DefaultMessageRouterContext.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/context/DefaultMessageRouterContext.kt @@ -1,5 +1,6 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -19,11 +20,12 @@ import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration import com.exactpro.th2.common.schema.message.MessageRouterContext import com.exactpro.th2.common.schema.message.MessageRouterMonitor import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager class DefaultMessageRouterContext( - override val publishConnectionManager: ConnectionManager, - override val consumeConnectionManager: ConnectionManager, + override val publishConnectionManager: PublishConnectionManager, + override val consumeConnectionManager: ConsumeConnectionManager, override val routerMonitor: MessageRouterMonitor, override val configuration: MessageRouterConfiguration, override val boxConfiguration: BoxConfiguration diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt index 91bc224ea..beeb7d58e 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt @@ -1,5 +1,6 @@ /* * Copyright 2021-2024 Exactpro (Exactpro Systems Limited) + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -12,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.common.schema.message.impl.rabbitmq import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration @@ -21,7 +23,8 @@ import com.exactpro.th2.common.schema.message.QueueAttribute.PUBLISH import com.exactpro.th2.common.schema.message.QueueAttribute.SUBSCRIBE import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager import mu.KotlinLogging import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicReference @@ -43,10 +46,10 @@ abstract class AbstractRabbitRouter<T> : MessageRouter<T> { private val configuration: MessageRouterConfiguration get() = context.configuration - protected val publishConnectionManager: ConnectionManager + protected val publishConnectionManager: PublishConnectionManager get() = context.publishConnectionManager - protected val consumeConnectionManager: ConnectionManager + protected val consumeConnectionManager: ConsumeConnectionManager get() = context.consumeConnectionManager private val boxConfiguration: BoxConfiguration @@ -327,4 +330,4 @@ abstract class AbstractRabbitRouter<T> : MessageRouter<T> { private val REQUIRED_SEND_ATTRIBUTES = setOf(PUBLISH.toString()) private val REQUIRED_SUBSCRIBE_ATTRIBUTES = setOf(SUBSCRIBE.toString()) } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt new file mode 100644 index 000000000..00683ed31 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt @@ -0,0 +1,217 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection + +import com.exactpro.th2.common.schema.message.DeliveryMetadata +import com.exactpro.th2.common.schema.message.ExclusiveSubscriberMonitor +import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback +import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation +import com.exactpro.th2.common.schema.message.impl.OnlyOnceConfirmation.Companion.wrap +import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration +import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration +import com.rabbitmq.client.Channel +import com.rabbitmq.client.Delivery +import com.rabbitmq.client.CancelCallback +import com.rabbitmq.client.BlockedListener +import com.rabbitmq.client.ShutdownNotifier +import com.rabbitmq.client.ShutdownSignalException + +import mu.KotlinLogging +import java.io.IOException +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException + +class ConsumeConnectionManager( + connectionName: String, + rabbitMQConfiguration: RabbitMQConfiguration, + connectionManagerConfiguration: ConnectionManagerConfiguration +) : ConnectionManager(connectionName, rabbitMQConfiguration, connectionManagerConfiguration) { + override fun getBlockedListener(): BlockedListener = object : BlockedListener { + override fun handleBlocked(reason: String) { + LOGGER.info { "RabbitMQ blocked consume connection: $reason" } + } + + override fun handleUnblocked() { + LOGGER.info("RabbitMQ unblocked consume connection") + } + } + + @Throws(IOException::class) + fun queueDeclare(): String { + val holder = ChannelHolder({ this.createChannel() }, + { notifier: ShutdownNotifier, waitForRecovery: Boolean -> + this.waitForConnectionRecovery( + notifier, + waitForRecovery + ) + }, configurationToOptions() + ) + return holder.mapWithLock { channel: Channel -> + val queue = channel.queueDeclare( + "", // queue name + false, // durable + true, // exclusive + false, // autoDelete + emptyMap() + ).queue + LOGGER.info { "Declared exclusive '$queue' queue" } + putChannelFor(PinId.forQueue(queue), holder) + queue + } + } + + @Throws(IOException::class, TimeoutException::class) + fun queueExclusiveDeclareAndBind(exchange: String): String { + createChannel().use { channel -> + val queue = channel.queueDeclare().queue + channel.queueBind(queue, exchange, EMPTY_ROUTING_KEY) + LOGGER.info { "Declared the '$queue' queue to listen to the '$exchange'" } + return queue + } + } + + @Throws(IOException::class, InterruptedException::class) + fun basicConsume( + queue: String, + deliverCallback: ManualAckDeliveryCallback, + cancelCallback: CancelCallback + ): ExclusiveSubscriberMonitor { + val pinId = PinId.forQueue(queue) + val holder = getOrCreateChannelFor(pinId, SubscriptionCallbacks(deliverCallback, cancelCallback)) + val tag = holder.retryingConsumeWithLock({ channel: Channel -> + channel.basicConsume( + queue, false, subscriberName + "_" + nextSubscriberId.getAndIncrement(), + { tagTmp: String, delivery: Delivery -> + try { + val envelope = delivery.envelope + val deliveryTag = envelope.deliveryTag + val routingKey = envelope.routingKey + LOGGER.trace { "Received delivery $deliveryTag from queue=$queue routing_key=$routingKey" } + + val wrappedConfirmation: Confirmation = object : Confirmation { + @Throws(IOException::class) + override fun reject() { + holder.withLock { ch: Channel -> + try { + channel.basicReject(deliveryTag, false) + } catch (e: IOException) { + LOGGER.warn { "Error during basicReject of message with deliveryTag = $deliveryTag inside channel #${ch.channelNumber}: $e" } + throw e + } catch (e: ShutdownSignalException) { + LOGGER.warn { "Error during basicReject of message with deliveryTag = $deliveryTag inside channel #${ch.channelNumber}: $e" } + throw e + } finally { + holder.release { metrics.readinessMonitor.enable() } + } + } + } + + @Throws(IOException::class) + override fun confirm() { + holder.withLock { ch: Channel -> + try { + // because delivery tags are scoped per channel, + // deliveries must be acknowledged on the same channel they were received on. + ch.basicAck(deliveryTag, false) + } catch (e: Exception) { + LOGGER.warn { "Error during basicAck of message with deliveryTag = $deliveryTag inside channel #${ch.channelNumber}: $e" } + throw e + } finally { + holder.release { metrics.readinessMonitor.enable() } + } + } + } + } + + val confirmation = wrap("from $routingKey to $queue", wrappedConfirmation) + + holder.withLock(Runnable { + holder.acquireAndSubmitCheck { + channelChecker.schedule<Boolean>( + { + holder.withLock(Runnable { + LOGGER.warn { "The confirmation for delivery $deliveryTag in queue=$queue routing_key=$routingKey was not invoked within the specified delay" } + if (holder.reachedPendingLimit()) { + metrics.readinessMonitor.disable() + } + }) + false // to cast to Callable + }, + configuration.confirmationTimeout.toMillis(), + TimeUnit.MILLISECONDS + ) + } + }) + val redeliver = envelope.isRedeliver + deliverCallback.handle(DeliveryMetadata(tagTmp, redeliver), delivery, confirmation) + } catch (e: Exception) { + LOGGER.error("Cannot handle delivery for tag $tagTmp: ${e.message}", e) + } catch (e: RuntimeException) { + LOGGER.error("Cannot handle delivery for tag $tagTmp: ${e.message}", e) + } + }, cancelCallback + ) + }, configuration) + + return RabbitMqSubscriberMonitor(holder, queue, tag) { channel, consumerTag -> channel.basicCancel(consumerTag) } + } + + override fun recoverSubscriptionsOfChannel(pinId: PinId, channel: Channel, holder: ChannelHolder) { + channelChecker.execute { + holder.withLock(Runnable { + try { + val subscriptionCallbacks = holder.getCallbacksForRecovery(channel) + + if (subscriptionCallbacks != null) { + LOGGER.info { "Changing channel for holder with pin id: $pinId" } + + val removedHolder = channelsByPin.remove(pinId) + check(removedHolder === holder) { "Channel holder has been replaced" } + + basicConsume( + pinId.queue, + subscriptionCallbacks.deliverCallback, + subscriptionCallbacks.cancelCallback + ) + } + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + LOGGER.info("Recovering channel's subscriptions interrupted", e) + } catch (e: Throwable) { + // this code executed in executor service and exception thrown here will not be handled anywhere + LOGGER.error("Failed to recovery channel's subscriptions", e) + } + }) + } + } + + private class RabbitMqSubscriberMonitor( + private val holder: ChannelHolder, + override val queue: String, + private val tag: String, + private val action: CancelAction + ) : ExclusiveSubscriberMonitor { + @Throws(IOException::class) + override fun unsubscribe() { + holder.unsubscribeWithLock(tag, action) + } + } + + companion object { + private val LOGGER = KotlinLogging.logger {} + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt new file mode 100644 index 000000000..0cfb15cfb --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt @@ -0,0 +1,59 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration +import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration +import com.rabbitmq.client.AMQP +import com.rabbitmq.client.BlockedListener +import com.rabbitmq.client.Channel +import mu.KotlinLogging +import java.io.IOException + +class PublishConnectionManager( + connectionName: String, + rabbitMQConfiguration: RabbitMQConfiguration, + connectionManagerConfiguration: ConnectionManagerConfiguration +) : ConnectionManager(connectionName, rabbitMQConfiguration, connectionManagerConfiguration) { + @Volatile var isPublishingBlocked = false + + @Throws(IOException::class, InterruptedException::class) + fun basicPublish(exchange: String, routingKey: String, props: AMQP.BasicProperties?, body: ByteArray) { + val holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey)) + holder.retryingPublishWithLock(configuration, body) { channel: Channel, payload: ByteArray -> + channel.basicPublish(exchange, routingKey, props, payload) + } + } + + override fun getBlockedListener(): BlockedListener = object : BlockedListener { + override fun handleBlocked(reason: String) { + isPublishingBlocked = true + LOGGER.info { "RabbitMQ blocked publish connection: $reason" } + } + + override fun handleUnblocked() { + isPublishingBlocked = false + LOGGER.info("RabbitMQ unblocked publish connection") + } + } + + override fun recoverSubscriptionsOfChannel(pinId: PinId, channel: Channel, holder: ChannelHolder) {} + + companion object { + private val LOGGER = KotlinLogging.logger {} + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt index 464e3c078..ae3693051 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt @@ -1,5 +1,6 @@ /* * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -25,7 +26,8 @@ import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscr import com.exactpro.th2.common.schema.message.impl.rabbitmq.BookName import com.exactpro.th2.common.schema.message.impl.rabbitmq.PinConfiguration import com.exactpro.th2.common.schema.message.impl.rabbitmq.PinName -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager /** * NOTE: labels are used for back compatibility and may be deleted later @@ -91,14 +93,14 @@ class RabbitCustomRouter<T : Any>( } private class Sender<T : Any>( - connectionManager: ConnectionManager, + publishConnectionManager: PublishConnectionManager, exchangeName: String, routingKey: String, th2Pin: String, bookName: BookName, customTag: String, private val converter: MessageConverter<T> - ) : AbstractRabbitSender<T>(connectionManager, exchangeName, routingKey, th2Pin, customTag, bookName) { + ) : AbstractRabbitSender<T>(publishConnectionManager, exchangeName, routingKey, th2Pin, customTag, bookName) { override fun valueToBytes(value: T): ByteArray = converter.toByteArray(value) override fun toShortTraceString(value: T): String = converter.toTraceString(value) @@ -107,13 +109,13 @@ class RabbitCustomRouter<T : Any>( } private class Subscriber<T : Any>( - connectionManager: ConnectionManager, + consumeConnectionManager: ConsumeConnectionManager, queue: String, th2Pin: String, customTag: String, private val converter: MessageConverter<T>, messageListener: ConfirmationListener<T> - ) : AbstractRabbitSubscriber<T>(connectionManager, queue, th2Pin, customTag, messageListener) { + ) : AbstractRabbitSubscriber<T>(consumeConnectionManager, queue, th2Pin, customTag, messageListener) { override fun valueFromBytes(body: ByteArray): T = converter.fromByteArray(body) override fun toShortTraceString(value: T): String = converter.toTraceString(value) diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt index 8bd12019c..752fee745 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,20 +27,20 @@ import com.exactpro.th2.common.metrics.TH2_PIN_LABEL import com.exactpro.th2.common.metrics.incrementTotalMetrics import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender import com.exactpro.th2.common.schema.message.impl.rabbitmq.BookName -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager import com.exactpro.th2.common.schema.message.impl.rabbitmq.group.RabbitMessageGroupBatchRouter.Companion.MESSAGE_GROUP_TYPE import com.exactpro.th2.common.schema.message.toShortDebugString import io.prometheus.client.Counter import io.prometheus.client.Gauge class RabbitMessageGroupBatchSender( - connectionManager: ConnectionManager, + publishConnectionManager: PublishConnectionManager, exchangeName: String, routingKey: String, th2Pin: String, bookName: BookName ) : AbstractRabbitSender<MessageGroupBatch>( - connectionManager, + publishConnectionManager, exchangeName, routingKey, th2Pin, diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt index 194233f01..3a3aeea35 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt @@ -1,5 +1,6 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -29,7 +30,7 @@ import com.exactpro.th2.common.schema.message.FilterFunction import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback import com.exactpro.th2.common.schema.message.configuration.RouterFilter import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager import com.exactpro.th2.common.schema.message.impl.rabbitmq.group.RabbitMessageGroupBatchRouter.Companion.MESSAGE_GROUP_TYPE import com.exactpro.th2.common.schema.message.toBuilderWithMetadata import com.exactpro.th2.common.schema.message.toShortDebugString @@ -41,14 +42,14 @@ import io.prometheus.client.Gauge import mu.KotlinLogging class RabbitMessageGroupBatchSubscriber( - connectionManager: ConnectionManager, + consumeConnectionManager: ConsumeConnectionManager, queue: String, private val filterFunction: FilterFunction, th2Pin: String, private val filters: List<RouterFilter>, private val messageRecursionLimit: Int, messageListener: ConfirmationListener<MessageGroupBatch> -) : AbstractRabbitSubscriber<MessageGroupBatch>(connectionManager, queue, th2Pin, MESSAGE_GROUP_TYPE, messageListener) { +) : AbstractRabbitSubscriber<MessageGroupBatch>(consumeConnectionManager, queue, th2Pin, MESSAGE_GROUP_TYPE, messageListener) { private val logger = KotlinLogging.logger {} override fun valueFromBytes(body: ByteArray): MessageGroupBatch = parseEncodedBatch(body) diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt index a81fe662a..9e6075723 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +20,11 @@ import com.exactpro.th2.common.grpc.EventBatch import com.exactpro.th2.common.schema.message.MessageSender import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager.EMPTY_ROUTING_KEY +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager import java.io.IOException class NotificationEventBatchSender( - private val connectionManager: ConnectionManager, + private val publishConnectionManager: PublishConnectionManager, private val exchange: String ) : MessageSender<EventBatch> { @Deprecated( @@ -37,7 +38,7 @@ class NotificationEventBatchSender( @Throws(IOException::class) override fun send(message: EventBatch) { try { - connectionManager.basicPublish(exchange, EMPTY_ROUTING_KEY, null, message.toByteArray()) + publishConnectionManager.basicPublish(exchange, EMPTY_ROUTING_KEY, null, message.toByteArray()) } catch (e: Exception) { throw IOException( "Can not send notification message: EventBatch: parent_event_id = ${message.parentEventId.id}", diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSubscriber.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSubscriber.kt index f8b66e930..ccc21e2ce 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSubscriber.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSubscriber.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,21 +22,21 @@ import com.exactpro.th2.common.schema.message.DeliveryMetadata import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback import com.exactpro.th2.common.schema.message.MessageSubscriber import com.exactpro.th2.common.schema.message.SubscriberMonitor -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager import com.rabbitmq.client.Delivery import mu.KotlinLogging import java.util.concurrent.CopyOnWriteArrayList // DRAFT of notification router class NotificationEventBatchSubscriber( - private val connectionManager: ConnectionManager, + private val consumeConnectionManager: ConsumeConnectionManager, private val queue: String ) : MessageSubscriber { private val listeners = CopyOnWriteArrayList<ConfirmationListener<EventBatch>>() private lateinit var monitor: SubscriberMonitor fun start() { - monitor = connectionManager.basicConsume( + monitor = consumeConnectionManager.basicConsume( queue, { deliveryMetadata: DeliveryMetadata, delivery: Delivery, confirmation: ManualAckDeliveryCallback.Confirmation -> try { @@ -76,4 +76,4 @@ class NotificationEventBatchSubscriber( companion object { private val LOGGER = KotlinLogging.logger {} } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSender.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSender.kt index 40e47fbef..29c425d55 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSender.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSender.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,18 +21,18 @@ import com.exactpro.th2.common.metrics.SESSION_GROUP_LABEL import com.exactpro.th2.common.metrics.TH2_PIN_LABEL import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender import com.exactpro.th2.common.schema.message.impl.rabbitmq.BookName -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportGroupBatchRouter.Companion.TRANSPORT_GROUP_TYPE import io.prometheus.client.Counter class TransportGroupBatchSender( - connectionManager: ConnectionManager, + publishConnectionManager: PublishConnectionManager, exchangeName: String, routingKey: String, th2Pin: String, bookName: BookName, ) : AbstractRabbitSender<GroupBatch>( - connectionManager, + publishConnectionManager, exchangeName, routingKey, th2Pin, diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSubscriber.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSubscriber.kt index 4cc75b9eb..1063d35ac 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSubscriber.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSubscriber.kt @@ -1,5 +1,6 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -23,19 +24,19 @@ import com.exactpro.th2.common.schema.message.DeliveryMetadata import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback import com.exactpro.th2.common.schema.message.configuration.RouterFilter import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportGroupBatchRouter.Companion.TRANSPORT_GROUP_TYPE import com.rabbitmq.client.Delivery import io.netty.buffer.Unpooled import io.prometheus.client.Counter class TransportGroupBatchSubscriber( - connectionManager: ConnectionManager, + consumeConnectionManager: ConsumeConnectionManager, queue: String, th2Pin: String, private val filters: List<RouterFilter>, messageListener: ConfirmationListener<GroupBatch> -) : AbstractRabbitSubscriber<GroupBatch>(connectionManager, queue, th2Pin, TRANSPORT_GROUP_TYPE, messageListener) { +) : AbstractRabbitSubscriber<GroupBatch>(consumeConnectionManager, queue, th2Pin, TRANSPORT_GROUP_TYPE, messageListener) { override fun valueFromBytes(body: ByteArray): GroupBatch = Unpooled.wrappedBuffer(body).run(GroupBatchCodec::decode) diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt index 38247b0b8..eb928fd8e 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt @@ -29,9 +29,10 @@ import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfigu import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.RabbitCustomRouter +import com.exactpro.th2.common.util.getRabbitMQConfiguration import com.rabbitmq.client.BuiltinExchangeType import mu.KotlinLogging import org.junit.jupiter.api.Assertions.assertNull @@ -61,9 +62,8 @@ class AbstractRabbitRouterIntegrationTest { rabbitMQContainer.start() K_LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}, rest ${rabbitMQContainer.httpUrl} ${rabbitMQContainer.adminUsername} ${rabbitMQContainer.adminPassword} " } - createConnectionManager(rabbitMQContainer).use { publishManager -> - createConnectionManager(rabbitMQContainer).use { consumeManager -> - + createConsumeConnectionManager(rabbitMQContainer).use { consumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { publishManager -> createRouter(publishManager, consumeManager).use { firstRouter -> val messageA = "test-message-a" val messageB = "test-message-b" @@ -111,8 +111,8 @@ class AbstractRabbitRouterIntegrationTest { queue: ArrayBlockingQueue<Delivery>, expectations: List<Expectation>, ) { - createConnectionManager(rabbitMQContainer).use { publishManager -> - createConnectionManager(rabbitMQContainer).use { consumeManager -> + createConsumeConnectionManager(rabbitMQContainer).use { consumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { publishManager -> createRouter(publishManager, consumeManager).use { router -> val monitor = router.subscribeWithManualAck({ deliveryMetadata, message, confirmation -> queue.put( @@ -160,27 +160,34 @@ class AbstractRabbitRouterIntegrationTest { } } - private fun createConnectionManager( - rabbitMQContainer: RabbitMQContainer, - prefetchCount: Int = DEFAULT_PREFETCH_COUNT, - confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT - ) = ConnectionManager( - "test-connection", - RabbitMQConfiguration( - host = rabbitMQContainer.host, - vHost = "", - port = rabbitMQContainer.amqpPort, - username = rabbitMQContainer.adminUsername, - password = rabbitMQContainer.adminPassword, - ), + private fun getConnectionManagerConfiguration(prefetchCount: Int, confirmationTimeout: Duration) = ConnectionManagerConfiguration( subscriberName = "test", prefetchCount = prefetchCount, - confirmationTimeout = confirmationTimeout, - ), + confirmationTimeout = confirmationTimeout + ) + + private fun createPublishConnectionManager( + rabbitMQContainer: RabbitMQContainer, + prefetchCount: Int = DEFAULT_PREFETCH_COUNT, + confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT + ) = PublishConnectionManager( + "test-publish-connection", + getRabbitMQConfiguration(rabbitMQContainer), + getConnectionManagerConfiguration(prefetchCount, confirmationTimeout) + ) + + private fun createConsumeConnectionManager( + rabbitMQContainer: RabbitMQContainer, + prefetchCount: Int = DEFAULT_PREFETCH_COUNT, + confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT + ) = ConsumeConnectionManager( + "test-consume-connection", + getRabbitMQConfiguration(rabbitMQContainer), + getConnectionManagerConfiguration(prefetchCount, confirmationTimeout) ) - private fun createRouter(publishConnectionManager: ConnectionManager, consumeConnectionManager: ConnectionManager) = RabbitCustomRouter( + private fun createRouter(publishConnectionManager: PublishConnectionManager, consumeConnectionManager: ConsumeConnectionManager) = RabbitCustomRouter( "test-custom-tag", arrayOf("test-label"), TestMessageConverter() diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterTest.kt index 49723228a..015da1abd 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.common.schema.message.impl.rabbitmq import com.exactpro.th2.common.event.bean.BaseTest @@ -22,7 +23,8 @@ import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfigu import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.RabbitCustomRouter import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertAll @@ -48,13 +50,10 @@ private const val TEST_EXCLUSIVE_QUEUE = "test-exclusive-queue" class AbstractRabbitRouterTest { private val connectionConfiguration = ConnectionManagerConfiguration() private val managerMonitor: ExclusiveSubscriberMonitor = mock { } - private val publishConnectionManager: ConnectionManager = mock { + private val publishConnectionManager: PublishConnectionManager = mock { on { configuration }.thenReturn(connectionConfiguration) - on { basicConsume(any(), any(), any()) }.thenReturn(managerMonitor) - on { queueDeclare() }.thenAnswer { "$TEST_EXCLUSIVE_QUEUE-${exclusiveQueueCounter.incrementAndGet()}" } } - - private val consumeConnectionManager: ConnectionManager = mock { + private val consumeConnectionManager: ConsumeConnectionManager = mock { on { configuration }.thenReturn(connectionConfiguration) on { basicConsume(any(), any(), any()) }.thenReturn(managerMonitor) on { queueDeclare() }.thenAnswer { "$TEST_EXCLUSIVE_QUEUE-${exclusiveQueueCounter.incrementAndGet()}" } @@ -172,7 +171,7 @@ class AbstractRabbitRouterTest { } @Test - fun `subscribes when subscribtion active`() { + fun `subscribes when subscription active`() { router.subscribe(mock { }, "1") clearInvocations(consumeConnectionManager) clearInvocations(managerMonitor) @@ -190,7 +189,7 @@ class AbstractRabbitRouterTest { } @Test - fun `subscribes to exclusive queue when subscribtion active`() { + fun `subscribes to exclusive queue when subscription active`() { val monitorA = router.subscribeExclusive(mock { }) val monitorB = router.subscribeExclusive(mock { }) diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt index 2f8fa25a6..a0c15317a 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt @@ -18,7 +18,7 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration +import com.exactpro.th2.common.util.getRabbitMQConfiguration import mu.KotlinLogging import org.testcontainers.containers.RabbitMQContainer import java.time.Duration @@ -40,7 +40,7 @@ object ConnectionManualBenchmark { execInContainer("rabbitmqadmin", "declare", "exchange", "name=$exchangeName", "type=fanout") execInContainer("rabbitmqadmin", "declare", "binding", "source=$exchangeName", "destination_type=queue", "destination=$queueName") - val consumer = createConnectionManager( + val consumer = createConsumeConnectionManager( container, ConnectionManagerConfiguration( prefetchCount = 1000, @@ -51,7 +51,7 @@ object ConnectionManualBenchmark { ) val connectionManagerWithConfirmation = { - createConnectionManager( + createPublishConnectionManager( container, ConnectionManagerConfiguration( prefetchCount = 100, @@ -63,7 +63,7 @@ object ConnectionManualBenchmark { } val connectionManagerWithoutConfirmation = { - createConnectionManager( + createPublishConnectionManager( container, ConnectionManagerConfiguration( prefetchCount = 100, @@ -102,7 +102,7 @@ object ConnectionManualBenchmark { } } - private fun measure(name: String, manager: () -> ConnectionManager, payload: ByteArray): Duration { + private fun measure(name: String, manager: () -> PublishConnectionManager, payload: ByteArray): Duration { LOGGER.info("Measuring $name") val start: Long val sent: Long @@ -136,16 +136,17 @@ object ConnectionManualBenchmark { return duration } - private fun createConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) = - ConnectionManager( - "test-connection", - RabbitMQConfiguration( - host = container.host, - vHost = "", - port = container.amqpPort, - username = container.adminUsername, - password = container.adminPassword, - ), + private fun createPublishConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) = + PublishConnectionManager( + "test-publish-connection", + getRabbitMQConfiguration(container), + configuration + ) + + private fun createConsumeConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) = + ConsumeConnectionManager( + "test-consume-connection", + getRabbitMQConfiguration(container), configuration ) } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt index 1063ebf1f..e9164a84c 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt @@ -31,6 +31,7 @@ import com.exactpro.th2.common.util.getChannelsInfo import com.exactpro.th2.common.util.getQueuesInfo import com.exactpro.th2.common.util.getSubscribedChannelsCount import com.exactpro.th2.common.util.putMessageInQueue +import com.exactpro.th2.common.util.getRabbitMQConfiguration import com.github.dockerjava.api.model.Capability import com.rabbitmq.client.BuiltinExchangeType import com.rabbitmq.client.CancelCallback @@ -80,80 +81,85 @@ class TestConnectionManager { val messagesCount = 10 val countDown = CountDownLatch(messagesCount) val messageSizeBytes = 7 - createConnectionManager( - rabbit, ConnectionManagerConfiguration( - subscriberName = "test", - prefetchCount = DEFAULT_PREFETCH_COUNT, - confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, - enablePublisherConfirmation = true, - maxInflightPublicationsBytes = 5 * messageSizeBytes, - heartbeatIntervalSeconds = 1, - minConnectionRecoveryTimeout = 2000, - maxConnectionRecoveryTimeout = 2000, - // to avoid unexpected delays before recovery - retryTimeDeviationPercent = 0, - ) - ).use { manager -> - val receivedMessages = linkedSetOf<String>() - manager.basicConsume(queueName, { _, delivery, ack -> - val message = delivery.body.toString(Charsets.UTF_8) - LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" } - if (receivedMessages.add(message)) { - // decrement only unique messages - countDown.countDown() - } else { - LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" } + val managerConfiguration = ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + enablePublisherConfirmation = true, + maxInflightPublicationsBytes = 5 * messageSizeBytes, + heartbeatIntervalSeconds = 1, + minConnectionRecoveryTimeout = 2000, + maxConnectionRecoveryTimeout = 2000, + // to avoid unexpected delays before recovery + retryTimeDeviationPercent = 0, + ) + createConsumeConnectionManager(rabbit, managerConfiguration).use { consumeManager -> + createPublishConnectionManager(rabbit, managerConfiguration).use { publishManager -> + val receivedMessages = linkedSetOf<String>() + consumeManager.basicConsume(queueName, { _, delivery, ack -> + val message = delivery.body.toString(Charsets.UTF_8) + LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" } + if (receivedMessages.add(message)) { + // decrement only unique messages + countDown.countDown() + } else { + LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" } + } + ack.confirm() + }) { + LOGGER.info { "Canceled $it" } } - ack.confirm() - }) { - LOGGER.info { "Canceled $it" } - } - - var future: CompletableFuture<*>? = null - repeat(messagesCount) { index -> - if (index == 1) { - // delay should allow ack for the first message be received - Awaitility.await("first message is confirmed") - .pollInterval(10, TimeUnit.MILLISECONDS) - .atMost(100, TimeUnit.MILLISECONDS) - .until { countDown.count == messagesCount - 1L } - // Man pages: - // https://man7.org/linux/man-pages/man8/tc-netem.8.html - // https://man7.org/linux/man-pages/man8/ifconfig.8.html - // - // Here we try to emulate network outage to cause missing publication confirmations. - // - // In real life we will probably get duplicates in this case because - // rabbitmq does not provide exactly-once semantic. - // So, we will have to deal with it on the consumer side - rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down") - } else if (index == 4) { - future = CompletableFuture.supplyAsync { - // Interface is unblock in separate thread to emulate more realistic scenario - - // More than 2 HB will be missed - // This is enough for rabbitmq server to understand the connection is lost - Awaitility.await("connection is closed") - .atMost(3, TimeUnit.SECONDS) - .until { !manager.isOpen } - // enabling network interface back - rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up") + var future: CompletableFuture<*>? = null + repeat(messagesCount) { index -> + if (index == 1) { + // delay should allow ack for the first message be received + Awaitility.await("first message is confirmed") + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(100, TimeUnit.MILLISECONDS) + .until { countDown.count == messagesCount - 1L } + // Man pages: + // https://man7.org/linux/man-pages/man8/tc-netem.8.html + // https://man7.org/linux/man-pages/man8/ifconfig.8.html + // + // Here we try to emulate network outage to cause missing publication confirmations. + // + // In real life we will probably get duplicates in this case because + // rabbitmq does not provide exactly-once semantic. + // So, we will have to deal with it on the consumer side + rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down") + } else if (index == 4) { + future = CompletableFuture.supplyAsync { + // Interface is unblock in separate thread to emulate more realistic scenario + + // More than 2 HB will be missed + // This is enough for rabbitmq server to understand the connection is lost + Awaitility.await("connection is closed") + .atMost(3, TimeUnit.SECONDS) + .until { !consumeManager.isOpen } + // enabling network interface back + rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up") + } } + publishManager.basicPublish( + exchange, + routingKey, + null, + "Hello $index".toByteArray(Charsets.UTF_8) + ) } - manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8)) - } - future?.get(30, TimeUnit.SECONDS) + future?.get(30, TimeUnit.SECONDS) - countDown.assertComplete { "Not all messages were received: $receivedMessages" } - assertEquals( - (0 until messagesCount).map { - "Hello $it" - }, - receivedMessages.toList(), - "messages received in unexpected order", - ) + countDown.assertComplete { "Not all messages were received: $receivedMessages" } + assertEquals( + (0 until messagesCount).map { + "Hello $it" + }, + receivedMessages.toList(), + "messages received in unexpected order", + ) + } } } } @@ -171,7 +177,7 @@ class TestConnectionManager { val messagesCount = 10 val countDown = CountDownLatch(messagesCount) val messageSizeBytes = 7 - createConnectionManager( + createConsumeConnectionManager( rabbit, ConnectionManagerConfiguration( subscriberName = "test", prefetchCount = DEFAULT_PREFETCH_COUNT, @@ -197,7 +203,7 @@ class TestConnectionManager { LOGGER.info { "Canceled $it" } } - createConnectionManager( + createPublishConnectionManager( rabbit, ConnectionManagerConfiguration( prefetchCount = DEFAULT_PREFETCH_COUNT, confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, @@ -263,44 +269,45 @@ class TestConnectionManager { LOGGER.info { "Started with port ${it.amqpPort}" } val queue = ArrayBlockingQueue<ManualAckDeliveryCallback.Confirmation>(DEFAULT_PREFETCH_COUNT) val countDown = CountDownLatch(DEFAULT_PREFETCH_COUNT) - createConnectionManager( - it, ConnectionManagerConfiguration( - subscriberName = "test", - prefetchCount = DEFAULT_PREFETCH_COUNT, - confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, - ) - ).use { manager -> - manager.basicConsume(queueName, { _, delivery, ack -> - LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } - queue += ack - countDown.countDown() - }) { - LOGGER.info { "Canceled $it" } - } + val managerConfiguration = ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + ) + createConsumeConnectionManager(it, managerConfiguration).use { consumeManager -> + createPublishConnectionManager(it, managerConfiguration).use { publishManager -> + consumeManager.basicConsume(queueName, { _, delivery, ack -> + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } + queue += ack + countDown.countDown() + }) { + LOGGER.info { "Canceled $it" } + } - repeat(DEFAULT_PREFETCH_COUNT + 1) { index -> - manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8)) - } + repeat(DEFAULT_PREFETCH_COUNT + 1) { index -> + publishManager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8)) + } - countDown.assertComplete("Not all messages were received") + countDown.assertComplete("Not all messages were received") - assertTrue(manager.isAlive) { "Manager should still be alive" } - assertTrue(manager.isReady) { "Manager should be ready until the confirmation timeout expires" } + assertTrue(consumeManager.isAlive) { "Manager should still be alive" } + assertTrue(consumeManager.isReady) { "Manager should be ready until the confirmation timeout expires" } - Thread.sleep(DEFAULT_CONFIRMATION_TIMEOUT.toMillis() + 100/*just in case*/) // wait for confirmation timeout + Thread.sleep(DEFAULT_CONFIRMATION_TIMEOUT.toMillis() + 100/*just in case*/) // wait for confirmation timeout - assertTrue(manager.isAlive) { "Manager should still be alive" } - assertFalse(manager.isReady) { "Manager should not be ready" } + assertTrue(consumeManager.isAlive) { "Manager should still be alive" } + assertFalse(consumeManager.isReady) { "Manager should not be ready" } - queue.poll().confirm() + queue.poll().confirm() - assertTrue(manager.isAlive) { "Manager should still be alive" } - assertTrue(manager.isReady) { "Manager should be ready" } + assertTrue(consumeManager.isAlive) { "Manager should still be alive" } + assertTrue(consumeManager.isReady) { "Manager should be ready" } - val receivedData = generateSequence { queue.poll(10L, TimeUnit.MILLISECONDS) } - .onEach(ManualAckDeliveryCallback.Confirmation::confirm) - .count() - assertEquals(DEFAULT_PREFETCH_COUNT, receivedData) { "Unexpected number of messages received" } + val receivedData = generateSequence { queue.poll(10L, TimeUnit.MILLISECONDS) } + .onEach(ManualAckDeliveryCallback.Confirmation::confirm) + .count() + assertEquals(DEFAULT_PREFETCH_COUNT, receivedData) { "Unexpected number of messages received" } + } } } } @@ -311,7 +318,7 @@ class TestConnectionManager { rabbit .let { rabbitMQContainer -> LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" } - createConnectionManager( + createConsumeConnectionManager( rabbitMQContainer, ConnectionManagerConfiguration( subscriberName = "test", @@ -338,7 +345,7 @@ class TestConnectionManager { assertTarget(true, message = "Thread for consuming isn't started", func = thread::isAlive) // todo check isReady and isAlive, it should be false at some point -// assertTarget(false, "Readiness probe doesn't fall down", connectionManager::isReady) + // assertTarget(false, "Readiness probe doesn't fall down", connectionManager::isReady) LOGGER.info { "creating the queue..." } declareQueue(rabbitMQContainer, wrongQueue) @@ -386,56 +393,56 @@ class TestConnectionManager { LOGGER.info { "Started with port ${it.amqpPort}" } val counter = AtomicInteger() val downLatch = CountDownLatch(1) - createConnectionManager( - it, - ConnectionManagerConfiguration( - subscriberName = "test", - prefetchCount = DEFAULT_PREFETCH_COUNT, - confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, - minConnectionRecoveryTimeout = 100, - maxConnectionRecoveryTimeout = 200, - maxRecoveryAttempts = 5 - ), - ).use { connectionManager -> - var monitor: SubscriberMonitor? = null - try { - monitor = connectionManager.basicConsume(queueName, { _, delivery, _ -> - counter.incrementAndGet() - downLatch.countDown() - LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" } - }) { - LOGGER.info { "Canceled $it" } - } - - LOGGER.info { "Starting first publishing..." } - connectionManager.basicPublish(exchange, "", null, "Hello1".toByteArray(Charsets.UTF_8)) - Thread.sleep(200) - LOGGER.info { "Publication finished!" } - assertEquals( - 0, - counter.get(), - ) { "Unexpected number of messages received. The first message shouldn't be received" } - LOGGER.info { "Creating the correct exchange..." } - declareFanoutExchangeWithBinding(it, exchange, queueName) - LOGGER.info { "Exchange created!" } + val connectionManagerConfiguration = ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 100, + maxConnectionRecoveryTimeout = 200, + maxRecoveryAttempts = 5 + ) + createConsumeConnectionManager(it, connectionManagerConfiguration).use { consumeManager -> + createPublishConnectionManager(it, connectionManagerConfiguration).use { publishManager -> + var monitor: SubscriberMonitor? = null + try { + monitor = consumeManager.basicConsume(queueName, { _, delivery, _ -> + counter.incrementAndGet() + downLatch.countDown() + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" } + }) { + LOGGER.info { "Canceled $it" } + } - Assertions.assertDoesNotThrow { - connectionManager.basicPublish(exchange, "", null, "Hello2".toByteArray(Charsets.UTF_8)) - } + LOGGER.info { "Starting first publishing..." } + publishManager.basicPublish(exchange, "", null, "Hello1".toByteArray(Charsets.UTF_8)) + Thread.sleep(200) + LOGGER.info { "Publication finished!" } + assertEquals( + 0, + counter.get(), + ) { "Unexpected number of messages received. The first message shouldn't be received" } + LOGGER.info { "Creating the correct exchange..." } + declareFanoutExchangeWithBinding(it, exchange, queueName) + LOGGER.info { "Exchange created!" } + + Assertions.assertDoesNotThrow { + publishManager.basicPublish(exchange, "", null, "Hello2".toByteArray(Charsets.UTF_8)) + } - downLatch.assertComplete(1L, TimeUnit.SECONDS) { "no messages were received" } + downLatch.assertComplete(1L, TimeUnit.SECONDS) { "no messages were received" } - assertEquals( - 1, - counter.get() - ) { "Unexpected number of messages received. The second message should be received" } - } finally { - Assertions.assertNotNull(monitor) - Assertions.assertDoesNotThrow { - monitor!!.unsubscribe() + assertEquals( + 1, + counter.get() + ) { "Unexpected number of messages received. The second message should be received" } + } finally { + Assertions.assertNotNull(monitor) + Assertions.assertDoesNotThrow { + monitor!!.unsubscribe() + } } - } + } } } } @@ -452,7 +459,7 @@ class TestConnectionManager { it.start() LOGGER.info { "Started with port ${it.amqpPort}" } - createConnectionManager( + createConsumeConnectionManager( it, ConnectionManagerConfiguration( subscriberName = "test", @@ -523,7 +530,7 @@ class TestConnectionManager { queueNames[1] to AtomicInteger(-1), // this subscriber won't ack two first deliveries queueNames[2] to AtomicInteger(1) ) - createConnectionManager( + createConsumeConnectionManager( it, ConnectionManagerConfiguration( subscriberName = "test", @@ -614,8 +621,8 @@ class TestConnectionManager { it.start() declareQueue(it, queueName) LOGGER.info { "Started with port ${it.amqpPort}" } - ConnectionManager( - "test-connection", + ConsumeConnectionManager( + "test-consume-connection", RabbitMQConfiguration( host = it.host, vHost = "", @@ -632,9 +639,9 @@ class TestConnectionManager { connectionTimeout = 1000, maxRecoveryAttempts = 5 ), - ).use { connectionManager -> + ).use { publishConnectionManager -> val consume = CountDownLatch(1) - connectionManager.basicConsume(queueName, { _, delivery, ack -> + publishConnectionManager.basicConsume(queueName, { _, delivery, ack -> LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } consume.countDown() ack.confirm() @@ -678,47 +685,52 @@ class TestConnectionManager { .let { LOGGER.info { "Started with port ${it.amqpPort}" } val counter = AtomicInteger(0) - createConnectionManager( - it, - ConnectionManagerConfiguration( - subscriberName = "test", - prefetchCount = DEFAULT_PREFETCH_COUNT, - confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, - minConnectionRecoveryTimeout = 10000, - maxConnectionRecoveryTimeout = 20000, - connectionTimeout = 10000, - maxRecoveryAttempts = 5 - ), - ).use { connectionManager -> - var monitor: SubscriberMonitor? = null - try { - declareQueue(it, queueName) - declareFanoutExchangeWithBinding(it, exchange, queueName) - - connectionManager.basicPublish(exchange, routingKey, null, "Hello1".toByteArray(Charsets.UTF_8)) + val connectionManagerConfiguration = ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 10000, + maxConnectionRecoveryTimeout = 20000, + connectionTimeout = 10000, + maxRecoveryAttempts = 5 + ) + createConsumeConnectionManager(it, connectionManagerConfiguration).use { consumeConnectionManager -> + createPublishConnectionManager(it, connectionManagerConfiguration).use { publishConnectionManager -> + var monitor: SubscriberMonitor? = null + try { + declareQueue(it, queueName) + declareFanoutExchangeWithBinding(it, exchange, queueName) + + publishConnectionManager.basicPublish( + exchange, + routingKey, + null, + "Hello1".toByteArray(Charsets.UTF_8) + ) - Thread.sleep(200) - monitor = connectionManager.basicConsume(queueName, { _, delivery, ack -> - LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } - counter.incrementAndGet() - ack.confirm() - }) { - LOGGER.info { "Canceled $it" } + Thread.sleep(200) + monitor = consumeConnectionManager.basicConsume(queueName, { _, delivery, ack -> + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" } + counter.incrementAndGet() + ack.confirm() + }) { + LOGGER.info { "Canceled $it" } + } + Thread.sleep(200) + + assertEquals(1, getSubscribedChannelsCount(it, queueName)) + assertEquals(1, counter.get()) { "Wrong number of received messages" } + assertTrue( + getQueuesInfo(it).toString().contains("$queueName\t0") + ) { "There should be no messages left in the queue" } + } finally { + Assertions.assertNotNull(monitor) + Assertions.assertDoesNotThrow { + monitor!!.unsubscribe() + } } - Thread.sleep(200) - assertEquals(1, getSubscribedChannelsCount(it, queueName)) - assertEquals(1, counter.get()) { "Wrong number of received messages" } - assertTrue( - getQueuesInfo(it).toString().contains("$queueName\t0") - ) { "There should be no messages left in the queue" } - } finally { - Assertions.assertNotNull(monitor) - Assertions.assertDoesNotThrow { - monitor!!.unsubscribe() - } } - } } } @@ -730,7 +742,6 @@ class TestConnectionManager { val exchange = "test-exchange7" val routingKey = "routingKey7" - RabbitMQContainer(RABBITMQ_IMAGE_NAME) .withRabbitMQConfig(MountableFile.forClasspathResource(configFilename)) .withExchange(exchange, BuiltinExchangeType.FANOUT.type, false, false, true, emptyMap()) @@ -740,56 +751,56 @@ class TestConnectionManager { it.start() LOGGER.info { "Started with port ${it.amqpPort}" } val counter = AtomicInteger(0) - createConnectionManager( - it, - ConnectionManagerConfiguration( - subscriberName = "test", - prefetchCount = DEFAULT_PREFETCH_COUNT, - confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, - minConnectionRecoveryTimeout = 100, - maxConnectionRecoveryTimeout = 200, - maxRecoveryAttempts = 5 - ), - ).use { connectionManager -> - connectionManager.basicConsume(queueName, { _, delivery, ack -> - LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} " } - if (counter.get() != 0) { - ack.confirm() - LOGGER.info { "Confirmed!" } - } else { - LOGGER.info { "Left this message unacked" } + val connectionManagerConfiguration = ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + minConnectionRecoveryTimeout = 100, + maxConnectionRecoveryTimeout = 200, + maxRecoveryAttempts = 5 + ) + createConsumeConnectionManager(it, connectionManagerConfiguration).use { consumeConnectionManager -> + createPublishConnectionManager(it, connectionManagerConfiguration).use { publishConnectionManager -> + consumeConnectionManager.basicConsume(queueName, { _, delivery, ack -> + LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} " } + if (counter.get() != 0) { + ack.confirm() + LOGGER.info { "Confirmed!" } + } else { + LOGGER.info { "Left this message unacked" } + } + counter.incrementAndGet() + }) { + LOGGER.info { "Canceled $it" } } - counter.incrementAndGet() - }) { - LOGGER.info { "Canceled $it" } - } - LOGGER.info { "Sending the first message" } - connectionManager.basicPublish(exchange, routingKey, null, "Hello1".toByteArray(Charsets.UTF_8)) + LOGGER.info { "Sending the first message" } + publishConnectionManager.basicPublish(exchange, routingKey, null, "Hello1".toByteArray(Charsets.UTF_8)) - LOGGER.info { "queues list: \n ${getQueuesInfo(it)}" } - LOGGER.info { "Sleeping..." } - Thread.sleep(33000) + LOGGER.info { "queues list: \n ${getQueuesInfo(it)}" } + LOGGER.info { "Sleeping..." } + Thread.sleep(33000) - LOGGER.info { "Sending the second message" } - connectionManager.basicPublish(exchange, routingKey, null, "Hello2".toByteArray(Charsets.UTF_8)) + LOGGER.info { "Sending the second message" } + publishConnectionManager.basicPublish(exchange, routingKey, null, "Hello2".toByteArray(Charsets.UTF_8)) - Thread.sleep(30000) + Thread.sleep(30000) - LOGGER.info { "Sending the third message" } - connectionManager.basicPublish(exchange, routingKey, null, "Hello3".toByteArray(Charsets.UTF_8)) + LOGGER.info { "Sending the third message" } + publishConnectionManager.basicPublish(exchange, routingKey, null, "Hello3".toByteArray(Charsets.UTF_8)) - val queuesListExecResult = getQueuesInfo(it) - LOGGER.info { "queues list: \n $queuesListExecResult" } + val queuesListExecResult = getQueuesInfo(it) + LOGGER.info { "queues list: \n $queuesListExecResult" } - assertEquals(1, getSubscribedChannelsCount(it, queueName)) - assertEquals(4, counter.get()) { "Wrong number of received messages" } - assertTrue( - queuesListExecResult.toString().contains("$queueName\t0") - ) { "There should be no messages left in the queue" } + assertEquals(1, getSubscribedChannelsCount(it, queueName)) + assertEquals(4, counter.get()) { "Wrong number of received messages" } + assertTrue( + queuesListExecResult.toString().contains("$queueName\t0") + ) { "There should be no messages left in the queue" } + } } } } @@ -801,7 +812,7 @@ class TestConnectionManager { .let { LOGGER.info { "Started with port ${it.amqpPort}" } val counter = AtomicInteger(0) - createConnectionManager( + createConsumeConnectionManager( it, ConnectionManagerConfiguration( subscriberName = "test", @@ -852,7 +863,7 @@ class TestConnectionManager { .let { LOGGER.info { "Started with port ${it.amqpPort}" } val counter = AtomicInteger(0) - createConnectionManager( + createConsumeConnectionManager( it, ConnectionManagerConfiguration( subscriberName = "test", @@ -976,7 +987,7 @@ class TestConnectionManager { ) val testCasesContexts: List<TestCaseContext> = testCases.map { params -> - val connectionManager = createConnectionManager( + val connectionManager = createConsumeConnectionManager( container, ConnectionManagerConfiguration( subscriberName = params.subscriberName, @@ -1099,16 +1110,17 @@ class TestConnectionManager { assertEquals(target, func(), message) } - private fun createConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) = - ConnectionManager( - "test-connection", - RabbitMQConfiguration( - host = container.host, - vHost = "", - port = container.amqpPort, - username = container.adminUsername, - password = container.adminPassword, - ), + private fun createPublishConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) = + PublishConnectionManager( + "test-publish-connection", + getRabbitMQConfiguration(container), + configuration + ) + + private fun createConsumeConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) = + ConsumeConnectionManager( + "test-consume-connection", + getRabbitMQConfiguration(container), configuration ) @@ -1118,24 +1130,26 @@ class TestConnectionManager { rabbitMQContainer.start() LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" } - createConnectionManager(rabbitMQContainer).use { firstManager -> - createConnectionManager(rabbitMQContainer).use { secondManager -> - val queue = firstManager.queueDeclare() + createConsumeConnectionManager(rabbitMQContainer).use { firstConsumeManager -> + createConsumeConnectionManager(rabbitMQContainer).use { secondConsumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { publishManager -> + val queue = firstConsumeManager.queueDeclare() - assertFailsWith<IOException>("Another connection can subscribe to the $queue queue") { - secondManager.basicConsume(queue, { _, _, _ -> }, {}) - } + assertFailsWith<IOException>("Another connection can subscribe to the $queue queue") { + secondConsumeManager.basicConsume(queue, { _, _, _ -> }, {}) + } - extracted(firstManager, secondManager, queue, 3) - extracted(firstManager, secondManager, queue, 6) + extracted(firstConsumeManager, publishManager, queue, 3) + extracted(firstConsumeManager, publishManager, queue, 6) + } } } } } private fun extracted( - firstManager: ConnectionManager, - secondManager: ConnectionManager, + firstManager: ConsumeConnectionManager, + publishManager: PublishConnectionManager, queue: String, cycle: Int ) { @@ -1151,7 +1165,7 @@ class TestConnectionManager { val secondMonitor = firstManager.basicConsume(queue, deliverCallback, cancelCallback) repeat(cycle) { index -> - secondManager.basicPublish( + publishManager.basicPublish( "", queue, null, @@ -1173,24 +1187,30 @@ class TestConnectionManager { secondMonitor.unsubscribe() } - private fun createConnectionManager( + private fun getConnectionManagerConfiguration(prefetchCount: Int, confirmationTimeout: Duration) = ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = prefetchCount, + confirmationTimeout = confirmationTimeout + ) + + private fun createPublishConnectionManager( rabbitMQContainer: RabbitMQContainer, prefetchCount: Int = DEFAULT_PREFETCH_COUNT, confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT, - ) = ConnectionManager( - "test-connection", - RabbitMQConfiguration( - host = rabbitMQContainer.host, - vHost = "", - port = rabbitMQContainer.amqpPort, - username = rabbitMQContainer.adminUsername, - password = rabbitMQContainer.adminPassword, - ), - ConnectionManagerConfiguration( - subscriberName = "test", - prefetchCount = prefetchCount, - confirmationTimeout = confirmationTimeout - ) + ) = PublishConnectionManager( + "test-publish-connection", + getRabbitMQConfiguration(rabbitMQContainer), + getConnectionManagerConfiguration(prefetchCount, confirmationTimeout) + ) + + private fun createConsumeConnectionManager( + rabbitMQContainer: RabbitMQContainer, + prefetchCount: Int = DEFAULT_PREFETCH_COUNT, + confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT, + ) = ConsumeConnectionManager( + "test-consume-connection", + getRabbitMQConfiguration(rabbitMQContainer), + getConnectionManagerConfiguration(prefetchCount, confirmationTimeout) ) @Test @@ -1209,83 +1229,89 @@ class TestConnectionManager { val blockAfter = 3 val countDown = CountDownLatch(messagesCount) val messageSizeBytes = 7 - createConnectionManager( - rabbit, ConnectionManagerConfiguration( - subscriberName = "test", - prefetchCount = DEFAULT_PREFETCH_COUNT, - confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, - enablePublisherConfirmation = true, - maxInflightPublicationsBytes = messagesCount * messageSizeBytes, - heartbeatIntervalSeconds = 1, - minConnectionRecoveryTimeout = 2000, - maxConnectionRecoveryTimeout = 2000, - // to avoid unexpected delays before recovery - retryTimeDeviationPercent = 0 - ) - ).use { manager -> - repeat(messagesCount) { index -> - if (index == blockAfter) { - assertFalse(manager.isPublishingBlocked) - - // blocks all publishers ( https://www.rabbitmq.com/docs/memory ) - rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0") - } + val connectionManagerConfiguration = ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = DEFAULT_PREFETCH_COUNT, + confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT, + enablePublisherConfirmation = true, + maxInflightPublicationsBytes = messagesCount * messageSizeBytes, + heartbeatIntervalSeconds = 1, + minConnectionRecoveryTimeout = 2000, + maxConnectionRecoveryTimeout = 2000, + // to avoid unexpected delays before recovery + retryTimeDeviationPercent = 0 + ) + createConsumeConnectionManager(rabbit, connectionManagerConfiguration).use { consumeManager -> + createPublishConnectionManager(rabbit, connectionManagerConfiguration).use { publishManager -> + repeat(messagesCount) { index -> + if (index == blockAfter) { + assertFalse(publishManager.isPublishingBlocked) - manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8)) - LOGGER.info("Published $index") + // blocks all publishers ( https://www.rabbitmq.com/docs/memory ) + rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0") + } - if (index == blockAfter) { - // wait for blocking of publishing connection - Awaitility.await("publishing blocked") - .pollInterval(10L, TimeUnit.MILLISECONDS) - .atMost(100L, TimeUnit.MILLISECONDS) - .until { manager.isPublishingBlocked } + publishManager.basicPublish( + exchange, + routingKey, + null, + "Hello $index".toByteArray(Charsets.UTF_8) + ) + LOGGER.info("Published $index") + + if (index == blockAfter) { + // wait for blocking of publishing connection + Awaitility.await("publishing blocked") + .pollInterval(10L, TimeUnit.MILLISECONDS) + .atMost(100L, TimeUnit.MILLISECONDS) + .until { publishManager.isPublishingBlocked } + } } - } - val receivedMessages = linkedSetOf<String>() - LOGGER.info { "creating consumer" } - - val subscribeFuture = Executors.newSingleThreadExecutor().submit { - manager.basicConsume(queueName, { _, delivery, ack -> - val message = delivery.body.toString(Charsets.UTF_8) - LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" } - if (receivedMessages.add(message)) { - // decrement only unique messages - countDown.countDown() - } else { - LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" } + val receivedMessages = linkedSetOf<String>() + LOGGER.info { "creating consumer" } + + val subscribeFuture = Executors.newSingleThreadExecutor().submit { + consumeManager.basicConsume(queueName, { _, delivery, ack -> + val message = delivery.body.toString(Charsets.UTF_8) + LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" } + if (receivedMessages.add(message)) { + // decrement only unique messages + countDown.countDown() + } else { + LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" } + } + ack.confirm() + }) { + LOGGER.info { "Canceled $it" } } - ack.confirm() - }) { - LOGGER.info { "Canceled $it" } } - } - assertDoesNotThrow("Failed to subscribe to queue") { - // if subscription connection is blocked generates TimeoutException - subscribeFuture.get(1, TimeUnit.SECONDS) - subscribeFuture.cancel(true) - } + assertDoesNotThrow("Failed to subscribe to queue") { + // if subscription connection is blocked generates TimeoutException + subscribeFuture.get(1, TimeUnit.SECONDS) + subscribeFuture.cancel(true) + } - Awaitility.await("receive messages sent before blocking") - .pollInterval(10L, TimeUnit.MILLISECONDS) - .atMost(100L, TimeUnit.MILLISECONDS) - .until { blockAfter.toLong() == messagesCount - countDown.count } + Awaitility.await("receive messages sent before blocking") + .pollInterval(10L, TimeUnit.MILLISECONDS) + .atMost(100L, TimeUnit.MILLISECONDS) + .until { blockAfter.toLong() == messagesCount - countDown.count } - Thread.sleep(100) // ensure no more messages received - assertEquals(blockAfter.toLong(), messagesCount - countDown.count) - assertTrue(manager.isPublishingBlocked) + Thread.sleep(100) // ensure no more messages received + assertEquals(blockAfter.toLong(), messagesCount - countDown.count) + assertTrue(publishManager.isPublishingBlocked) - // unblocks publishers - rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0.4") - assertFalse(manager.isPublishingBlocked) + // unblocks publishers + rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0.4") + assertFalse(publishManager.isPublishingBlocked) - // delay receiving all messages - Awaitility.await("all messages received") - .pollInterval(10L, TimeUnit.MILLISECONDS) - .atMost(100L, TimeUnit.MILLISECONDS) - .until { countDown.count == 0L } + // delay receiving all messages + Awaitility.await("all messages received") + .pollInterval(10L, TimeUnit.MILLISECONDS) + .atMost(100L, TimeUnit.MILLISECONDS) + .until { countDown.count == 0L } + } } } } diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt index 0dd44ec8b..34b0458ae 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt @@ -28,8 +28,9 @@ import com.exactpro.th2.common.schema.message.ContainerConstants.ROUTING_KEY import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager +import com.exactpro.th2.common.util.getRabbitMQConfiguration import com.rabbitmq.client.BuiltinExchangeType import mu.KotlinLogging import org.junit.jupiter.api.Test @@ -49,18 +50,24 @@ class IntegrationTestRabbitMessageGroupBatchRouter { rabbitMQContainer.start() LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" } - createConnectionManager(rabbitMQContainer).use { firstManager -> - createRouter(firstManager).use { firstRouter -> - createConnectionManager(rabbitMQContainer).use { secondManager -> - createRouter(secondManager).use { secondRouter -> - val counter = CountDownLatch(1) - val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() } - try { - secondRouter.sendExclusive(monitor.queue, MessageGroupBatch.getDefaultInstance()) - assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) } - - } finally { - monitor.unsubscribe() + createConsumeConnectionManager(rabbitMQContainer).use { firstConsumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { firstPublishManager -> + createRouter(firstPublishManager, firstConsumeManager).use { firstRouter -> + createConsumeConnectionManager(rabbitMQContainer).use { secondConsumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { secondPublishManager -> + createRouter(secondPublishManager, secondConsumeManager).use { secondRouter -> + val counter = CountDownLatch(1) + val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() } + try { + secondRouter.sendExclusive( + monitor.queue, + MessageGroupBatch.getDefaultInstance() + ) + assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) } + } finally { + monitor.unsubscribe() + } + } } } } @@ -79,19 +86,23 @@ class IntegrationTestRabbitMessageGroupBatchRouter { rabbitMQContainer.start() LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" } - createConnectionManager(rabbitMQContainer).use { firstManager -> - createRouter(firstManager).use { firstRouter -> - createConnectionManager(rabbitMQContainer).use { secondManager -> - createRouter(secondManager).use { secondRouter -> - val counter = CountDownLatch(1) - val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() } - try { + createConsumeConnectionManager(rabbitMQContainer).use { firstConsumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { firstPublishManager -> + createRouter(firstPublishManager, firstConsumeManager).use { firstRouter -> + createConsumeConnectionManager(rabbitMQContainer).use { secondConsumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { secondPublishManager -> + createRouter(secondPublishManager, secondConsumeManager).use { secondRouter -> + val counter = CountDownLatch(1) + val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() } + try { - secondRouter.sendExclusive(monitor.queue, MessageGroupBatch.getDefaultInstance()) - assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) } + secondRouter.sendExclusive(monitor.queue, MessageGroupBatch.getDefaultInstance()) + assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) } - } finally { - monitor.unsubscribe() + } finally { + monitor.unsubscribe() + } + } } } } @@ -100,12 +111,12 @@ class IntegrationTestRabbitMessageGroupBatchRouter { } } - private fun createRouter(connectionManager: ConnectionManager) = RabbitMessageGroupBatchRouter() + private fun createRouter(publishConnectionManager: PublishConnectionManager, consumeConnectionManager: ConsumeConnectionManager) = RabbitMessageGroupBatchRouter() .apply { init( DefaultMessageRouterContext( - connectionManager, - connectionManager, + publishConnectionManager, + consumeConnectionManager, mock { }, MessageRouterConfiguration(), BoxConfiguration() @@ -113,24 +124,30 @@ class IntegrationTestRabbitMessageGroupBatchRouter { ) } - private fun createConnectionManager( + private fun getConnectionManagerConfiguration(prefetchCount: Int, confirmationTimeout: Duration) = ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = prefetchCount, + confirmationTimeout = confirmationTimeout + ) + + private fun createPublishConnectionManager( + rabbitMQContainer: RabbitMQContainer, + prefetchCount: Int = DEFAULT_PREFETCH_COUNT, + confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT + ) = PublishConnectionManager( + "test-publish-connection", + getRabbitMQConfiguration(rabbitMQContainer), + getConnectionManagerConfiguration(prefetchCount, confirmationTimeout) + ) + + private fun createConsumeConnectionManager( rabbitMQContainer: RabbitMQContainer, prefetchCount: Int = DEFAULT_PREFETCH_COUNT, confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT - ) = ConnectionManager( - "test-connection", - RabbitMQConfiguration( - host = rabbitMQContainer.host, - vHost = "", - port = rabbitMQContainer.amqpPort, - username = rabbitMQContainer.adminUsername, - password = rabbitMQContainer.adminPassword, - ), - ConnectionManagerConfiguration( - subscriberName = "test", - prefetchCount = prefetchCount, - confirmationTimeout = confirmationTimeout, - ), + ) = ConsumeConnectionManager( + "test-consume-connection", + getRabbitMQConfiguration(rabbitMQContainer), + getConnectionManagerConfiguration(prefetchCount, confirmationTimeout) ) companion object { diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageBatchRouter.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageBatchRouter.kt index ed05dc4b2..49ff54078 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageBatchRouter.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageBatchRouter.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,8 @@ import com.exactpro.th2.common.schema.message.configuration.MqRouterFilterConfig import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.assertArrayEquals import org.junit.jupiter.api.Assertions.assertThrows @@ -46,13 +47,15 @@ import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.eq import org.mockito.kotlin.mock import org.mockito.kotlin.never -import org.mockito.kotlin.times import org.mockito.kotlin.verify class TestRabbitMessageGroupBatchRouter { private val connectionConfiguration = ConnectionManagerConfiguration() private val managerMonitor: ExclusiveSubscriberMonitor = mock { } - private val connectionManager: ConnectionManager = mock { + private val publishConnectionManager: PublishConnectionManager = mock { + on { configuration }.thenReturn(connectionConfiguration) + } + private val consumeConnectionManager: ConsumeConnectionManager = mock { on { configuration }.thenReturn(connectionConfiguration) on { basicConsume(any(), any(), any()) }.thenReturn(managerMonitor) } @@ -116,7 +119,7 @@ class TestRabbitMessageGroupBatchRouter { router.send(batch, "test") val captor = argumentCaptor<ByteArray>() - verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) + verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) val publishedBytes = captor.firstValue assertArrayEquals(batch.toByteArray(), publishedBytes) { "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}" @@ -132,7 +135,7 @@ class TestRabbitMessageGroupBatchRouter { ).build() ) - verify(connectionManager, never()).basicPublish(any(), any(), anyOrNull(), any()) + verify(publishConnectionManager, never()).basicPublish(any(), any(), anyOrNull(), any()) } @Test @@ -144,7 +147,7 @@ class TestRabbitMessageGroupBatchRouter { router.send(batch, "test") val captor = argumentCaptor<ByteArray>() - verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) + verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) val publishedBytes = captor.firstValue assertArrayEquals(batch.toByteArray(), publishedBytes) { "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}" @@ -192,8 +195,8 @@ class TestRabbitMessageGroupBatchRouter { router.sendAll(batch) val captor = argumentCaptor<ByteArray>() - verify(connectionManager).basicPublish(eq("test-exchange"), eq("test"), anyOrNull(), captor.capture()) - verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) + verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test"), anyOrNull(), captor.capture()) + verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) val originalBytes = batch.toByteArray() Assertions.assertAll( Executable { @@ -251,7 +254,7 @@ class TestRabbitMessageGroupBatchRouter { router.send(batch, "test") val captor = argumentCaptor<ByteArray>() - verify(connectionManager).basicPublish(eq("test-exchange"), eq("publish"), anyOrNull(), captor.capture()) + verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("publish"), anyOrNull(), captor.capture()) val publishedBytes = captor.firstValue assertArrayEquals(batch.toByteArray(), publishedBytes) { "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}" @@ -297,11 +300,11 @@ class TestRabbitMessageGroupBatchRouter { private fun createRouter(pins: Map<String, QueueConfiguration>): MessageRouter<MessageGroupBatch> = RabbitMessageGroupBatchRouter().apply { init(DefaultMessageRouterContext( - connectionManager, - connectionManager, + publishConnectionManager, + consumeConnectionManager, mock { }, MessageRouterConfiguration(pins, GlobalNotificationConfiguration()), BOX_CONFIGURATION )) } -} +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt index 60d61522e..2c666b18b 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt @@ -27,8 +27,9 @@ import com.exactpro.th2.common.schema.message.ContainerConstants.ROUTING_KEY import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager +import com.exactpro.th2.common.util.getRabbitMQConfiguration import com.rabbitmq.client.BuiltinExchangeType import mu.KotlinLogging import org.junit.jupiter.api.Test @@ -49,21 +50,25 @@ class TransportGroupBatchRouterIntegrationTest { rabbitMQContainer.start() LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" } - createConnectionManager(rabbitMQContainer).use { firstManager -> - createRouter(firstManager).use { firstRouter -> - createConnectionManager(rabbitMQContainer).use { secondManager -> - createRouter(secondManager).use { secondRouter -> - val counter = CountDownLatch(1) - val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() } - try { - secondRouter.sendExclusive(monitor.queue, GroupBatch.builder() - .setBook("") - .setSessionGroup("") - .build()) - assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) } + createConsumeConnectionManager(rabbitMQContainer).use { firstConsumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { firstPublishManager -> + createRouter(firstPublishManager, firstConsumeManager).use { firstRouter -> + createConsumeConnectionManager(rabbitMQContainer).use { secondConsumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { secondPublishManager -> + createRouter(secondPublishManager, secondConsumeManager).use { secondRouter -> + val counter = CountDownLatch(1) + val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() } + try { + secondRouter.sendExclusive(monitor.queue, GroupBatch.builder() + .setBook("") + .setSessionGroup("") + .build()) + assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) } - } finally { - monitor.unsubscribe() + } finally { + monitor.unsubscribe() + } + } } } } @@ -82,22 +87,26 @@ class TransportGroupBatchRouterIntegrationTest { rabbitMQContainer.start() LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" } - createConnectionManager(rabbitMQContainer).use { firstManager -> - createRouter(firstManager).use { firstRouter -> - createConnectionManager(rabbitMQContainer).use { secondManager -> - createRouter(secondManager).use { secondRouter -> - val counter = CountDownLatch(1) - val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() } - try { + createConsumeConnectionManager(rabbitMQContainer).use { firstConsumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { firstPublishManager -> + createRouter(firstPublishManager, firstConsumeManager).use { firstRouter -> + createConsumeConnectionManager(rabbitMQContainer).use { secondConsumeManager -> + createPublishConnectionManager(rabbitMQContainer).use { secondPublishManager -> + createRouter(secondPublishManager, secondConsumeManager).use { secondRouter -> + val counter = CountDownLatch(1) + val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() } + try { - secondRouter.sendExclusive(monitor.queue, GroupBatch.builder() - .setBook("") - .setSessionGroup("") - .build()) - assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) } + secondRouter.sendExclusive(monitor.queue, GroupBatch.builder() + .setBook("") + .setSessionGroup("") + .build()) + assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) } - } finally { - monitor.unsubscribe() + } finally { + monitor.unsubscribe() + } + } } } } @@ -106,12 +115,12 @@ class TransportGroupBatchRouterIntegrationTest { } } - private fun createRouter(connectionManager: ConnectionManager) = TransportGroupBatchRouter() + private fun createRouter(publishConnectionManager: PublishConnectionManager, consumeConnectionManager: ConsumeConnectionManager) = TransportGroupBatchRouter() .apply { init( DefaultMessageRouterContext( - connectionManager, - connectionManager, + publishConnectionManager, + consumeConnectionManager, mock { }, MessageRouterConfiguration(), BoxConfiguration() @@ -119,24 +128,30 @@ class TransportGroupBatchRouterIntegrationTest { ) } - private fun createConnectionManager( + private fun getConnectionManagerConfiguration(prefetchCount: Int, confirmationTimeout: Duration) = ConnectionManagerConfiguration( + subscriberName = "test", + prefetchCount = prefetchCount, + confirmationTimeout = confirmationTimeout + ) + + private fun createPublishConnectionManager( + rabbitMQContainer: RabbitMQContainer, + prefetchCount: Int = DEFAULT_PREFETCH_COUNT, + confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT + ) = PublishConnectionManager( + "test-publish-connection", + getRabbitMQConfiguration(rabbitMQContainer), + getConnectionManagerConfiguration(prefetchCount, confirmationTimeout) + ) + + private fun createConsumeConnectionManager( rabbitMQContainer: RabbitMQContainer, prefetchCount: Int = DEFAULT_PREFETCH_COUNT, confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT - ) = ConnectionManager( - "test-connection", - RabbitMQConfiguration( - host = rabbitMQContainer.host, - vHost = "", - port = rabbitMQContainer.amqpPort, - username = rabbitMQContainer.adminUsername, - password = rabbitMQContainer.adminPassword, - ), - ConnectionManagerConfiguration( - subscriberName = "test", - prefetchCount = prefetchCount, - confirmationTimeout = confirmationTimeout, - ), + ) = ConsumeConnectionManager( + "test-consume-connection", + getRabbitMQConfiguration(rabbitMQContainer), + getConnectionManagerConfiguration(prefetchCount, confirmationTimeout) ) companion object { diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterTest.kt index fc094109b..60fc434b1 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,15 +20,20 @@ import com.exactpro.th2.common.event.bean.BaseTest.* import com.exactpro.th2.common.grpc.MessageGroupBatch import com.exactpro.th2.common.schema.message.ExclusiveSubscriberMonitor import com.exactpro.th2.common.schema.message.MessageRouter -import com.exactpro.th2.common.schema.message.configuration.* +import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration +import com.exactpro.th2.common.schema.message.configuration.MqRouterFilterConfiguration +import com.exactpro.th2.common.schema.message.configuration.FieldFilterConfiguration +import com.exactpro.th2.common.schema.message.configuration.FieldFilterOperation +import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration +import com.exactpro.th2.common.schema.message.configuration.GlobalNotificationConfiguration import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportGroupBatchRouter.Companion.TRANSPORT_GROUP_ATTRIBUTE import io.netty.buffer.Unpooled import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.assertArrayEquals -import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertThrows import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test @@ -39,7 +44,10 @@ import java.time.Instant class TransportGroupBatchRouterTest { private val connectionConfiguration = ConnectionManagerConfiguration() private val managerMonitor: ExclusiveSubscriberMonitor = mock { } - private val connectionManager: ConnectionManager = mock { + private val publishConnectionManager: PublishConnectionManager = mock { + on { configuration }.thenReturn(connectionConfiguration) + } + private val consumeConnectionManager: ConsumeConnectionManager = mock { on { configuration }.thenReturn(connectionConfiguration) on { basicConsume(any(), any(), any()) }.thenReturn(managerMonitor) } @@ -98,7 +106,7 @@ class TransportGroupBatchRouterTest { router.send(batch, "test") val captor = argumentCaptor<ByteArray>() - verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) + verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) val publishedBytes = captor.firstValue assertArrayEquals(batch.toByteArray(), publishedBytes) { "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}" @@ -109,7 +117,7 @@ class TransportGroupBatchRouterTest { fun `does not publish anything if all messages are filtered`() { router.send(createGroupBatch("test-message1")) - verify(connectionManager, never()).basicPublish(any(), any(), anyOrNull(), any()) + verify(publishConnectionManager, never()).basicPublish(any(), any(), anyOrNull(), any()) } @Test @@ -118,7 +126,7 @@ class TransportGroupBatchRouterTest { router.send(batch, "test") val captor = argumentCaptor<ByteArray>() - verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) + verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) val publishedBytes = captor.firstValue assertArrayEquals(batch.toByteArray(), publishedBytes) { "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}" @@ -155,8 +163,8 @@ class TransportGroupBatchRouterTest { router.sendAll(batch) val captor = argumentCaptor<ByteArray>() - verify(connectionManager).basicPublish(eq("test-exchange"), eq("test"), anyOrNull(), captor.capture()) - verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) + verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test"), anyOrNull(), captor.capture()) + verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture()) val originalBytes = batch.toByteArray() Assertions.assertAll( Executable { @@ -206,7 +214,7 @@ class TransportGroupBatchRouterTest { router.send(batch, "test") val captor = argumentCaptor<ByteArray>() - verify(connectionManager).basicPublish(eq("test-exchange"), eq("publish"), anyOrNull(), captor.capture()) + verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("publish"), anyOrNull(), captor.capture()) val publishedBytes = captor.firstValue assertArrayEquals(batch.toByteArray(), publishedBytes) { "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}" @@ -283,8 +291,8 @@ class TransportGroupBatchRouterTest { TransportGroupBatchRouter().apply { init( DefaultMessageRouterContext( - connectionManager, - connectionManager, + publishConnectionManager, + consumeConnectionManager, mock { }, MessageRouterConfiguration(pins, GlobalNotificationConfiguration()), BOX_CONFIGURATION @@ -313,4 +321,4 @@ class TransportGroupBatchRouterTest { ) ) } -} +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt b/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt index 8d5e82e01..8c28a3d63 100644 --- a/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt +++ b/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ package com.exactpro.th2.common.util import kotlin.random.Random import org.testcontainers.containers.Container import org.testcontainers.containers.RabbitMQContainer +import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration fun declareQueue(rabbit: RabbitMQContainer, queueName: String): Container.ExecResult? = execCommandWithSplit(rabbit, "rabbitmqadmin declare queue name=$queueName durable=false") @@ -65,6 +66,14 @@ fun restartContainer(rabbit: RabbitMQContainer) { rabbit.start() } +fun getRabbitMQConfiguration(container: RabbitMQContainer) = RabbitMQConfiguration( + host = container.host, + vHost = "", + port = container.amqpPort, + username = container.adminUsername, + password = container.adminPassword, +) + private fun execCommandWithSplit(rabbit: RabbitMQContainer, command: String): Container.ExecResult? = rabbit.execInContainer(*command.split(" ").toTypedArray())