diff --git a/README.md b/README.md
index a29d9b81c..413fbadba 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# th2 common library (Java) (5.13.1)
+# th2 common library (Java) (5.14.0)
 
 ## Usage
 
@@ -513,7 +513,7 @@ dependencies {
 
 ### 5.14.0-dev
 
-+ Separate connections for publisher and consumer
++ Separate connections for publisher and consumer (allows to consume while publishing is blocked by RabbitMQ)
 + Updated cradle `5.4.1-dev`
 + Updated kubernetes-client: `6.13.1`
 
@@ -611,7 +611,7 @@ dependencies {
 
 ### 5.4.1-dev
 #### Fix
-+ `SubscriberMonitor` is returned from `MessageRouter.subscribe` methods is proxy object to manage RabbitMQ subscribtion without internal listener  
++ `SubscriberMonitor` is returned from `MessageRouter.subscribe` methods is proxy object to manage RabbitMQ subscription without internal listener  
 
 ### 5.4.0-dev
 #### Updated
diff --git a/gradle.properties b/gradle.properties
index f8a2b4afb..5c8ad9276 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,4 +17,4 @@ release_version=5.14.0
 kotlin_version=1.8.22
 description='th2 common library (Java)'
 vcs_url=https://github.com/th2-net/th2-common-j
-kapt.include.compile.classpath=false
+kapt.include.compile.classpath=true
diff --git a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java
index 810e5dc8a..21eff2dfa 100644
--- a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java
+++ b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@
 import com.exactpro.th2.common.grpc.EventBatch;
 import com.exactpro.th2.common.message.MessageUtils;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender;
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager;
 
 import static com.exactpro.th2.common.metrics.CommonMetrics.TH2_PIN_LABEL;
 import static com.exactpro.th2.common.schema.event.EventBatchRouter.EVENT_TYPE;
@@ -39,13 +39,13 @@ public class EventBatchSender extends AbstractRabbitSender<EventBatch> {
             .register();
 
     public EventBatchSender(
-            @NotNull ConnectionManager connectionManager,
+            @NotNull PublishConnectionManager publishConnectionManager,
             @NotNull String exchangeName,
             @NotNull String routingKey,
             @NotNull String th2Pin,
             @NotNull String bookName
     ) {
-        super(connectionManager, exchangeName, routingKey, th2Pin, EVENT_TYPE, bookName);
+        super(publishConnectionManager, exchangeName, routingKey, th2Pin, EVENT_TYPE, bookName);
     }
 
     @Override
@@ -82,4 +82,4 @@ protected String toShortTraceString(EventBatch value) {
     protected String toShortDebugString(EventBatch value) {
         return "EventBatch: parent_event_id = " + value.getParentEventId().getId();
     }
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java
index 78642b061..29d375b2f 100644
--- a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java
+++ b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@
 import com.exactpro.th2.common.schema.message.DeliveryMetadata;
 import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber;
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager;
 import com.rabbitmq.client.Delivery;
 import io.prometheus.client.Counter;
 import org.jetbrains.annotations.NotNull;
@@ -41,12 +41,12 @@ public class EventBatchSubscriber extends AbstractRabbitSubscriber<EventBatch> {
             .register();
 
     public EventBatchSubscriber(
-            @NotNull ConnectionManager connectionManager,
+            @NotNull ConsumeConnectionManager consumeConnectionManager,
             @NotNull String queue,
             @NotNull String th2Pin,
             @NotNull ConfirmationListener<EventBatch> listener
     ) {
-        super(connectionManager, queue, th2Pin, EVENT_TYPE, listener);
+        super(consumeConnectionManager, queue, th2Pin, EVENT_TYPE, listener);
     }
 
     @Override
@@ -78,4 +78,4 @@ protected void handle(DeliveryMetadata deliveryMetadata, Delivery delivery, Even
                 .inc(value.getEventsCount());
         super.handle(deliveryMetadata, delivery, value, confirmation);
     }
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java
index f2595d769..eb36b4374 100644
--- a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java
+++ b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java
@@ -1,5 +1,6 @@
 /*
  * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -52,7 +53,8 @@
 import com.exactpro.th2.common.schema.message.impl.monitor.LogMessageRouterMonitor;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration;
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.MessageConverter;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.RabbitCustomRouter;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch;
@@ -137,10 +139,10 @@ public abstract class AbstractCommonFactory implements AutoCloseable {
     private final Class<? extends MessageRouter<EventBatch>> eventBatchRouterClass;
     private final Class<? extends GrpcRouter> grpcRouterClass;
     private final Class<? extends NotificationRouter<EventBatch>> notificationEventBatchRouterClass;
-    private final LazyProvider<ConnectionManager> rabbitMqPublishConnectionManager =
-            lazyAutocloseable("publish-connection-manager", this::createRabbitMQConnectionManager);
-    private final LazyProvider<ConnectionManager> rabbitMqConsumeConnectionManager =
-            lazyAutocloseable("consume-connection-manager", this::createRabbitMQConnectionManager);
+    private final LazyProvider<PublishConnectionManager> rabbitMqPublishConnectionManager =
+            lazyAutocloseable("publish-connection-manager", this::createRabbitMQPublishConnectionManager);
+    private final LazyProvider<ConsumeConnectionManager> rabbitMqConsumeConnectionManager =
+            lazyAutocloseable("consume-connection-manager", this::createRabbitMQConsumeConnectionManager);
     private final LazyProvider<MessageRouterContext> routerContext =
             lazy("router-context", this::createMessageRouterContext);
     private final LazyProvider<MessageRouter<MessageBatch>> messageRouterParsedBatch =
@@ -662,15 +664,19 @@ protected PrometheusConfiguration loadPrometheusConfiguration() {
         return getConfigurationOrLoad(PrometheusConfiguration.class, true);
     }
 
-    protected ConnectionManager createRabbitMQConnectionManager() {
-        return new ConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration());
+    protected PublishConnectionManager createRabbitMQPublishConnectionManager() {
+        return new PublishConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration());
+    }
+
+    protected ConsumeConnectionManager createRabbitMQConsumeConnectionManager() {
+        return new ConsumeConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration());
     }
 
-    protected ConnectionManager getRabbitMqPublishConnectionManager() {
+    protected PublishConnectionManager getRabbitMqPublishConnectionManager() {
         return rabbitMqPublishConnectionManager.get();
     }
 
-    protected ConnectionManager getRabbitMqConsumeConnectionManager() {
+    protected ConsumeConnectionManager getRabbitMqConsumeConnectionManager() {
         return rabbitMqConsumeConnectionManager.get();
     }
 
@@ -756,4 +762,4 @@ protected static void configureLogger(Path... paths) {
         log4jConfigUtils.configure(listPath, LOG4J2_PROPERTIES_NAME);
         ExactproMetaInf.logging();
     }
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java b/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java
index db9155034..5099e0a27 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -16,32 +16,14 @@
 package com.exactpro.th2.common.schema.message;
 
 import com.exactpro.th2.common.grpc.MessageGroupBatch;
-import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration;
-import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration;
-import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext;
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
 import org.jetbrains.annotations.NotNull;
-
 import java.io.IOException;
-import java.util.Objects;
 
 /**
  * Interface for send and receive RabbitMQ messages
  * @param <T> messages for send and receive
  */
 public interface MessageRouter<T> extends AutoCloseable {
-
-    /**
-     * Initialization message router
-     * @param configuration message router configuration
-     */
-    @Deprecated(since = "3.2.2", forRemoval = true)
-    default void init(@NotNull ConnectionManager connectionManager, @NotNull MessageRouterConfiguration configuration) {
-        Objects.requireNonNull(connectionManager, "Connection owner can not be null");
-        Objects.requireNonNull(configuration, "Configuration cannot be null");
-        init(new DefaultMessageRouterContext(connectionManager, connectionManager, MessageRouterMonitor.DEFAULT_MONITOR, configuration, new BoxConfiguration()));
-    }
-
     default void init(@NotNull MessageRouterContext context, @NotNull MessageRouter<MessageGroupBatch> groupBatchRouter) {
         init(context);
     }
@@ -137,5 +119,4 @@ default void send(T message) throws IOException {
      * @throws IOException if can not send message
      */
     void sendAll(T message, String... queueAttr) throws IOException;
-
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java
index dfa769dce..0f945f1c3 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
 
 import com.exactpro.th2.common.schema.message.MessageSender;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager;
 
 import static com.exactpro.th2.common.metrics.CommonMetrics.EXCHANGE_LABEL;
 import static com.exactpro.th2.common.metrics.CommonMetrics.ROUTING_KEY_LABEL;
@@ -54,18 +55,18 @@ public abstract class AbstractRabbitSender<T> implements MessageSender<T> {
     protected final String bookName;
     private final AtomicReference<String> routingKey = new AtomicReference<>();
     private final AtomicReference<String> exchangeName = new AtomicReference<>();
-    private final AtomicReference<ConnectionManager> connectionManager = new AtomicReference<>();
+    private final AtomicReference<PublishConnectionManager> publishConnectionManager = new AtomicReference<>();
     private final String th2Type;
 
     public AbstractRabbitSender(
-            @NotNull ConnectionManager connectionManager,
+            @NotNull PublishConnectionManager publishConnectionManager,
             @NotNull String exchangeName,
             @NotNull String routingKey,
             @NotNull String th2Pin,
             @NotNull String th2Type,
             @NotNull String bookName
     ) {
-        this.connectionManager.set(requireNonNull(connectionManager, "Connection can not be null"));
+        this.publishConnectionManager.set(requireNonNull(publishConnectionManager, "Connection manager can not be null"));
         this.exchangeName.set(requireNonNull(exchangeName, "Exchange name can not be null"));
         this.routingKey.set(requireNonNull(routingKey, "Routing key can not be null"));
         this.th2Pin = requireNonNull(th2Pin, "TH2 pin can not be null");
@@ -84,7 +85,7 @@ public void send(T value) throws IOException {
         requireNonNull(value, "Value for send can not be null");
 
         try {
-            ConnectionManager connection = this.connectionManager.get();
+            PublishConnectionManager connectionManager = this.publishConnectionManager.get();
             byte[] bytes = valueToBytes(value);
             MESSAGE_SIZE_PUBLISH_BYTES
                     .labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
@@ -92,7 +93,7 @@ public void send(T value) throws IOException {
             MESSAGE_PUBLISH_TOTAL
                     .labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
                     .inc();
-            connection.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);
+            connectionManager.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);
 
             if (LOGGER.isTraceEnabled()) {
                 LOGGER.trace("Message sent to exchangeName='{}', routing key='{}': '{}'",
@@ -111,4 +112,4 @@ public void send(T value) throws IOException {
     protected abstract String toShortDebugString(T value);
 
     protected abstract byte[] valueToBytes(T value);
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java
index b9fa6cd7b..23ffbafa2 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java
@@ -1,5 +1,6 @@
 /*
- * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -21,7 +22,7 @@
 import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation;
 import com.exactpro.th2.common.schema.message.MessageSubscriber;
 import com.exactpro.th2.common.schema.message.SubscriberMonitor;
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager;
 import com.google.common.base.Suppliers;
 import com.google.common.io.BaseEncoding;
 import com.rabbitmq.client.Delivery;
@@ -69,7 +70,7 @@ public abstract class AbstractRabbitSubscriber<T> implements MessageSubscriber {
     private final boolean manualConfirmation;
     private final ConfirmationListener<T> listener;
     private final String queue;
-    private final ConnectionManager connectionManager;
+    private final ConsumeConnectionManager consumeConnectionManager;
     private final AtomicReference<Supplier<SubscriberMonitor>> consumerMonitor = new AtomicReference<>(emptySupplier());
     private final AtomicBoolean isAlive = new AtomicBoolean(true);
     private final String th2Type;
@@ -78,13 +79,13 @@ public abstract class AbstractRabbitSubscriber<T> implements MessageSubscriber {
     protected final String th2Pin;
 
     public AbstractRabbitSubscriber(
-            @NotNull ConnectionManager connectionManager,
+            @NotNull ConsumeConnectionManager consumeConnectionManager,
             @NotNull String queue,
             @NotNull String th2Pin,
             @NotNull String th2Type,
             @NotNull ConfirmationListener<T> listener
     ) {
-        this.connectionManager = requireNonNull(connectionManager, "Connection can not be null");
+        this.consumeConnectionManager = requireNonNull(consumeConnectionManager, "Connection can not be null");
         this.queue = requireNonNull(queue, "Queue can not be null");
         this.th2Pin = requireNonNull(th2Pin, "th2 pin can not be null");
         this.th2Type = requireNonNull(th2Type, "th2 type can not be null");
@@ -186,7 +187,7 @@ private void resubscribe() {
     private SubscriberMonitor basicConsume() {
         try {
             LOGGER.info("Start listening queue name='{}', th2 pin='{}'", queue, th2Pin);
-            return connectionManager.basicConsume(queue, this::handle, this::canceled);
+            return consumeConnectionManager.basicConsume(queue, this::handle, this::canceled);
         } catch (IOException e) {
             throw new IllegalStateException("Can not subscribe to queue = " + queue, e);
         } catch (InterruptedException e) {
diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
index 5292dc0de..99d416225 100644
--- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
+++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java
@@ -17,16 +17,11 @@
 package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection;
 
 import com.exactpro.th2.common.metrics.HealthMetrics;
-import com.exactpro.th2.common.schema.message.DeliveryMetadata;
-import com.exactpro.th2.common.schema.message.ExclusiveSubscriberMonitor;
 import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback;
-import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation;
-import com.exactpro.th2.common.schema.message.impl.OnlyOnceConfirmation;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration;
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RetryingDelay;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.BlockedListener;
 import com.rabbitmq.client.CancelCallback;
 import com.rabbitmq.client.Channel;
@@ -34,7 +29,6 @@
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Consumer;
-import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.ExceptionHandler;
 import com.rabbitmq.client.Method;
 import com.rabbitmq.client.Recoverable;
@@ -64,7 +58,6 @@
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
-import java.util.Collections;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -73,7 +66,6 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
@@ -84,23 +76,22 @@
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
-public class ConnectionManager implements AutoCloseable {
+public abstract class ConnectionManager implements AutoCloseable {
     public static final String EMPTY_ROUTING_KEY = "";
     private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);
 
     private final Connection connection;
-    private final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>();
+    protected final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>();
     private final AtomicReference<State> connectionState = new AtomicReference<>(State.OPEN);
-    private final AtomicBoolean isPublishingBlocked = new AtomicBoolean(false);
     private final ConnectionManagerConfiguration configuration;
-    private final String subscriberName;
-    private final AtomicInteger nextSubscriberId = new AtomicInteger(1);
+    protected final String subscriberName;
+    protected final AtomicInteger nextSubscriberId = new AtomicInteger(1);
     private final ExecutorService sharedExecutor;
-    private final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor(
+    protected final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor(
             new ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build()
     );
 
-    private final HealthMetrics metrics = new HealthMetrics(this);
+    protected final HealthMetrics metrics = new HealthMetrics(this);
 
     private final RecoveryListener recoveryListener = new RecoveryListener() {
         @Override
@@ -293,29 +284,7 @@ private void addShutdownListenerToChannel(Channel channel, Boolean withRecovery)
                 .orElse(null);
     }
 
-    private void recoverSubscriptionsOfChannel(@NotNull final PinId pinId, Channel channel, @NotNull final ChannelHolder holder) {
-        channelChecker.execute(() -> holder.withLock(() -> {
-            try {
-                var subscriptionCallbacks = holder.getCallbacksForRecovery(channel);
-
-                if (subscriptionCallbacks != null) {
-
-                    LOGGER.info("Changing channel for holder with pin id: {}", pinId);
-
-                    var removedHolder = channelsByPin.remove(pinId);
-                    if (removedHolder != holder) throw new IllegalStateException("Channel holder has been replaced");
-
-                    basicConsume(pinId.queue, subscriptionCallbacks.deliverCallback, subscriptionCallbacks.cancelCallback);
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOGGER.info("Recovering channel's subscriptions interrupted", e);
-            } catch (Throwable e) {
-                // this code executed in executor service and exception thrown here will not be handled anywhere
-                LOGGER.error("Failed to recovery channel's subscriptions", e);
-            }
-        }));
-    }
+    protected abstract void recoverSubscriptionsOfChannel(@NotNull final PinId pinId, Channel channel, @NotNull final ChannelHolder holder);
 
     private void addShutdownListenerToConnection(Connection conn) {
         conn.addShutdownListener(cause -> {
@@ -347,20 +316,10 @@ private void addRecoveryListenerToConnection(Connection conn) {
         }
     }
 
-    private void addBlockedListenersToConnection(Connection conn) {
-        conn.addBlockedListener(new BlockedListener() {
-            @Override
-            public void handleBlocked(String reason) {
-                isPublishingBlocked.set(true);
-                LOGGER.warn("RabbitMQ blocked connection: {}", reason);
-            }
+    protected abstract BlockedListener getBlockedListener();
 
-            @Override
-            public void handleUnblocked() {
-                isPublishingBlocked.set(false);
-                LOGGER.warn("RabbitMQ unblocked connection");
-            }
-        });
+    private void addBlockedListenersToConnection(Connection conn) {
+        conn.addBlockedListener(getBlockedListener());
     }
 
     public boolean isOpen() {
@@ -417,92 +376,6 @@ public void close() {
         shutdownExecutor(channelChecker, closeTimeout, "channel-checker");
     }
 
-    public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException, InterruptedException {
-        ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey));
-        holder.retryingPublishWithLock(configuration, body,
-                (channel, payload) -> channel.basicPublish(exchange, routingKey, props, payload));
-    }
-
-    public String queueDeclare() throws IOException {
-        ChannelHolder holder = new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions());
-        return holder.mapWithLock(channel -> {
-            String queue = channel.queueDeclare(
-                    "", // queue name
-                    false, // durable
-                    true, // exclusive
-                    false, // autoDelete
-                    Collections.emptyMap()).getQueue();
-            LOGGER.info("Declared exclusive '{}' queue", queue);
-            putChannelFor(PinId.forQueue(queue), holder);
-            return queue;
-        });
-    }
-
-    public ExclusiveSubscriberMonitor basicConsume(String queue, ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) throws IOException, InterruptedException {
-        PinId pinId = PinId.forQueue(queue);
-        ChannelHolder holder = getOrCreateChannelFor(pinId, new SubscriptionCallbacks(deliverCallback, cancelCallback));
-        String tag = holder.retryingConsumeWithLock(channel ->
-                channel.basicConsume(queue, false, subscriberName + "_" + nextSubscriberId.getAndIncrement(), (tagTmp, delivery) -> {
-                    try {
-                        Envelope envelope = delivery.getEnvelope();
-                        long deliveryTag = envelope.getDeliveryTag();
-                        String routingKey = envelope.getRoutingKey();
-                        LOGGER.trace("Received delivery {} from queue={} routing_key={}", deliveryTag, queue, routingKey);
-
-                        Confirmation wrappedConfirmation = new Confirmation() {
-                            @Override
-                            public void reject() throws IOException {
-                                holder.withLock(ch -> {
-                                    try {
-                                        basicReject(ch, deliveryTag);
-                                    } catch (IOException | ShutdownSignalException e) {
-                                        LOGGER.warn("Error during basicReject of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e);
-                                        throw e;
-                                    } finally {
-                                        holder.release(() -> metrics.getReadinessMonitor().enable());
-                                    }
-                                });
-                            }
-
-                            @Override
-                            public void confirm() throws IOException {
-                                holder.withLock(ch -> {
-                                    try {
-                                        basicAck(ch, deliveryTag);
-                                    } catch (IOException | ShutdownSignalException e) {
-                                        LOGGER.warn("Error during basicAck of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e);
-                                        throw e;
-                                    } finally {
-                                        holder.release(() -> metrics.getReadinessMonitor().enable());
-                                    }
-                                });
-                            }
-                        };
-
-                        Confirmation confirmation = OnlyOnceConfirmation.wrap("from " + routingKey + " to " + queue, wrappedConfirmation);
-
-                        holder.withLock(() -> holder.acquireAndSubmitCheck(() ->
-                                channelChecker.schedule(() -> {
-                                    holder.withLock(() -> {
-                                        LOGGER.warn("The confirmation for delivery {} in queue={} routing_key={} was not invoked within the specified delay",
-                                                deliveryTag, queue, routingKey);
-                                        if (holder.reachedPendingLimit()) {
-                                            metrics.getReadinessMonitor().disable();
-                                        }
-                                    });
-                                    return false; // to cast to Callable
-                                }, configuration.getConfirmationTimeout().toMillis(), TimeUnit.MILLISECONDS)
-                        ));
-                        boolean redeliver = envelope.isRedeliver();
-                        deliverCallback.handle(new DeliveryMetadata(tagTmp, redeliver), delivery, confirmation);
-                    } catch (IOException | RuntimeException e) {
-                        LOGGER.error("Cannot handle delivery for tag {}: {}", tagTmp, e.getMessage(), e);
-                    }
-                }, cancelCallback), configuration);
-
-        return new RabbitMqSubscriberMonitor(holder, queue, tag, this::basicCancel);
-    }
-
     boolean isReady() {
         return metrics.getReadinessMonitor().isEnabled();
     }
@@ -511,11 +384,7 @@ boolean isAlive() {
         return metrics.getLivenessMonitor().isEnabled();
     }
 
-    boolean isPublishingBlocked() {
-        return isPublishingBlocked.get();
-    }
-
-    private ChannelHolderOptions configurationToOptions() {
+    protected ChannelHolderOptions configurationToOptions() {
         return new ChannelHolderOptions(
                 configuration.getPrefetchCount(),
                 configuration.getEnablePublisherConfirmation(),
@@ -523,19 +392,6 @@ private ChannelHolderOptions configurationToOptions() {
         );
     }
 
-    private void basicCancel(Channel channel, String consumerTag) throws IOException {
-        channel.basicCancel(consumerTag);
-    }
-
-    public String queueExclusiveDeclareAndBind(String exchange) throws IOException, TimeoutException {
-        try (Channel channel = createChannel()) {
-            String queue = channel.queueDeclare().getQueue();
-            channel.queueBind(queue, exchange, EMPTY_ROUTING_KEY);
-            LOGGER.info("Declared the '{}' queue to listen to the '{}'", queue, exchange);
-            return queue;
-        }
-    }
-
     private void shutdownExecutor(ExecutorService executor, int closeTimeout, String name) {
         executor.shutdown();
         try {
@@ -549,9 +405,9 @@ private void shutdownExecutor(ExecutorService executor, int closeTimeout, String
         }
     }
 
-    private static final class SubscriptionCallbacks {
-        private final ManualAckDeliveryCallback deliverCallback;
-        private final CancelCallback cancelCallback;
+    protected static final class SubscriptionCallbacks {
+        protected final ManualAckDeliveryCallback deliverCallback;
+        protected final CancelCallback cancelCallback;
 
         public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) {
             this.deliverCallback = deliverCallback;
@@ -559,28 +415,28 @@ public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCa
         }
     }
 
-    private ChannelHolder getOrCreateChannelFor(PinId pinId) {
+   protected ChannelHolder getOrCreateChannelFor(PinId pinId) {
         return channelsByPin.computeIfAbsent(pinId, ignore -> {
             LOGGER.trace("Creating channel holder for {}", pinId);
             return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions());
         });
     }
 
-    private ChannelHolder getOrCreateChannelFor(PinId pinId, SubscriptionCallbacks subscriptionCallbacks) {
+    protected ChannelHolder getOrCreateChannelFor(PinId pinId, SubscriptionCallbacks subscriptionCallbacks) {
         return channelsByPin.computeIfAbsent(pinId, ignore -> {
             LOGGER.trace("Creating channel holder with callbacks for {}", pinId);
             return new ChannelHolder(() -> createChannelWithOptionalRecovery(true), this::waitForConnectionRecovery, configurationToOptions(), subscriptionCallbacks);
         });
     }
 
-    private void putChannelFor(PinId pinId, ChannelHolder holder) {
+    protected void putChannelFor(PinId pinId, ChannelHolder holder) {
         ChannelHolder previous = channelsByPin.putIfAbsent(pinId, holder);
         if (previous != null) {
             throw new IllegalStateException("Channel holder for the '" + pinId + "' pinId has been already registered");
         }
     }
 
-    private Channel createChannel() {
+    protected Channel createChannel() {
         return createChannelWithOptionalRecovery(false);
     }
 
@@ -605,7 +461,7 @@ private void waitForConnectionRecovery(ShutdownNotifier notifier) {
         waitForConnectionRecovery(notifier, true);
     }
 
-    private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitForRecovery) {
+    protected void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitForRecovery) {
         if (isConnectionRecovery(notifier)) {
             if (waitForRecovery) {
                 waitForRecovery(notifier);
@@ -637,50 +493,14 @@ private boolean isConnectionRecovery(ShutdownNotifier notifier) {
                 && connectionState.get() != State.CLOSED;
     }
 
-    /**
-     * @param channel pass channel witch used for basicConsume, because delivery tags are scoped per channel,
-     *                deliveries must be acknowledged on the same channel they were received on.
-     */
-    private static void basicAck(Channel channel, long deliveryTag) throws IOException {
-        channel.basicAck(deliveryTag, false);
-    }
-
-    private static void basicReject(Channel channel, long deliveryTag) throws IOException {
-        channel.basicReject(deliveryTag, false);
-    }
-
-    private static class RabbitMqSubscriberMonitor implements ExclusiveSubscriberMonitor {
-        private final ChannelHolder holder;
-        private final String queue;
-        private final String tag;
-        private final CancelAction action;
-
-        public RabbitMqSubscriberMonitor(ChannelHolder holder, String queue, String tag, CancelAction action) {
-            this.holder = holder;
-            this.queue = queue;
-            this.tag = tag;
-            this.action = action;
-        }
-
-        @Override
-        public @NotNull String getQueue() {
-            return queue;
-        }
-
-        @Override
-        public void unsubscribe() throws IOException {
-            holder.unsubscribeWithLock(tag, action);
-        }
-    }
-
-    private interface CancelAction {
+    protected interface CancelAction {
         void execute(Channel channel, String tag) throws IOException;
     }
 
-    private static class PinId {
+    protected static class PinId {
         private final String exchange;
         private final String routingKey;
-        private final String queue;
+        protected final String queue;
 
         public static PinId forRoutingKey(String exchange, String routingKey) {
             return new PinId(exchange, routingKey, null);
@@ -731,7 +551,7 @@ public String toString() {
         }
     }
 
-    private static class ChannelHolderOptions {
+    protected static class ChannelHolderOptions {
         private final int maxCount;
         private final boolean enablePublisherConfirmation;
         private final int maxInflightRequestsBytes;
@@ -755,8 +575,8 @@ public int getMaxInflightRequestsBytes() {
         }
     }
 
-    private static class ChannelHolder {
-        private final Lock lock = new ReentrantLock();
+    protected static class ChannelHolder {
+        protected final Lock lock = new ReentrantLock();
         private final Supplier<Channel> supplier;
         private final BiConsumer<ShutdownNotifier, Boolean> reconnectionChecker;
         private final ChannelHolderOptions options;
@@ -1346,15 +1166,15 @@ public boolean awaitConfirmations(long timeout, TimeUnit timeUnit) throws Interr
         }
     }
 
-    private interface ChannelMapper<T> {
+    protected interface ChannelMapper<T> {
         T map(Channel channel) throws IOException;
     }
 
-    private interface ChannelConsumer {
+    protected interface ChannelConsumer {
         void consume(Channel channel) throws IOException;
     }
 
-    private interface ChannelPublisher {
+    protected interface ChannelPublisher {
         void publish(Channel channel, byte[] payload) throws IOException;
     }
 }
\ No newline at end of file
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterContext.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterContext.kt
index 67f8e68b0..e92285153 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterContext.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterContext.kt
@@ -1,5 +1,6 @@
 /*
- * Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -17,11 +18,12 @@ package com.exactpro.th2.common.schema.message
 
 import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration
 import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 
 interface MessageRouterContext {
-    val publishConnectionManager: ConnectionManager
-    val consumeConnectionManager: ConnectionManager
+    val publishConnectionManager: PublishConnectionManager
+    val consumeConnectionManager: ConsumeConnectionManager
     val routerMonitor: MessageRouterMonitor
     val configuration: MessageRouterConfiguration
     val boxConfiguration: BoxConfiguration
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/context/DefaultMessageRouterContext.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/context/DefaultMessageRouterContext.kt
index dc503d422..f94ea7ff2 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/context/DefaultMessageRouterContext.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/context/DefaultMessageRouterContext.kt
@@ -1,5 +1,6 @@
 /*
- * Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -19,11 +20,12 @@ import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration
 import com.exactpro.th2.common.schema.message.MessageRouterContext
 import com.exactpro.th2.common.schema.message.MessageRouterMonitor
 import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 
 class DefaultMessageRouterContext(
-    override val publishConnectionManager: ConnectionManager,
-    override val consumeConnectionManager: ConnectionManager,
+    override val publishConnectionManager: PublishConnectionManager,
+    override val consumeConnectionManager: ConsumeConnectionManager,
     override val routerMonitor: MessageRouterMonitor,
     override val configuration: MessageRouterConfiguration,
     override val boxConfiguration: BoxConfiguration
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt
index 91bc224ea..beeb7d58e 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt
@@ -1,5 +1,6 @@
 /*
  * Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,6 +13,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
+
 package com.exactpro.th2.common.schema.message.impl.rabbitmq
 
 import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration
@@ -21,7 +23,8 @@ import com.exactpro.th2.common.schema.message.QueueAttribute.PUBLISH
 import com.exactpro.th2.common.schema.message.QueueAttribute.SUBSCRIBE
 import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration
 import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 import mu.KotlinLogging
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicReference
@@ -43,10 +46,10 @@ abstract class AbstractRabbitRouter<T> : MessageRouter<T> {
     private val configuration: MessageRouterConfiguration
         get() = context.configuration
 
-    protected val publishConnectionManager: ConnectionManager
+    protected val publishConnectionManager: PublishConnectionManager
         get() = context.publishConnectionManager
 
-    protected val consumeConnectionManager: ConnectionManager
+    protected val consumeConnectionManager: ConsumeConnectionManager
         get() = context.consumeConnectionManager
 
     private val boxConfiguration: BoxConfiguration
@@ -327,4 +330,4 @@ abstract class AbstractRabbitRouter<T> : MessageRouter<T> {
         private val REQUIRED_SEND_ATTRIBUTES = setOf(PUBLISH.toString())
         private val REQUIRED_SUBSCRIBE_ATTRIBUTES = setOf(SUBSCRIBE.toString())
     }
-}
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt
new file mode 100644
index 000000000..00683ed31
--- /dev/null
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2024 Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection
+
+import com.exactpro.th2.common.schema.message.DeliveryMetadata
+import com.exactpro.th2.common.schema.message.ExclusiveSubscriberMonitor
+import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback
+import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation
+import com.exactpro.th2.common.schema.message.impl.OnlyOnceConfirmation.Companion.wrap
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration
+import com.rabbitmq.client.Channel
+import com.rabbitmq.client.Delivery
+import com.rabbitmq.client.CancelCallback
+import com.rabbitmq.client.BlockedListener
+import com.rabbitmq.client.ShutdownNotifier
+import com.rabbitmq.client.ShutdownSignalException
+
+import mu.KotlinLogging
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeoutException
+
+class ConsumeConnectionManager(
+    connectionName: String,
+    rabbitMQConfiguration: RabbitMQConfiguration,
+    connectionManagerConfiguration: ConnectionManagerConfiguration
+) : ConnectionManager(connectionName, rabbitMQConfiguration, connectionManagerConfiguration) {
+    override fun getBlockedListener(): BlockedListener = object : BlockedListener {
+        override fun handleBlocked(reason: String) {
+            LOGGER.info { "RabbitMQ blocked consume connection: $reason" }
+        }
+
+        override fun handleUnblocked() {
+            LOGGER.info("RabbitMQ unblocked consume connection")
+        }
+    }
+
+    @Throws(IOException::class)
+    fun queueDeclare(): String {
+        val holder = ChannelHolder({ this.createChannel() },
+            { notifier: ShutdownNotifier, waitForRecovery: Boolean ->
+                this.waitForConnectionRecovery(
+                    notifier,
+                    waitForRecovery
+                )
+            }, configurationToOptions()
+        )
+        return holder.mapWithLock { channel: Channel ->
+            val queue = channel.queueDeclare(
+                "",  // queue name
+                false,  // durable
+                true,  // exclusive
+                false,  // autoDelete
+                emptyMap()
+            ).queue
+            LOGGER.info { "Declared exclusive '$queue' queue" }
+            putChannelFor(PinId.forQueue(queue), holder)
+            queue
+        }
+    }
+
+    @Throws(IOException::class, TimeoutException::class)
+    fun queueExclusiveDeclareAndBind(exchange: String): String {
+        createChannel().use { channel ->
+            val queue = channel.queueDeclare().queue
+            channel.queueBind(queue, exchange, EMPTY_ROUTING_KEY)
+            LOGGER.info { "Declared the '$queue' queue to listen to the '$exchange'" }
+            return queue
+        }
+    }
+
+    @Throws(IOException::class, InterruptedException::class)
+    fun basicConsume(
+        queue: String,
+        deliverCallback: ManualAckDeliveryCallback,
+        cancelCallback: CancelCallback
+    ): ExclusiveSubscriberMonitor {
+        val pinId = PinId.forQueue(queue)
+        val holder = getOrCreateChannelFor(pinId, SubscriptionCallbacks(deliverCallback, cancelCallback))
+        val tag = holder.retryingConsumeWithLock({ channel: Channel ->
+            channel.basicConsume(
+                queue, false, subscriberName + "_" + nextSubscriberId.getAndIncrement(),
+                { tagTmp: String, delivery: Delivery ->
+                    try {
+                        val envelope = delivery.envelope
+                        val deliveryTag = envelope.deliveryTag
+                        val routingKey = envelope.routingKey
+                        LOGGER.trace { "Received delivery $deliveryTag from queue=$queue routing_key=$routingKey" }
+
+                        val wrappedConfirmation: Confirmation = object : Confirmation {
+                            @Throws(IOException::class)
+                            override fun reject() {
+                                holder.withLock { ch: Channel ->
+                                    try {
+                                        channel.basicReject(deliveryTag, false)
+                                    } catch (e: IOException) {
+                                        LOGGER.warn { "Error during basicReject of message with deliveryTag = $deliveryTag inside channel #${ch.channelNumber}: $e" }
+                                        throw e
+                                    } catch (e: ShutdownSignalException) {
+                                        LOGGER.warn { "Error during basicReject of message with deliveryTag = $deliveryTag inside channel #${ch.channelNumber}: $e" }
+                                        throw e
+                                    } finally {
+                                        holder.release { metrics.readinessMonitor.enable() }
+                                    }
+                                }
+                            }
+
+                            @Throws(IOException::class)
+                            override fun confirm() {
+                                holder.withLock { ch: Channel ->
+                                    try {
+                                        // because delivery tags are scoped per channel,
+                                        // deliveries must be acknowledged on the same channel they were received on.
+                                        ch.basicAck(deliveryTag, false)
+                                    } catch (e: Exception) {
+                                        LOGGER.warn { "Error during basicAck of message with deliveryTag = $deliveryTag inside channel #${ch.channelNumber}: $e" }
+                                        throw e
+                                    } finally {
+                                        holder.release { metrics.readinessMonitor.enable() }
+                                    }
+                                }
+                            }
+                        }
+
+                        val confirmation = wrap("from $routingKey to $queue", wrappedConfirmation)
+
+                        holder.withLock(Runnable {
+                            holder.acquireAndSubmitCheck {
+                                channelChecker.schedule<Boolean>(
+                                    {
+                                        holder.withLock(Runnable {
+                                            LOGGER.warn { "The confirmation for delivery $deliveryTag in queue=$queue routing_key=$routingKey was not invoked within the specified delay" }
+                                            if (holder.reachedPendingLimit()) {
+                                                metrics.readinessMonitor.disable()
+                                            }
+                                        })
+                                        false // to cast to Callable
+                                    },
+                                    configuration.confirmationTimeout.toMillis(),
+                                    TimeUnit.MILLISECONDS
+                                )
+                            }
+                        })
+                        val redeliver = envelope.isRedeliver
+                        deliverCallback.handle(DeliveryMetadata(tagTmp, redeliver), delivery, confirmation)
+                    } catch (e: Exception) {
+                        LOGGER.error("Cannot handle delivery for tag $tagTmp: ${e.message}", e)
+                    } catch (e: RuntimeException) {
+                        LOGGER.error("Cannot handle delivery for tag $tagTmp: ${e.message}", e)
+                    }
+                }, cancelCallback
+            )
+        }, configuration)
+
+        return RabbitMqSubscriberMonitor(holder, queue, tag) { channel, consumerTag -> channel.basicCancel(consumerTag) }
+    }
+
+    override fun recoverSubscriptionsOfChannel(pinId: PinId, channel: Channel, holder: ChannelHolder) {
+        channelChecker.execute {
+            holder.withLock(Runnable {
+                try {
+                    val subscriptionCallbacks = holder.getCallbacksForRecovery(channel)
+
+                    if (subscriptionCallbacks != null) {
+                        LOGGER.info { "Changing channel for holder with pin id: $pinId" }
+
+                        val removedHolder = channelsByPin.remove(pinId)
+                        check(removedHolder === holder) { "Channel holder has been replaced" }
+
+                        basicConsume(
+                            pinId.queue,
+                            subscriptionCallbacks.deliverCallback,
+                            subscriptionCallbacks.cancelCallback
+                        )
+                    }
+                } catch (e: InterruptedException) {
+                    Thread.currentThread().interrupt()
+                    LOGGER.info("Recovering channel's subscriptions interrupted", e)
+                } catch (e: Throwable) {
+                    // this code executed in executor service and exception thrown here will not be handled anywhere
+                    LOGGER.error("Failed to recovery channel's subscriptions", e)
+                }
+            })
+        }
+    }
+
+    private class RabbitMqSubscriberMonitor(
+        private val holder: ChannelHolder,
+        override val queue: String,
+        private val tag: String,
+        private val action: CancelAction
+    ) : ExclusiveSubscriberMonitor {
+        @Throws(IOException::class)
+        override fun unsubscribe() {
+            holder.unsubscribeWithLock(tag, action)
+        }
+    }
+
+    companion object {
+        private val LOGGER = KotlinLogging.logger {}
+    }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt
new file mode 100644
index 000000000..0cfb15cfb
--- /dev/null
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2024 Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection
+
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration
+import com.rabbitmq.client.AMQP
+import com.rabbitmq.client.BlockedListener
+import com.rabbitmq.client.Channel
+import mu.KotlinLogging
+import java.io.IOException
+
+class PublishConnectionManager(
+    connectionName: String,
+    rabbitMQConfiguration: RabbitMQConfiguration,
+    connectionManagerConfiguration: ConnectionManagerConfiguration
+) : ConnectionManager(connectionName, rabbitMQConfiguration, connectionManagerConfiguration) {
+    @Volatile var isPublishingBlocked = false
+
+    @Throws(IOException::class, InterruptedException::class)
+    fun basicPublish(exchange: String, routingKey: String, props: AMQP.BasicProperties?, body: ByteArray) {
+        val holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey))
+        holder.retryingPublishWithLock(configuration, body) { channel: Channel, payload: ByteArray ->
+            channel.basicPublish(exchange, routingKey, props, payload)
+        }
+    }
+
+    override fun getBlockedListener(): BlockedListener = object : BlockedListener {
+        override fun handleBlocked(reason: String) {
+            isPublishingBlocked = true
+            LOGGER.info { "RabbitMQ blocked publish connection: $reason" }
+        }
+
+        override fun handleUnblocked() {
+            isPublishingBlocked = false
+            LOGGER.info("RabbitMQ unblocked publish connection")
+        }
+    }
+
+    override fun recoverSubscriptionsOfChannel(pinId: PinId, channel: Channel, holder: ChannelHolder) {}
+
+    companion object {
+        private val LOGGER = KotlinLogging.logger {}
+    }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt
index 464e3c078..ae3693051 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt
@@ -1,5 +1,6 @@
 /*
  * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -25,7 +26,8 @@ import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscr
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.BookName
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.PinConfiguration
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.PinName
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 
 /**
  * NOTE: labels are used for back compatibility and may be deleted later
@@ -91,14 +93,14 @@ class RabbitCustomRouter<T : Any>(
     }
 
     private class Sender<T : Any>(
-        connectionManager: ConnectionManager,
+        publishConnectionManager: PublishConnectionManager,
         exchangeName: String,
         routingKey: String,
         th2Pin: String,
         bookName: BookName,
         customTag: String,
         private val converter: MessageConverter<T>
-    ) : AbstractRabbitSender<T>(connectionManager, exchangeName, routingKey, th2Pin, customTag, bookName) {
+    ) : AbstractRabbitSender<T>(publishConnectionManager, exchangeName, routingKey, th2Pin, customTag, bookName) {
         override fun valueToBytes(value: T): ByteArray = converter.toByteArray(value)
 
         override fun toShortTraceString(value: T): String = converter.toTraceString(value)
@@ -107,13 +109,13 @@ class RabbitCustomRouter<T : Any>(
     }
 
     private class Subscriber<T : Any>(
-        connectionManager: ConnectionManager,
+        consumeConnectionManager: ConsumeConnectionManager,
         queue: String,
         th2Pin: String,
         customTag: String,
         private val converter: MessageConverter<T>,
         messageListener: ConfirmationListener<T>
-    ) : AbstractRabbitSubscriber<T>(connectionManager, queue, th2Pin, customTag, messageListener) {
+    ) : AbstractRabbitSubscriber<T>(consumeConnectionManager, queue, th2Pin, customTag, messageListener) {
         override fun valueFromBytes(body: ByteArray): T = converter.fromByteArray(body)
 
         override fun toShortTraceString(value: T): String = converter.toTraceString(value)
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt
index 8bd12019c..752fee745 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,20 +27,20 @@ import com.exactpro.th2.common.metrics.TH2_PIN_LABEL
 import com.exactpro.th2.common.metrics.incrementTotalMetrics
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.BookName
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.group.RabbitMessageGroupBatchRouter.Companion.MESSAGE_GROUP_TYPE
 import com.exactpro.th2.common.schema.message.toShortDebugString
 import io.prometheus.client.Counter
 import io.prometheus.client.Gauge
 
 class RabbitMessageGroupBatchSender(
-    connectionManager: ConnectionManager,
+    publishConnectionManager: PublishConnectionManager,
     exchangeName: String,
     routingKey: String,
     th2Pin: String,
     bookName: BookName
 ) : AbstractRabbitSender<MessageGroupBatch>(
-    connectionManager,
+    publishConnectionManager,
     exchangeName,
     routingKey,
     th2Pin,
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt
index 194233f01..3a3aeea35 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt
@@ -1,5 +1,6 @@
 /*
- * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -29,7 +30,7 @@ import com.exactpro.th2.common.schema.message.FilterFunction
 import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback
 import com.exactpro.th2.common.schema.message.configuration.RouterFilter
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.group.RabbitMessageGroupBatchRouter.Companion.MESSAGE_GROUP_TYPE
 import com.exactpro.th2.common.schema.message.toBuilderWithMetadata
 import com.exactpro.th2.common.schema.message.toShortDebugString
@@ -41,14 +42,14 @@ import io.prometheus.client.Gauge
 import mu.KotlinLogging
 
 class RabbitMessageGroupBatchSubscriber(
-    connectionManager: ConnectionManager,
+    consumeConnectionManager: ConsumeConnectionManager,
     queue: String,
     private val filterFunction: FilterFunction,
     th2Pin: String,
     private val filters: List<RouterFilter>,
     private val messageRecursionLimit: Int,
     messageListener: ConfirmationListener<MessageGroupBatch>
-) : AbstractRabbitSubscriber<MessageGroupBatch>(connectionManager, queue, th2Pin, MESSAGE_GROUP_TYPE, messageListener) {
+) : AbstractRabbitSubscriber<MessageGroupBatch>(consumeConnectionManager, queue, th2Pin, MESSAGE_GROUP_TYPE, messageListener) {
     private val logger = KotlinLogging.logger {}
 
     override fun valueFromBytes(body: ByteArray): MessageGroupBatch = parseEncodedBatch(body)
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt
index a81fe662a..9e6075723 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSender.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2021-2021 Exactpro (Exactpro Systems Limited)
+ * Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,10 +20,11 @@ import com.exactpro.th2.common.grpc.EventBatch
 import com.exactpro.th2.common.schema.message.MessageSender
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager.EMPTY_ROUTING_KEY
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
 import java.io.IOException
 
 class NotificationEventBatchSender(
-    private val connectionManager: ConnectionManager,
+    private val publishConnectionManager: PublishConnectionManager,
     private val exchange: String
 ) : MessageSender<EventBatch> {
     @Deprecated(
@@ -37,7 +38,7 @@ class NotificationEventBatchSender(
     @Throws(IOException::class)
     override fun send(message: EventBatch) {
         try {
-            connectionManager.basicPublish(exchange, EMPTY_ROUTING_KEY, null, message.toByteArray())
+            publishConnectionManager.basicPublish(exchange, EMPTY_ROUTING_KEY, null, message.toByteArray())
         } catch (e: Exception) {
             throw IOException(
                 "Can not send notification message: EventBatch: parent_event_id = ${message.parentEventId.id}",
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSubscriber.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSubscriber.kt
index f8b66e930..ccc21e2ce 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSubscriber.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/notification/NotificationEventBatchSubscriber.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2021-2023 Exactpro (Exactpro Systems Limited)
+ * Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,21 +22,21 @@ import com.exactpro.th2.common.schema.message.DeliveryMetadata
 import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback
 import com.exactpro.th2.common.schema.message.MessageSubscriber
 import com.exactpro.th2.common.schema.message.SubscriberMonitor
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 import com.rabbitmq.client.Delivery
 import mu.KotlinLogging
 import java.util.concurrent.CopyOnWriteArrayList
 
 // DRAFT of notification router
 class NotificationEventBatchSubscriber(
-    private val connectionManager: ConnectionManager,
+    private val consumeConnectionManager: ConsumeConnectionManager,
     private val queue: String
 ) : MessageSubscriber {
     private val listeners = CopyOnWriteArrayList<ConfirmationListener<EventBatch>>()
     private lateinit var monitor: SubscriberMonitor
 
     fun start() {
-        monitor = connectionManager.basicConsume(
+        monitor = consumeConnectionManager.basicConsume(
             queue,
             { deliveryMetadata: DeliveryMetadata, delivery: Delivery, confirmation: ManualAckDeliveryCallback.Confirmation ->
                 try {
@@ -76,4 +76,4 @@ class NotificationEventBatchSubscriber(
     companion object {
         private val LOGGER = KotlinLogging.logger {}
     }
-}
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSender.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSender.kt
index 40e47fbef..29c425d55 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSender.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSender.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 Exactpro (Exactpro Systems Limited)
+ * Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,18 +21,18 @@ import com.exactpro.th2.common.metrics.SESSION_GROUP_LABEL
 import com.exactpro.th2.common.metrics.TH2_PIN_LABEL
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.BookName
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportGroupBatchRouter.Companion.TRANSPORT_GROUP_TYPE
 import io.prometheus.client.Counter
 
 class TransportGroupBatchSender(
-    connectionManager: ConnectionManager,
+    publishConnectionManager: PublishConnectionManager,
     exchangeName: String,
     routingKey: String,
     th2Pin: String,
     bookName: BookName,
 ) : AbstractRabbitSender<GroupBatch>(
-    connectionManager,
+    publishConnectionManager,
     exchangeName,
     routingKey,
     th2Pin,
diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSubscriber.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSubscriber.kt
index 4cc75b9eb..1063d35ac 100644
--- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSubscriber.kt
+++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchSubscriber.kt
@@ -1,5 +1,6 @@
 /*
- * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -23,19 +24,19 @@ import com.exactpro.th2.common.schema.message.DeliveryMetadata
 import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback
 import com.exactpro.th2.common.schema.message.configuration.RouterFilter
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportGroupBatchRouter.Companion.TRANSPORT_GROUP_TYPE
 import com.rabbitmq.client.Delivery
 import io.netty.buffer.Unpooled
 import io.prometheus.client.Counter
 
 class TransportGroupBatchSubscriber(
-    connectionManager: ConnectionManager,
+    consumeConnectionManager: ConsumeConnectionManager,
     queue: String,
     th2Pin: String,
     private val filters: List<RouterFilter>,
     messageListener: ConfirmationListener<GroupBatch>
-) : AbstractRabbitSubscriber<GroupBatch>(connectionManager, queue, th2Pin, TRANSPORT_GROUP_TYPE, messageListener) {
+) : AbstractRabbitSubscriber<GroupBatch>(consumeConnectionManager, queue, th2Pin, TRANSPORT_GROUP_TYPE, messageListener) {
 
     override fun valueFromBytes(body: ByteArray): GroupBatch = Unpooled.wrappedBuffer(body).run(GroupBatchCodec::decode)
 
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt
index 38247b0b8..eb928fd8e 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterIntegrationTest.kt
@@ -29,9 +29,10 @@ import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfigu
 import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration
 import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.RabbitCustomRouter
+import com.exactpro.th2.common.util.getRabbitMQConfiguration
 import com.rabbitmq.client.BuiltinExchangeType
 import mu.KotlinLogging
 import org.junit.jupiter.api.Assertions.assertNull
@@ -61,9 +62,8 @@ class AbstractRabbitRouterIntegrationTest {
                 rabbitMQContainer.start()
                 K_LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}, rest ${rabbitMQContainer.httpUrl} ${rabbitMQContainer.adminUsername} ${rabbitMQContainer.adminPassword} " }
 
-                createConnectionManager(rabbitMQContainer).use { publishManager ->
-                    createConnectionManager(rabbitMQContainer).use { consumeManager ->
-
+                createConsumeConnectionManager(rabbitMQContainer).use { consumeManager ->
+                    createPublishConnectionManager(rabbitMQContainer).use { publishManager ->
                         createRouter(publishManager, consumeManager).use { firstRouter ->
                             val messageA = "test-message-a"
                             val messageB = "test-message-b"
@@ -111,8 +111,8 @@ class AbstractRabbitRouterIntegrationTest {
         queue: ArrayBlockingQueue<Delivery>,
         expectations: List<Expectation>,
     ) {
-        createConnectionManager(rabbitMQContainer).use { publishManager ->
-            createConnectionManager(rabbitMQContainer).use { consumeManager ->
+        createConsumeConnectionManager(rabbitMQContainer).use { consumeManager ->
+            createPublishConnectionManager(rabbitMQContainer).use { publishManager ->
                 createRouter(publishManager, consumeManager).use { router ->
                     val monitor = router.subscribeWithManualAck({ deliveryMetadata, message, confirmation ->
                         queue.put(
@@ -160,27 +160,34 @@ class AbstractRabbitRouterIntegrationTest {
         }
     }
 
-    private fun createConnectionManager(
-        rabbitMQContainer: RabbitMQContainer,
-        prefetchCount: Int = DEFAULT_PREFETCH_COUNT,
-        confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT
-    ) = ConnectionManager(
-        "test-connection",
-        RabbitMQConfiguration(
-            host = rabbitMQContainer.host,
-            vHost = "",
-            port = rabbitMQContainer.amqpPort,
-            username = rabbitMQContainer.adminUsername,
-            password = rabbitMQContainer.adminPassword,
-        ),
+    private fun getConnectionManagerConfiguration(prefetchCount: Int, confirmationTimeout: Duration) =
         ConnectionManagerConfiguration(
             subscriberName = "test",
             prefetchCount = prefetchCount,
-            confirmationTimeout = confirmationTimeout,
-        ),
+            confirmationTimeout = confirmationTimeout
+        )
+
+    private fun createPublishConnectionManager(
+        rabbitMQContainer: RabbitMQContainer,
+        prefetchCount: Int = DEFAULT_PREFETCH_COUNT,
+        confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT
+    ) = PublishConnectionManager(
+        "test-publish-connection",
+        getRabbitMQConfiguration(rabbitMQContainer),
+        getConnectionManagerConfiguration(prefetchCount, confirmationTimeout)
+    )
+
+    private fun createConsumeConnectionManager(
+        rabbitMQContainer: RabbitMQContainer,
+        prefetchCount: Int = DEFAULT_PREFETCH_COUNT,
+        confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT
+    ) = ConsumeConnectionManager(
+        "test-consume-connection",
+        getRabbitMQConfiguration(rabbitMQContainer),
+        getConnectionManagerConfiguration(prefetchCount, confirmationTimeout)
     )
 
-    private fun createRouter(publishConnectionManager: ConnectionManager, consumeConnectionManager: ConnectionManager) = RabbitCustomRouter(
+    private fun createRouter(publishConnectionManager: PublishConnectionManager, consumeConnectionManager: ConsumeConnectionManager) = RabbitCustomRouter(
         "test-custom-tag",
         arrayOf("test-label"),
         TestMessageConverter()
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterTest.kt
index 49723228a..015da1abd 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterTest.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouterTest.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 Exactpro (Exactpro Systems Limited)
+ * Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package com.exactpro.th2.common.schema.message.impl.rabbitmq
 
 import com.exactpro.th2.common.event.bean.BaseTest
@@ -22,7 +23,8 @@ import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfigu
 import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration
 import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.RabbitCustomRouter
 import org.junit.jupiter.api.AfterEach
 import org.junit.jupiter.api.Assertions.assertAll
@@ -48,13 +50,10 @@ private const val TEST_EXCLUSIVE_QUEUE = "test-exclusive-queue"
 class AbstractRabbitRouterTest {
     private val connectionConfiguration = ConnectionManagerConfiguration()
     private val managerMonitor: ExclusiveSubscriberMonitor = mock { }
-    private val publishConnectionManager: ConnectionManager = mock {
+    private val publishConnectionManager: PublishConnectionManager = mock {
         on { configuration }.thenReturn(connectionConfiguration)
-        on { basicConsume(any(), any(), any()) }.thenReturn(managerMonitor)
-        on { queueDeclare() }.thenAnswer { "$TEST_EXCLUSIVE_QUEUE-${exclusiveQueueCounter.incrementAndGet()}" }
     }
-
-    private val consumeConnectionManager: ConnectionManager = mock {
+    private val consumeConnectionManager: ConsumeConnectionManager = mock {
         on { configuration }.thenReturn(connectionConfiguration)
         on { basicConsume(any(), any(), any()) }.thenReturn(managerMonitor)
         on { queueDeclare() }.thenAnswer { "$TEST_EXCLUSIVE_QUEUE-${exclusiveQueueCounter.incrementAndGet()}" }
@@ -172,7 +171,7 @@ class AbstractRabbitRouterTest {
         }
 
         @Test
-        fun `subscribes when subscribtion active`() {
+        fun `subscribes when subscription active`() {
             router.subscribe(mock { }, "1")
             clearInvocations(consumeConnectionManager)
             clearInvocations(managerMonitor)
@@ -190,7 +189,7 @@ class AbstractRabbitRouterTest {
         }
 
         @Test
-        fun `subscribes to exclusive queue when subscribtion active`() {
+        fun `subscribes to exclusive queue when subscription active`() {
             val monitorA = router.subscribeExclusive(mock { })
             val monitorB = router.subscribeExclusive(mock { })
 
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
index 2f8fa25a6..a0c15317a 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManualBenchmark.kt
@@ -18,7 +18,7 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection
 
 import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration
+import com.exactpro.th2.common.util.getRabbitMQConfiguration
 import mu.KotlinLogging
 import org.testcontainers.containers.RabbitMQContainer
 import java.time.Duration
@@ -40,7 +40,7 @@ object ConnectionManualBenchmark {
                 execInContainer("rabbitmqadmin", "declare", "exchange", "name=$exchangeName", "type=fanout")
                 execInContainer("rabbitmqadmin", "declare", "binding", "source=$exchangeName", "destination_type=queue", "destination=$queueName")
 
-                val consumer = createConnectionManager(
+                val consumer = createConsumeConnectionManager(
                     container,
                     ConnectionManagerConfiguration(
                         prefetchCount = 1000,
@@ -51,7 +51,7 @@ object ConnectionManualBenchmark {
                 )
 
                 val connectionManagerWithConfirmation = {
-                    createConnectionManager(
+                    createPublishConnectionManager(
                         container,
                         ConnectionManagerConfiguration(
                             prefetchCount = 100,
@@ -63,7 +63,7 @@ object ConnectionManualBenchmark {
                 }
 
                 val connectionManagerWithoutConfirmation = {
-                    createConnectionManager(
+                    createPublishConnectionManager(
                         container,
                         ConnectionManagerConfiguration(
                             prefetchCount = 100,
@@ -102,7 +102,7 @@ object ConnectionManualBenchmark {
         }
     }
 
-    private fun measure(name: String, manager: () -> ConnectionManager, payload: ByteArray): Duration {
+    private fun measure(name: String, manager: () -> PublishConnectionManager, payload: ByteArray): Duration {
         LOGGER.info("Measuring $name")
         val start: Long
         val sent: Long
@@ -136,16 +136,17 @@ object ConnectionManualBenchmark {
         return duration
     }
 
-    private fun createConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) =
-        ConnectionManager(
-            "test-connection",
-            RabbitMQConfiguration(
-                host = container.host,
-                vHost = "",
-                port = container.amqpPort,
-                username = container.adminUsername,
-                password = container.adminPassword,
-            ),
+    private fun createPublishConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) =
+        PublishConnectionManager(
+            "test-publish-connection",
+            getRabbitMQConfiguration(container),
+            configuration
+        )
+
+    private fun createConsumeConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) =
+        ConsumeConnectionManager(
+            "test-consume-connection",
+            getRabbitMQConfiguration(container),
             configuration
         )
 }
\ No newline at end of file
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
index 1063ebf1f..e9164a84c 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt
@@ -31,6 +31,7 @@ import com.exactpro.th2.common.util.getChannelsInfo
 import com.exactpro.th2.common.util.getQueuesInfo
 import com.exactpro.th2.common.util.getSubscribedChannelsCount
 import com.exactpro.th2.common.util.putMessageInQueue
+import com.exactpro.th2.common.util.getRabbitMQConfiguration
 import com.github.dockerjava.api.model.Capability
 import com.rabbitmq.client.BuiltinExchangeType
 import com.rabbitmq.client.CancelCallback
@@ -80,80 +81,85 @@ class TestConnectionManager {
                 val messagesCount = 10
                 val countDown = CountDownLatch(messagesCount)
                 val messageSizeBytes = 7
-                createConnectionManager(
-                    rabbit, ConnectionManagerConfiguration(
-                        subscriberName = "test",
-                        prefetchCount = DEFAULT_PREFETCH_COUNT,
-                        confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
-                        enablePublisherConfirmation = true,
-                        maxInflightPublicationsBytes = 5 * messageSizeBytes,
-                        heartbeatIntervalSeconds = 1,
-                        minConnectionRecoveryTimeout = 2000,
-                        maxConnectionRecoveryTimeout = 2000,
-                        // to avoid unexpected delays before recovery
-                        retryTimeDeviationPercent = 0,
-                    )
-                ).use { manager ->
-                    val receivedMessages = linkedSetOf<String>()
-                    manager.basicConsume(queueName, { _, delivery, ack ->
-                        val message = delivery.body.toString(Charsets.UTF_8)
-                        LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" }
-                        if (receivedMessages.add(message)) {
-                            // decrement only unique messages
-                            countDown.countDown()
-                        } else {
-                            LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" }
+                val managerConfiguration = ConnectionManagerConfiguration(
+                    subscriberName = "test",
+                    prefetchCount = DEFAULT_PREFETCH_COUNT,
+                    confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+                    enablePublisherConfirmation = true,
+                    maxInflightPublicationsBytes = 5 * messageSizeBytes,
+                    heartbeatIntervalSeconds = 1,
+                    minConnectionRecoveryTimeout = 2000,
+                    maxConnectionRecoveryTimeout = 2000,
+                    // to avoid unexpected delays before recovery
+                    retryTimeDeviationPercent = 0,
+                )
+                createConsumeConnectionManager(rabbit, managerConfiguration).use { consumeManager ->
+                    createPublishConnectionManager(rabbit, managerConfiguration).use { publishManager ->
+                        val receivedMessages = linkedSetOf<String>()
+                        consumeManager.basicConsume(queueName, { _, delivery, ack ->
+                            val message = delivery.body.toString(Charsets.UTF_8)
+                            LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" }
+                            if (receivedMessages.add(message)) {
+                                // decrement only unique messages
+                                countDown.countDown()
+                            } else {
+                                LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" }
+                            }
+                            ack.confirm()
+                        }) {
+                            LOGGER.info { "Canceled $it" }
                         }
-                        ack.confirm()
-                    }) {
-                        LOGGER.info { "Canceled $it" }
-                    }
 
-
-                    var future: CompletableFuture<*>? = null
-                    repeat(messagesCount) { index ->
-                        if (index == 1) {
-                            // delay should allow ack for the first message be received
-                            Awaitility.await("first message is confirmed")
-                                .pollInterval(10, TimeUnit.MILLISECONDS)
-                                .atMost(100, TimeUnit.MILLISECONDS)
-                                .until { countDown.count == messagesCount - 1L }
-                            // Man pages:
-                            // https://man7.org/linux/man-pages/man8/tc-netem.8.html
-                            // https://man7.org/linux/man-pages/man8/ifconfig.8.html
-                            //
-                            // Here we try to emulate network outage to cause missing publication confirmations.
-                            //
-                            // In real life we will probably get duplicates in this case because
-                            // rabbitmq does not provide exactly-once semantic.
-                            // So, we will have to deal with it on the consumer side
-                            rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down")
-                        } else if (index == 4) {
-                            future = CompletableFuture.supplyAsync {
-                                // Interface is unblock in separate thread to emulate more realistic scenario
-
-                                // More than 2 HB will be missed
-                                // This is enough for rabbitmq server to understand the connection is lost
-                                Awaitility.await("connection is closed")
-                                    .atMost(3, TimeUnit.SECONDS)
-                                    .until { !manager.isOpen }
-                                // enabling network interface back
-                                rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
+                        var future: CompletableFuture<*>? = null
+                        repeat(messagesCount) { index ->
+                            if (index == 1) {
+                                // delay should allow ack for the first message be received
+                                Awaitility.await("first message is confirmed")
+                                    .pollInterval(10, TimeUnit.MILLISECONDS)
+                                    .atMost(100, TimeUnit.MILLISECONDS)
+                                    .until { countDown.count == messagesCount - 1L }
+                                // Man pages:
+                                // https://man7.org/linux/man-pages/man8/tc-netem.8.html
+                                // https://man7.org/linux/man-pages/man8/ifconfig.8.html
+                                //
+                                // Here we try to emulate network outage to cause missing publication confirmations.
+                                //
+                                // In real life we will probably get duplicates in this case because
+                                // rabbitmq does not provide exactly-once semantic.
+                                // So, we will have to deal with it on the consumer side
+                                rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down")
+                            } else if (index == 4) {
+                                future = CompletableFuture.supplyAsync {
+                                    // Interface is unblock in separate thread to emulate more realistic scenario
+
+                                    // More than 2 HB will be missed
+                                    // This is enough for rabbitmq server to understand the connection is lost
+                                    Awaitility.await("connection is closed")
+                                        .atMost(3, TimeUnit.SECONDS)
+                                        .until { !consumeManager.isOpen }
+                                    // enabling network interface back
+                                    rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
+                                }
                             }
+                            publishManager.basicPublish(
+                                exchange,
+                                routingKey,
+                                null,
+                                "Hello $index".toByteArray(Charsets.UTF_8)
+                            )
                         }
-                        manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
-                    }
 
-                    future?.get(30, TimeUnit.SECONDS)
+                        future?.get(30, TimeUnit.SECONDS)
 
-                    countDown.assertComplete { "Not all messages were received: $receivedMessages" }
-                    assertEquals(
-                        (0 until messagesCount).map {
-                            "Hello $it"
-                        },
-                        receivedMessages.toList(),
-                        "messages received in unexpected order",
-                    )
+                        countDown.assertComplete { "Not all messages were received: $receivedMessages" }
+                        assertEquals(
+                            (0 until messagesCount).map {
+                                "Hello $it"
+                            },
+                            receivedMessages.toList(),
+                            "messages received in unexpected order",
+                        )
+                    }
                 }
             }
     }
@@ -171,7 +177,7 @@ class TestConnectionManager {
                 val messagesCount = 10
                 val countDown = CountDownLatch(messagesCount)
                 val messageSizeBytes = 7
-                createConnectionManager(
+                createConsumeConnectionManager(
                     rabbit, ConnectionManagerConfiguration(
                         subscriberName = "test",
                         prefetchCount = DEFAULT_PREFETCH_COUNT,
@@ -197,7 +203,7 @@ class TestConnectionManager {
                         LOGGER.info { "Canceled $it" }
                     }
 
-                    createConnectionManager(
+                    createPublishConnectionManager(
                         rabbit, ConnectionManagerConfiguration(
                             prefetchCount = DEFAULT_PREFETCH_COUNT,
                             confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
@@ -263,44 +269,45 @@ class TestConnectionManager {
                 LOGGER.info { "Started with port ${it.amqpPort}" }
                 val queue = ArrayBlockingQueue<ManualAckDeliveryCallback.Confirmation>(DEFAULT_PREFETCH_COUNT)
                 val countDown = CountDownLatch(DEFAULT_PREFETCH_COUNT)
-                createConnectionManager(
-                    it, ConnectionManagerConfiguration(
-                        subscriberName = "test",
-                        prefetchCount = DEFAULT_PREFETCH_COUNT,
-                        confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
-                    )
-                ).use { manager ->
-                    manager.basicConsume(queueName, { _, delivery, ack ->
-                        LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" }
-                        queue += ack
-                        countDown.countDown()
-                    }) {
-                        LOGGER.info { "Canceled $it" }
-                    }
+                val managerConfiguration = ConnectionManagerConfiguration(
+                    subscriberName = "test",
+                    prefetchCount = DEFAULT_PREFETCH_COUNT,
+                    confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+                )
+                createConsumeConnectionManager(it, managerConfiguration).use { consumeManager ->
+                    createPublishConnectionManager(it, managerConfiguration).use { publishManager ->
+                        consumeManager.basicConsume(queueName, { _, delivery, ack ->
+                            LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" }
+                            queue += ack
+                            countDown.countDown()
+                        }) {
+                            LOGGER.info { "Canceled $it" }
+                        }
 
-                    repeat(DEFAULT_PREFETCH_COUNT + 1) { index ->
-                        manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
-                    }
+                        repeat(DEFAULT_PREFETCH_COUNT + 1) { index ->
+                            publishManager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
+                        }
 
-                    countDown.assertComplete("Not all messages were received")
+                        countDown.assertComplete("Not all messages were received")
 
-                    assertTrue(manager.isAlive) { "Manager should still be alive" }
-                    assertTrue(manager.isReady) { "Manager should be ready until the confirmation timeout expires" }
+                        assertTrue(consumeManager.isAlive) { "Manager should still be alive" }
+                        assertTrue(consumeManager.isReady) { "Manager should be ready until the confirmation timeout expires" }
 
-                    Thread.sleep(DEFAULT_CONFIRMATION_TIMEOUT.toMillis() + 100/*just in case*/) // wait for confirmation timeout
+                        Thread.sleep(DEFAULT_CONFIRMATION_TIMEOUT.toMillis() + 100/*just in case*/) // wait for confirmation timeout
 
-                    assertTrue(manager.isAlive) { "Manager should still be alive" }
-                    assertFalse(manager.isReady) { "Manager should not be ready" }
+                        assertTrue(consumeManager.isAlive) { "Manager should still be alive" }
+                        assertFalse(consumeManager.isReady) { "Manager should not be ready" }
 
-                    queue.poll().confirm()
+                        queue.poll().confirm()
 
-                    assertTrue(manager.isAlive) { "Manager should still be alive" }
-                    assertTrue(manager.isReady) { "Manager should be ready" }
+                        assertTrue(consumeManager.isAlive) { "Manager should still be alive" }
+                        assertTrue(consumeManager.isReady) { "Manager should be ready" }
 
-                    val receivedData = generateSequence { queue.poll(10L, TimeUnit.MILLISECONDS) }
-                        .onEach(ManualAckDeliveryCallback.Confirmation::confirm)
-                        .count()
-                    assertEquals(DEFAULT_PREFETCH_COUNT, receivedData) { "Unexpected number of messages received" }
+                        val receivedData = generateSequence { queue.poll(10L, TimeUnit.MILLISECONDS) }
+                            .onEach(ManualAckDeliveryCallback.Confirmation::confirm)
+                            .count()
+                        assertEquals(DEFAULT_PREFETCH_COUNT, receivedData) { "Unexpected number of messages received" }
+                    }
                 }
             }
     }
@@ -311,7 +318,7 @@ class TestConnectionManager {
         rabbit
             .let { rabbitMQContainer ->
                 LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" }
-                createConnectionManager(
+                createConsumeConnectionManager(
                     rabbitMQContainer,
                     ConnectionManagerConfiguration(
                         subscriberName = "test",
@@ -338,7 +345,7 @@ class TestConnectionManager {
 
                         assertTarget(true, message = "Thread for consuming isn't started", func = thread::isAlive)
                         // todo check isReady and isAlive, it should be false at some point
-//                        assertTarget(false, "Readiness probe doesn't fall down", connectionManager::isReady)
+                        // assertTarget(false, "Readiness probe doesn't fall down", connectionManager::isReady)
 
                         LOGGER.info { "creating the queue..." }
                         declareQueue(rabbitMQContainer, wrongQueue)
@@ -386,56 +393,56 @@ class TestConnectionManager {
                 LOGGER.info { "Started with port ${it.amqpPort}" }
                 val counter = AtomicInteger()
                 val downLatch = CountDownLatch(1)
-                createConnectionManager(
-                    it,
-                    ConnectionManagerConfiguration(
-                        subscriberName = "test",
-                        prefetchCount = DEFAULT_PREFETCH_COUNT,
-                        confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
-                        minConnectionRecoveryTimeout = 100,
-                        maxConnectionRecoveryTimeout = 200,
-                        maxRecoveryAttempts = 5
-                    ),
-                ).use { connectionManager ->
-                    var monitor: SubscriberMonitor? = null
-                    try {
-                        monitor = connectionManager.basicConsume(queueName, { _, delivery, _ ->
-                            counter.incrementAndGet()
-                            downLatch.countDown()
-                            LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" }
-                        }) {
-                            LOGGER.info { "Canceled $it" }
-                        }
-
-                        LOGGER.info { "Starting first publishing..." }
-                        connectionManager.basicPublish(exchange, "", null, "Hello1".toByteArray(Charsets.UTF_8))
-                        Thread.sleep(200)
-                        LOGGER.info { "Publication finished!" }
-                        assertEquals(
-                            0,
-                            counter.get(),
-                        ) { "Unexpected number of messages received. The first message shouldn't be received" }
-                        LOGGER.info { "Creating the correct exchange..." }
-                        declareFanoutExchangeWithBinding(it, exchange, queueName)
-                        LOGGER.info { "Exchange created!" }
+                val connectionManagerConfiguration = ConnectionManagerConfiguration(
+                    subscriberName = "test",
+                    prefetchCount = DEFAULT_PREFETCH_COUNT,
+                    confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+                    minConnectionRecoveryTimeout = 100,
+                    maxConnectionRecoveryTimeout = 200,
+                    maxRecoveryAttempts = 5
+                )
+                createConsumeConnectionManager(it, connectionManagerConfiguration).use { consumeManager ->
+                    createPublishConnectionManager(it, connectionManagerConfiguration).use { publishManager ->
+                        var monitor: SubscriberMonitor? = null
+                        try {
+                            monitor = consumeManager.basicConsume(queueName, { _, delivery, _ ->
+                                counter.incrementAndGet()
+                                downLatch.countDown()
+                                LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" }
+                            }) {
+                                LOGGER.info { "Canceled $it" }
+                            }
 
-                        Assertions.assertDoesNotThrow {
-                            connectionManager.basicPublish(exchange, "", null, "Hello2".toByteArray(Charsets.UTF_8))
-                        }
+                            LOGGER.info { "Starting first publishing..." }
+                            publishManager.basicPublish(exchange, "", null, "Hello1".toByteArray(Charsets.UTF_8))
+                            Thread.sleep(200)
+                            LOGGER.info { "Publication finished!" }
+                            assertEquals(
+                                0,
+                                counter.get(),
+                            ) { "Unexpected number of messages received. The first message shouldn't be received" }
+                            LOGGER.info { "Creating the correct exchange..." }
+                            declareFanoutExchangeWithBinding(it, exchange, queueName)
+                            LOGGER.info { "Exchange created!" }
+
+                            Assertions.assertDoesNotThrow {
+                                publishManager.basicPublish(exchange, "", null, "Hello2".toByteArray(Charsets.UTF_8))
+                            }
 
-                        downLatch.assertComplete(1L, TimeUnit.SECONDS) { "no messages were received" }
+                            downLatch.assertComplete(1L, TimeUnit.SECONDS) { "no messages were received" }
 
-                        assertEquals(
-                            1,
-                            counter.get()
-                        ) { "Unexpected number of messages received. The second message should be received" }
-                    } finally {
-                        Assertions.assertNotNull(monitor)
-                        Assertions.assertDoesNotThrow {
-                            monitor!!.unsubscribe()
+                            assertEquals(
+                                1,
+                                counter.get()
+                            ) { "Unexpected number of messages received. The second message should be received" }
+                        } finally {
+                            Assertions.assertNotNull(monitor)
+                            Assertions.assertDoesNotThrow {
+                                monitor!!.unsubscribe()
+                            }
                         }
-                    }
 
+                    }
                 }
             }
     }
@@ -452,7 +459,7 @@ class TestConnectionManager {
                 it.start()
                 LOGGER.info { "Started with port ${it.amqpPort}" }
 
-                createConnectionManager(
+                createConsumeConnectionManager(
                     it,
                     ConnectionManagerConfiguration(
                         subscriberName = "test",
@@ -523,7 +530,7 @@ class TestConnectionManager {
                     queueNames[1] to AtomicInteger(-1), // this subscriber won't ack two first deliveries
                     queueNames[2] to AtomicInteger(1)
                 )
-                createConnectionManager(
+                createConsumeConnectionManager(
                     it,
                     ConnectionManagerConfiguration(
                         subscriberName = "test",
@@ -614,8 +621,8 @@ class TestConnectionManager {
                 it.start()
                 declareQueue(it, queueName)
                 LOGGER.info { "Started with port ${it.amqpPort}" }
-                ConnectionManager(
-                    "test-connection",
+                ConsumeConnectionManager(
+                    "test-consume-connection",
                     RabbitMQConfiguration(
                         host = it.host,
                         vHost = "",
@@ -632,9 +639,9 @@ class TestConnectionManager {
                         connectionTimeout = 1000,
                         maxRecoveryAttempts = 5
                     ),
-                ).use { connectionManager ->
+                ).use { publishConnectionManager ->
                     val consume = CountDownLatch(1)
-                    connectionManager.basicConsume(queueName, { _, delivery, ack ->
+                    publishConnectionManager.basicConsume(queueName, { _, delivery, ack ->
                         LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" }
                         consume.countDown()
                         ack.confirm()
@@ -678,47 +685,52 @@ class TestConnectionManager {
             .let {
                 LOGGER.info { "Started with port ${it.amqpPort}" }
                 val counter = AtomicInteger(0)
-                createConnectionManager(
-                    it,
-                    ConnectionManagerConfiguration(
-                        subscriberName = "test",
-                        prefetchCount = DEFAULT_PREFETCH_COUNT,
-                        confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
-                        minConnectionRecoveryTimeout = 10000,
-                        maxConnectionRecoveryTimeout = 20000,
-                        connectionTimeout = 10000,
-                        maxRecoveryAttempts = 5
-                    ),
-                ).use { connectionManager ->
-                    var monitor: SubscriberMonitor? = null
-                    try {
-                        declareQueue(it, queueName)
-                        declareFanoutExchangeWithBinding(it, exchange, queueName)
-
-                        connectionManager.basicPublish(exchange, routingKey, null, "Hello1".toByteArray(Charsets.UTF_8))
+                val connectionManagerConfiguration = ConnectionManagerConfiguration(
+                    subscriberName = "test",
+                    prefetchCount = DEFAULT_PREFETCH_COUNT,
+                    confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+                    minConnectionRecoveryTimeout = 10000,
+                    maxConnectionRecoveryTimeout = 20000,
+                    connectionTimeout = 10000,
+                    maxRecoveryAttempts = 5
+                )
+                createConsumeConnectionManager(it, connectionManagerConfiguration).use { consumeConnectionManager ->
+                    createPublishConnectionManager(it, connectionManagerConfiguration).use { publishConnectionManager ->
+                        var monitor: SubscriberMonitor? = null
+                        try {
+                            declareQueue(it, queueName)
+                            declareFanoutExchangeWithBinding(it, exchange, queueName)
+
+                            publishConnectionManager.basicPublish(
+                                exchange,
+                                routingKey,
+                                null,
+                                "Hello1".toByteArray(Charsets.UTF_8)
+                            )
 
-                        Thread.sleep(200)
-                        monitor = connectionManager.basicConsume(queueName, { _, delivery, ack ->
-                            LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" }
-                            counter.incrementAndGet()
-                            ack.confirm()
-                        }) {
-                            LOGGER.info { "Canceled $it" }
+                            Thread.sleep(200)
+                            monitor = consumeConnectionManager.basicConsume(queueName, { _, delivery, ack ->
+                                LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from ${delivery.envelope.routingKey}" }
+                                counter.incrementAndGet()
+                                ack.confirm()
+                            }) {
+                                LOGGER.info { "Canceled $it" }
+                            }
+                            Thread.sleep(200)
+
+                            assertEquals(1, getSubscribedChannelsCount(it, queueName))
+                            assertEquals(1, counter.get()) { "Wrong number of received messages" }
+                            assertTrue(
+                                getQueuesInfo(it).toString().contains("$queueName\t0")
+                            ) { "There should be no messages left in the queue" }
+                        } finally {
+                            Assertions.assertNotNull(monitor)
+                            Assertions.assertDoesNotThrow {
+                                monitor!!.unsubscribe()
+                            }
                         }
-                        Thread.sleep(200)
 
-                        assertEquals(1, getSubscribedChannelsCount(it, queueName))
-                        assertEquals(1, counter.get()) { "Wrong number of received messages" }
-                        assertTrue(
-                            getQueuesInfo(it).toString().contains("$queueName\t0")
-                        ) { "There should be no messages left in the queue" }
-                    } finally {
-                        Assertions.assertNotNull(monitor)
-                        Assertions.assertDoesNotThrow {
-                            monitor!!.unsubscribe()
-                        }
                     }
-
                 }
             }
     }
@@ -730,7 +742,6 @@ class TestConnectionManager {
         val exchange = "test-exchange7"
         val routingKey = "routingKey7"
 
-
         RabbitMQContainer(RABBITMQ_IMAGE_NAME)
             .withRabbitMQConfig(MountableFile.forClasspathResource(configFilename))
             .withExchange(exchange, BuiltinExchangeType.FANOUT.type, false, false, true, emptyMap())
@@ -740,56 +751,56 @@ class TestConnectionManager {
                 it.start()
                 LOGGER.info { "Started with port ${it.amqpPort}" }
                 val counter = AtomicInteger(0)
-                createConnectionManager(
-                    it,
-                    ConnectionManagerConfiguration(
-                        subscriberName = "test",
-                        prefetchCount = DEFAULT_PREFETCH_COUNT,
-                        confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
-                        minConnectionRecoveryTimeout = 100,
-                        maxConnectionRecoveryTimeout = 200,
-                        maxRecoveryAttempts = 5
-                    ),
-                ).use { connectionManager ->
-                    connectionManager.basicConsume(queueName, { _, delivery, ack ->
-                        LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} " }
-                        if (counter.get() != 0) {
-                            ack.confirm()
-                            LOGGER.info { "Confirmed!" }
-                        } else {
-                            LOGGER.info { "Left this message unacked" }
+                val connectionManagerConfiguration = ConnectionManagerConfiguration(
+                    subscriberName = "test",
+                    prefetchCount = DEFAULT_PREFETCH_COUNT,
+                    confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+                    minConnectionRecoveryTimeout = 100,
+                    maxConnectionRecoveryTimeout = 200,
+                    maxRecoveryAttempts = 5
+                )
+                createConsumeConnectionManager(it, connectionManagerConfiguration).use { consumeConnectionManager ->
+                    createPublishConnectionManager(it, connectionManagerConfiguration).use { publishConnectionManager ->
+                        consumeConnectionManager.basicConsume(queueName, { _, delivery, ack ->
+                            LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} " }
+                            if (counter.get() != 0) {
+                                ack.confirm()
+                                LOGGER.info { "Confirmed!" }
+                            } else {
+                                LOGGER.info { "Left this message unacked" }
+                            }
+                            counter.incrementAndGet()
+                        }) {
+                            LOGGER.info { "Canceled $it" }
                         }
-                        counter.incrementAndGet()
-                    }) {
-                        LOGGER.info { "Canceled $it" }
-                    }
 
 
-                    LOGGER.info { "Sending the first message" }
-                    connectionManager.basicPublish(exchange, routingKey, null, "Hello1".toByteArray(Charsets.UTF_8))
+                        LOGGER.info { "Sending the first message" }
+                        publishConnectionManager.basicPublish(exchange, routingKey, null, "Hello1".toByteArray(Charsets.UTF_8))
 
-                    LOGGER.info { "queues list: \n ${getQueuesInfo(it)}" }
-                    LOGGER.info { "Sleeping..." }
-                    Thread.sleep(33000)
+                        LOGGER.info { "queues list: \n ${getQueuesInfo(it)}" }
+                        LOGGER.info { "Sleeping..." }
+                        Thread.sleep(33000)
 
 
-                    LOGGER.info { "Sending the second message" }
-                    connectionManager.basicPublish(exchange, routingKey, null, "Hello2".toByteArray(Charsets.UTF_8))
+                        LOGGER.info { "Sending the second message" }
+                        publishConnectionManager.basicPublish(exchange, routingKey, null, "Hello2".toByteArray(Charsets.UTF_8))
 
-                    Thread.sleep(30000)
+                        Thread.sleep(30000)
 
-                    LOGGER.info { "Sending the third message" }
-                    connectionManager.basicPublish(exchange, routingKey, null, "Hello3".toByteArray(Charsets.UTF_8))
+                        LOGGER.info { "Sending the third message" }
+                        publishConnectionManager.basicPublish(exchange, routingKey, null, "Hello3".toByteArray(Charsets.UTF_8))
 
-                    val queuesListExecResult = getQueuesInfo(it)
-                    LOGGER.info { "queues list: \n $queuesListExecResult" }
+                        val queuesListExecResult = getQueuesInfo(it)
+                        LOGGER.info { "queues list: \n $queuesListExecResult" }
 
-                    assertEquals(1, getSubscribedChannelsCount(it, queueName))
-                    assertEquals(4, counter.get()) { "Wrong number of received messages" }
-                    assertTrue(
-                        queuesListExecResult.toString().contains("$queueName\t0")
-                    ) { "There should be no messages left in the queue" }
+                        assertEquals(1, getSubscribedChannelsCount(it, queueName))
+                        assertEquals(4, counter.get()) { "Wrong number of received messages" }
+                        assertTrue(
+                            queuesListExecResult.toString().contains("$queueName\t0")
+                        ) { "There should be no messages left in the queue" }
 
+                    }
                 }
             }
     }
@@ -801,7 +812,7 @@ class TestConnectionManager {
             .let {
                 LOGGER.info { "Started with port ${it.amqpPort}" }
                 val counter = AtomicInteger(0)
-                createConnectionManager(
+                createConsumeConnectionManager(
                     it,
                     ConnectionManagerConfiguration(
                         subscriberName = "test",
@@ -852,7 +863,7 @@ class TestConnectionManager {
             .let {
                 LOGGER.info { "Started with port ${it.amqpPort}" }
                 val counter = AtomicInteger(0)
-                createConnectionManager(
+                createConsumeConnectionManager(
                     it,
                     ConnectionManagerConfiguration(
                         subscriberName = "test",
@@ -976,7 +987,7 @@ class TestConnectionManager {
                 )
 
                 val testCasesContexts: List<TestCaseContext> = testCases.map { params ->
-                    val connectionManager = createConnectionManager(
+                    val connectionManager = createConsumeConnectionManager(
                         container,
                         ConnectionManagerConfiguration(
                             subscriberName = params.subscriberName,
@@ -1099,16 +1110,17 @@ class TestConnectionManager {
         assertEquals(target, func(), message)
     }
 
-    private fun createConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) =
-        ConnectionManager(
-            "test-connection",
-            RabbitMQConfiguration(
-                host = container.host,
-                vHost = "",
-                port = container.amqpPort,
-                username = container.adminUsername,
-                password = container.adminPassword,
-            ),
+    private fun createPublishConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) =
+        PublishConnectionManager(
+            "test-publish-connection",
+            getRabbitMQConfiguration(container),
+            configuration
+        )
+
+    private fun createConsumeConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) =
+        ConsumeConnectionManager(
+            "test-consume-connection",
+            getRabbitMQConfiguration(container),
             configuration
         )
 
@@ -1118,24 +1130,26 @@ class TestConnectionManager {
             rabbitMQContainer.start()
             LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" }
 
-            createConnectionManager(rabbitMQContainer).use { firstManager ->
-                createConnectionManager(rabbitMQContainer).use { secondManager ->
-                    val queue = firstManager.queueDeclare()
+            createConsumeConnectionManager(rabbitMQContainer).use { firstConsumeManager ->
+                createConsumeConnectionManager(rabbitMQContainer).use { secondConsumeManager ->
+                    createPublishConnectionManager(rabbitMQContainer).use { publishManager ->
+                        val queue = firstConsumeManager.queueDeclare()
 
-                    assertFailsWith<IOException>("Another connection can subscribe to the $queue queue") {
-                        secondManager.basicConsume(queue, { _, _, _ -> }, {})
-                    }
+                        assertFailsWith<IOException>("Another connection can subscribe to the $queue queue") {
+                            secondConsumeManager.basicConsume(queue, { _, _, _ -> }, {})
+                        }
 
-                    extracted(firstManager, secondManager, queue, 3)
-                    extracted(firstManager, secondManager, queue, 6)
+                        extracted(firstConsumeManager, publishManager, queue, 3)
+                        extracted(firstConsumeManager, publishManager, queue, 6)
+                    }
                 }
             }
         }
     }
 
     private fun extracted(
-        firstManager: ConnectionManager,
-        secondManager: ConnectionManager,
+        firstManager: ConsumeConnectionManager,
+        publishManager: PublishConnectionManager,
         queue: String,
         cycle: Int
     ) {
@@ -1151,7 +1165,7 @@ class TestConnectionManager {
         val secondMonitor = firstManager.basicConsume(queue, deliverCallback, cancelCallback)
 
         repeat(cycle) { index ->
-            secondManager.basicPublish(
+            publishManager.basicPublish(
                 "",
                 queue,
                 null,
@@ -1173,24 +1187,30 @@ class TestConnectionManager {
         secondMonitor.unsubscribe()
     }
 
-    private fun createConnectionManager(
+    private fun getConnectionManagerConfiguration(prefetchCount: Int, confirmationTimeout: Duration) = ConnectionManagerConfiguration(
+        subscriberName = "test",
+        prefetchCount = prefetchCount,
+        confirmationTimeout = confirmationTimeout
+    )
+
+    private fun createPublishConnectionManager(
         rabbitMQContainer: RabbitMQContainer,
         prefetchCount: Int = DEFAULT_PREFETCH_COUNT,
         confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT,
-    ) = ConnectionManager(
-        "test-connection",
-        RabbitMQConfiguration(
-            host = rabbitMQContainer.host,
-            vHost = "",
-            port = rabbitMQContainer.amqpPort,
-            username = rabbitMQContainer.adminUsername,
-            password = rabbitMQContainer.adminPassword,
-        ),
-        ConnectionManagerConfiguration(
-            subscriberName = "test",
-            prefetchCount = prefetchCount,
-            confirmationTimeout = confirmationTimeout
-        )
+    ) = PublishConnectionManager(
+        "test-publish-connection",
+        getRabbitMQConfiguration(rabbitMQContainer),
+        getConnectionManagerConfiguration(prefetchCount, confirmationTimeout)
+    )
+
+    private fun createConsumeConnectionManager(
+        rabbitMQContainer: RabbitMQContainer,
+        prefetchCount: Int = DEFAULT_PREFETCH_COUNT,
+        confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT,
+    ) = ConsumeConnectionManager(
+        "test-consume-connection",
+        getRabbitMQConfiguration(rabbitMQContainer),
+        getConnectionManagerConfiguration(prefetchCount, confirmationTimeout)
     )
 
     @Test
@@ -1209,83 +1229,89 @@ class TestConnectionManager {
             val blockAfter = 3
             val countDown = CountDownLatch(messagesCount)
             val messageSizeBytes = 7
-            createConnectionManager(
-                rabbit, ConnectionManagerConfiguration(
-                    subscriberName = "test",
-                    prefetchCount = DEFAULT_PREFETCH_COUNT,
-                    confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
-                    enablePublisherConfirmation = true,
-                    maxInflightPublicationsBytes = messagesCount * messageSizeBytes,
-                    heartbeatIntervalSeconds = 1,
-                    minConnectionRecoveryTimeout = 2000,
-                    maxConnectionRecoveryTimeout = 2000,
-                    // to avoid unexpected delays before recovery
-                    retryTimeDeviationPercent = 0
-                )
-            ).use { manager ->
-                repeat(messagesCount) { index ->
-                    if (index == blockAfter) {
-                        assertFalse(manager.isPublishingBlocked)
-
-                        // blocks all publishers ( https://www.rabbitmq.com/docs/memory )
-                        rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0")
-                    }
+            val connectionManagerConfiguration = ConnectionManagerConfiguration(
+                subscriberName = "test",
+                prefetchCount = DEFAULT_PREFETCH_COUNT,
+                confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
+                enablePublisherConfirmation = true,
+                maxInflightPublicationsBytes = messagesCount * messageSizeBytes,
+                heartbeatIntervalSeconds = 1,
+                minConnectionRecoveryTimeout = 2000,
+                maxConnectionRecoveryTimeout = 2000,
+                // to avoid unexpected delays before recovery
+                retryTimeDeviationPercent = 0
+            )
+            createConsumeConnectionManager(rabbit, connectionManagerConfiguration).use { consumeManager ->
+                createPublishConnectionManager(rabbit, connectionManagerConfiguration).use { publishManager ->
+                    repeat(messagesCount) { index ->
+                        if (index == blockAfter) {
+                            assertFalse(publishManager.isPublishingBlocked)
 
-                    manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
-                    LOGGER.info("Published $index")
+                            // blocks all publishers ( https://www.rabbitmq.com/docs/memory )
+                            rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0")
+                        }
 
-                    if (index == blockAfter) {
-                        // wait for blocking of publishing connection
-                        Awaitility.await("publishing blocked")
-                            .pollInterval(10L, TimeUnit.MILLISECONDS)
-                            .atMost(100L, TimeUnit.MILLISECONDS)
-                            .until { manager.isPublishingBlocked }
+                        publishManager.basicPublish(
+                            exchange,
+                            routingKey,
+                            null,
+                            "Hello $index".toByteArray(Charsets.UTF_8)
+                        )
+                        LOGGER.info("Published $index")
+
+                        if (index == blockAfter) {
+                            // wait for blocking of publishing connection
+                            Awaitility.await("publishing blocked")
+                                .pollInterval(10L, TimeUnit.MILLISECONDS)
+                                .atMost(100L, TimeUnit.MILLISECONDS)
+                                .until { publishManager.isPublishingBlocked }
+                        }
                     }
-                }
 
-                val receivedMessages = linkedSetOf<String>()
-                LOGGER.info { "creating consumer" }
-
-                val subscribeFuture = Executors.newSingleThreadExecutor().submit {
-                    manager.basicConsume(queueName, { _, delivery, ack ->
-                        val message = delivery.body.toString(Charsets.UTF_8)
-                        LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" }
-                        if (receivedMessages.add(message)) {
-                            // decrement only unique messages
-                            countDown.countDown()
-                        } else {
-                            LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" }
+                    val receivedMessages = linkedSetOf<String>()
+                    LOGGER.info { "creating consumer" }
+
+                    val subscribeFuture = Executors.newSingleThreadExecutor().submit {
+                        consumeManager.basicConsume(queueName, { _, delivery, ack ->
+                            val message = delivery.body.toString(Charsets.UTF_8)
+                            LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" }
+                            if (receivedMessages.add(message)) {
+                                // decrement only unique messages
+                                countDown.countDown()
+                            } else {
+                                LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" }
+                            }
+                            ack.confirm()
+                        }) {
+                            LOGGER.info { "Canceled $it" }
                         }
-                        ack.confirm()
-                    }) {
-                        LOGGER.info { "Canceled $it" }
                     }
-                }
 
-                assertDoesNotThrow("Failed to subscribe to queue") {
-                    // if subscription connection is blocked generates TimeoutException
-                    subscribeFuture.get(1, TimeUnit.SECONDS)
-                    subscribeFuture.cancel(true)
-                }
+                    assertDoesNotThrow("Failed to subscribe to queue") {
+                        // if subscription connection is blocked generates TimeoutException
+                        subscribeFuture.get(1, TimeUnit.SECONDS)
+                        subscribeFuture.cancel(true)
+                    }
 
-                Awaitility.await("receive messages sent before blocking")
-                    .pollInterval(10L, TimeUnit.MILLISECONDS)
-                    .atMost(100L, TimeUnit.MILLISECONDS)
-                    .until { blockAfter.toLong() == messagesCount - countDown.count }
+                    Awaitility.await("receive messages sent before blocking")
+                        .pollInterval(10L, TimeUnit.MILLISECONDS)
+                        .atMost(100L, TimeUnit.MILLISECONDS)
+                        .until { blockAfter.toLong() == messagesCount - countDown.count }
 
-                Thread.sleep(100) // ensure no more messages received
-                assertEquals(blockAfter.toLong(), messagesCount - countDown.count)
-                assertTrue(manager.isPublishingBlocked)
+                    Thread.sleep(100) // ensure no more messages received
+                    assertEquals(blockAfter.toLong(), messagesCount - countDown.count)
+                    assertTrue(publishManager.isPublishingBlocked)
 
-                // unblocks publishers
-                rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0.4")
-                assertFalse(manager.isPublishingBlocked)
+                    // unblocks publishers
+                    rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0.4")
+                    assertFalse(publishManager.isPublishingBlocked)
 
-                // delay receiving all messages
-                Awaitility.await("all messages received")
-                    .pollInterval(10L, TimeUnit.MILLISECONDS)
-                    .atMost(100L, TimeUnit.MILLISECONDS)
-                    .until { countDown.count == 0L }
+                    // delay receiving all messages
+                    Awaitility.await("all messages received")
+                        .pollInterval(10L, TimeUnit.MILLISECONDS)
+                        .atMost(100L, TimeUnit.MILLISECONDS)
+                        .until { countDown.count == 0L }
+                }
             }
         }
     }
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt
index 0dd44ec8b..34b0458ae 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/IntegrationTestRabbitMessageBatchRouter.kt
@@ -28,8 +28,9 @@ import com.exactpro.th2.common.schema.message.ContainerConstants.ROUTING_KEY
 import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration
 import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
+import com.exactpro.th2.common.util.getRabbitMQConfiguration
 import com.rabbitmq.client.BuiltinExchangeType
 import mu.KotlinLogging
 import org.junit.jupiter.api.Test
@@ -49,18 +50,24 @@ class IntegrationTestRabbitMessageGroupBatchRouter {
                 rabbitMQContainer.start()
                 LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" }
 
-                createConnectionManager(rabbitMQContainer).use { firstManager ->
-                    createRouter(firstManager).use { firstRouter ->
-                        createConnectionManager(rabbitMQContainer).use { secondManager ->
-                            createRouter(secondManager).use { secondRouter ->
-                                val counter = CountDownLatch(1)
-                                val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() }
-                                try {
-                                    secondRouter.sendExclusive(monitor.queue, MessageGroupBatch.getDefaultInstance())
-                                    assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) }
-
-                                } finally {
-                                    monitor.unsubscribe()
+                createConsumeConnectionManager(rabbitMQContainer).use { firstConsumeManager ->
+                    createPublishConnectionManager(rabbitMQContainer).use { firstPublishManager ->
+                        createRouter(firstPublishManager, firstConsumeManager).use { firstRouter ->
+                            createConsumeConnectionManager(rabbitMQContainer).use { secondConsumeManager ->
+                                createPublishConnectionManager(rabbitMQContainer).use { secondPublishManager ->
+                                    createRouter(secondPublishManager, secondConsumeManager).use { secondRouter ->
+                                        val counter = CountDownLatch(1)
+                                        val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() }
+                                        try {
+                                            secondRouter.sendExclusive(
+                                                monitor.queue,
+                                                MessageGroupBatch.getDefaultInstance()
+                                            )
+                                            assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) }
+                                        } finally {
+                                            monitor.unsubscribe()
+                                        }
+                                    }
                                 }
                             }
                         }
@@ -79,19 +86,23 @@ class IntegrationTestRabbitMessageGroupBatchRouter {
                 rabbitMQContainer.start()
                 LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" }
 
-                createConnectionManager(rabbitMQContainer).use { firstManager ->
-                    createRouter(firstManager).use { firstRouter ->
-                        createConnectionManager(rabbitMQContainer).use { secondManager ->
-                            createRouter(secondManager).use { secondRouter ->
-                                val counter = CountDownLatch(1)
-                                val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() }
-                                try {
+                createConsumeConnectionManager(rabbitMQContainer).use { firstConsumeManager ->
+                    createPublishConnectionManager(rabbitMQContainer).use { firstPublishManager ->
+                        createRouter(firstPublishManager, firstConsumeManager).use { firstRouter ->
+                            createConsumeConnectionManager(rabbitMQContainer).use { secondConsumeManager ->
+                                createPublishConnectionManager(rabbitMQContainer).use { secondPublishManager ->
+                                    createRouter(secondPublishManager, secondConsumeManager).use { secondRouter ->
+                                        val counter = CountDownLatch(1)
+                                        val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() }
+                                        try {
 
-                                    secondRouter.sendExclusive(monitor.queue, MessageGroupBatch.getDefaultInstance())
-                                    assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) }
+                                            secondRouter.sendExclusive(monitor.queue, MessageGroupBatch.getDefaultInstance())
+                                            assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) }
 
-                                } finally {
-                                    monitor.unsubscribe()
+                                        } finally {
+                                            monitor.unsubscribe()
+                                        }
+                                    }
                                 }
                             }
                         }
@@ -100,12 +111,12 @@ class IntegrationTestRabbitMessageGroupBatchRouter {
             }
     }
 
-    private fun createRouter(connectionManager: ConnectionManager) = RabbitMessageGroupBatchRouter()
+    private fun createRouter(publishConnectionManager: PublishConnectionManager, consumeConnectionManager: ConsumeConnectionManager) = RabbitMessageGroupBatchRouter()
         .apply {
             init(
                 DefaultMessageRouterContext(
-                    connectionManager,
-                    connectionManager,
+                    publishConnectionManager,
+                    consumeConnectionManager,
                     mock { },
                     MessageRouterConfiguration(),
                     BoxConfiguration()
@@ -113,24 +124,30 @@ class IntegrationTestRabbitMessageGroupBatchRouter {
             )
         }
 
-    private fun createConnectionManager(
+    private fun getConnectionManagerConfiguration(prefetchCount: Int, confirmationTimeout: Duration) = ConnectionManagerConfiguration(
+        subscriberName = "test",
+        prefetchCount = prefetchCount,
+        confirmationTimeout = confirmationTimeout
+    )
+
+    private fun createPublishConnectionManager(
+        rabbitMQContainer: RabbitMQContainer,
+        prefetchCount: Int = DEFAULT_PREFETCH_COUNT,
+        confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT
+    ) = PublishConnectionManager(
+        "test-publish-connection",
+        getRabbitMQConfiguration(rabbitMQContainer),
+        getConnectionManagerConfiguration(prefetchCount, confirmationTimeout)
+    )
+
+    private fun createConsumeConnectionManager(
         rabbitMQContainer: RabbitMQContainer,
         prefetchCount: Int = DEFAULT_PREFETCH_COUNT,
         confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT
-    ) = ConnectionManager(
-        "test-connection",
-        RabbitMQConfiguration(
-            host = rabbitMQContainer.host,
-            vHost = "",
-            port = rabbitMQContainer.amqpPort,
-            username = rabbitMQContainer.adminUsername,
-            password = rabbitMQContainer.adminPassword,
-        ),
-        ConnectionManagerConfiguration(
-            subscriberName = "test",
-            prefetchCount = prefetchCount,
-            confirmationTimeout = confirmationTimeout,
-        ),
+    ) = ConsumeConnectionManager(
+        "test-consume-connection",
+        getRabbitMQConfiguration(rabbitMQContainer),
+        getConnectionManagerConfiguration(prefetchCount, confirmationTimeout)
     )
 
     companion object {
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageBatchRouter.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageBatchRouter.kt
index ed05dc4b2..49ff54078 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageBatchRouter.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageBatchRouter.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2021 Exactpro (Exactpro Systems Limited)
+ * Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -33,7 +33,8 @@ import com.exactpro.th2.common.schema.message.configuration.MqRouterFilterConfig
 import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration
 import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.assertArrayEquals
 import org.junit.jupiter.api.Assertions.assertThrows
@@ -46,13 +47,15 @@ import org.mockito.kotlin.argumentCaptor
 import org.mockito.kotlin.eq
 import org.mockito.kotlin.mock
 import org.mockito.kotlin.never
-import org.mockito.kotlin.times
 import org.mockito.kotlin.verify
 
 class TestRabbitMessageGroupBatchRouter {
     private val connectionConfiguration = ConnectionManagerConfiguration()
     private val managerMonitor: ExclusiveSubscriberMonitor = mock { }
-    private val connectionManager: ConnectionManager = mock {
+    private val publishConnectionManager: PublishConnectionManager = mock {
+        on { configuration }.thenReturn(connectionConfiguration)
+    }
+    private val consumeConnectionManager: ConsumeConnectionManager = mock {
         on { configuration }.thenReturn(connectionConfiguration)
         on { basicConsume(any(), any(), any()) }.thenReturn(managerMonitor)
     }
@@ -116,7 +119,7 @@ class TestRabbitMessageGroupBatchRouter {
             router.send(batch, "test")
 
             val captor = argumentCaptor<ByteArray>()
-            verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
+            verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
             val publishedBytes = captor.firstValue
             assertArrayEquals(batch.toByteArray(), publishedBytes) {
                 "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}"
@@ -132,7 +135,7 @@ class TestRabbitMessageGroupBatchRouter {
                     ).build()
             )
 
-            verify(connectionManager, never()).basicPublish(any(), any(), anyOrNull(), any())
+            verify(publishConnectionManager, never()).basicPublish(any(), any(), anyOrNull(), any())
         }
 
         @Test
@@ -144,7 +147,7 @@ class TestRabbitMessageGroupBatchRouter {
             router.send(batch, "test")
 
             val captor = argumentCaptor<ByteArray>()
-            verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
+            verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
             val publishedBytes = captor.firstValue
             assertArrayEquals(batch.toByteArray(), publishedBytes) {
                 "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}"
@@ -192,8 +195,8 @@ class TestRabbitMessageGroupBatchRouter {
             router.sendAll(batch)
 
             val captor = argumentCaptor<ByteArray>()
-            verify(connectionManager).basicPublish(eq("test-exchange"), eq("test"), anyOrNull(), captor.capture())
-            verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
+            verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test"), anyOrNull(), captor.capture())
+            verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
             val originalBytes = batch.toByteArray()
             Assertions.assertAll(
                 Executable {
@@ -251,7 +254,7 @@ class TestRabbitMessageGroupBatchRouter {
             router.send(batch, "test")
 
             val captor = argumentCaptor<ByteArray>()
-            verify(connectionManager).basicPublish(eq("test-exchange"), eq("publish"), anyOrNull(), captor.capture())
+            verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("publish"), anyOrNull(), captor.capture())
             val publishedBytes = captor.firstValue
             assertArrayEquals(batch.toByteArray(), publishedBytes) {
                 "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}"
@@ -297,11 +300,11 @@ class TestRabbitMessageGroupBatchRouter {
     private fun createRouter(pins: Map<String, QueueConfiguration>): MessageRouter<MessageGroupBatch> =
         RabbitMessageGroupBatchRouter().apply {
             init(DefaultMessageRouterContext(
-                connectionManager,
-                connectionManager,
+                publishConnectionManager,
+                consumeConnectionManager,
                 mock { },
                 MessageRouterConfiguration(pins, GlobalNotificationConfiguration()),
                 BOX_CONFIGURATION
             ))
         }
-}
+}
\ No newline at end of file
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt
index 60d61522e..2c666b18b 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterIntegrationTest.kt
@@ -27,8 +27,9 @@ import com.exactpro.th2.common.schema.message.ContainerConstants.ROUTING_KEY
 import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration
 import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
+import com.exactpro.th2.common.util.getRabbitMQConfiguration
 import com.rabbitmq.client.BuiltinExchangeType
 import mu.KotlinLogging
 import org.junit.jupiter.api.Test
@@ -49,21 +50,25 @@ class TransportGroupBatchRouterIntegrationTest {
                 rabbitMQContainer.start()
                 LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" }
 
-                createConnectionManager(rabbitMQContainer).use { firstManager ->
-                    createRouter(firstManager).use { firstRouter ->
-                        createConnectionManager(rabbitMQContainer).use { secondManager ->
-                            createRouter(secondManager).use { secondRouter ->
-                                val counter = CountDownLatch(1)
-                                val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() }
-                                try {
-                                    secondRouter.sendExclusive(monitor.queue, GroupBatch.builder()
-                                        .setBook("")
-                                        .setSessionGroup("")
-                                        .build())
-                                    assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) }
+                createConsumeConnectionManager(rabbitMQContainer).use { firstConsumeManager ->
+                    createPublishConnectionManager(rabbitMQContainer).use { firstPublishManager ->
+                        createRouter(firstPublishManager, firstConsumeManager).use { firstRouter ->
+                            createConsumeConnectionManager(rabbitMQContainer).use { secondConsumeManager ->
+                                createPublishConnectionManager(rabbitMQContainer).use { secondPublishManager ->
+                                    createRouter(secondPublishManager, secondConsumeManager).use { secondRouter ->
+                                        val counter = CountDownLatch(1)
+                                        val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() }
+                                        try {
+                                            secondRouter.sendExclusive(monitor.queue, GroupBatch.builder()
+                                                .setBook("")
+                                                .setSessionGroup("")
+                                                .build())
+                                            assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) }
 
-                                } finally {
-                                    monitor.unsubscribe()
+                                        } finally {
+                                            monitor.unsubscribe()
+                                        }
+                                    }
                                 }
                             }
                         }
@@ -82,22 +87,26 @@ class TransportGroupBatchRouterIntegrationTest {
                 rabbitMQContainer.start()
                 LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" }
 
-                createConnectionManager(rabbitMQContainer).use { firstManager ->
-                    createRouter(firstManager).use { firstRouter ->
-                        createConnectionManager(rabbitMQContainer).use { secondManager ->
-                            createRouter(secondManager).use { secondRouter ->
-                                val counter = CountDownLatch(1)
-                                val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() }
-                                try {
+                createConsumeConnectionManager(rabbitMQContainer).use { firstConsumeManager ->
+                    createPublishConnectionManager(rabbitMQContainer).use { firstPublishManager ->
+                        createRouter(firstPublishManager, firstConsumeManager).use { firstRouter ->
+                            createConsumeConnectionManager(rabbitMQContainer).use { secondConsumeManager ->
+                                createPublishConnectionManager(rabbitMQContainer).use { secondPublishManager ->
+                                    createRouter(secondPublishManager, secondConsumeManager).use { secondRouter ->
+                                        val counter = CountDownLatch(1)
+                                        val monitor = firstRouter.subscribeExclusive { _, _ -> counter.countDown() }
+                                        try {
 
-                                    secondRouter.sendExclusive(monitor.queue, GroupBatch.builder()
-                                        .setBook("")
-                                        .setSessionGroup("")
-                                        .build())
-                                    assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) }
+                                            secondRouter.sendExclusive(monitor.queue, GroupBatch.builder()
+                                                .setBook("")
+                                                .setSessionGroup("")
+                                                .build())
+                                            assertTrue("Message is not received") { counter.await(1, TimeUnit.SECONDS) }
 
-                                } finally {
-                                    monitor.unsubscribe()
+                                        } finally {
+                                            monitor.unsubscribe()
+                                        }
+                                    }
                                 }
                             }
                         }
@@ -106,12 +115,12 @@ class TransportGroupBatchRouterIntegrationTest {
             }
     }
 
-    private fun createRouter(connectionManager: ConnectionManager) = TransportGroupBatchRouter()
+    private fun createRouter(publishConnectionManager: PublishConnectionManager, consumeConnectionManager: ConsumeConnectionManager) = TransportGroupBatchRouter()
         .apply {
             init(
                 DefaultMessageRouterContext(
-                    connectionManager,
-                    connectionManager,
+                    publishConnectionManager,
+                    consumeConnectionManager,
                     mock { },
                     MessageRouterConfiguration(),
                     BoxConfiguration()
@@ -119,24 +128,30 @@ class TransportGroupBatchRouterIntegrationTest {
             )
         }
 
-    private fun createConnectionManager(
+    private fun getConnectionManagerConfiguration(prefetchCount: Int, confirmationTimeout: Duration) = ConnectionManagerConfiguration(
+        subscriberName = "test",
+        prefetchCount = prefetchCount,
+        confirmationTimeout = confirmationTimeout
+    )
+
+    private fun createPublishConnectionManager(
+        rabbitMQContainer: RabbitMQContainer,
+        prefetchCount: Int = DEFAULT_PREFETCH_COUNT,
+        confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT
+    ) = PublishConnectionManager(
+        "test-publish-connection",
+        getRabbitMQConfiguration(rabbitMQContainer),
+        getConnectionManagerConfiguration(prefetchCount, confirmationTimeout)
+    )
+
+    private fun createConsumeConnectionManager(
         rabbitMQContainer: RabbitMQContainer,
         prefetchCount: Int = DEFAULT_PREFETCH_COUNT,
         confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT
-    ) = ConnectionManager(
-        "test-connection",
-        RabbitMQConfiguration(
-            host = rabbitMQContainer.host,
-            vHost = "",
-            port = rabbitMQContainer.amqpPort,
-            username = rabbitMQContainer.adminUsername,
-            password = rabbitMQContainer.adminPassword,
-        ),
-        ConnectionManagerConfiguration(
-            subscriberName = "test",
-            prefetchCount = prefetchCount,
-            confirmationTimeout = confirmationTimeout,
-        ),
+    ) = ConsumeConnectionManager(
+        "test-consume-connection",
+        getRabbitMQConfiguration(rabbitMQContainer),
+        getConnectionManagerConfiguration(prefetchCount, confirmationTimeout)
     )
 
     companion object {
diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterTest.kt
index fc094109b..60fc434b1 100644
--- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterTest.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/TransportGroupBatchRouterTest.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 Exactpro (Exactpro Systems Limited)
+ * Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,15 +20,20 @@ import com.exactpro.th2.common.event.bean.BaseTest.*
 import com.exactpro.th2.common.grpc.MessageGroupBatch
 import com.exactpro.th2.common.schema.message.ExclusiveSubscriberMonitor
 import com.exactpro.th2.common.schema.message.MessageRouter
-import com.exactpro.th2.common.schema.message.configuration.*
+import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration
+import com.exactpro.th2.common.schema.message.configuration.MqRouterFilterConfiguration
+import com.exactpro.th2.common.schema.message.configuration.FieldFilterConfiguration
+import com.exactpro.th2.common.schema.message.configuration.FieldFilterOperation
+import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration
+import com.exactpro.th2.common.schema.message.configuration.GlobalNotificationConfiguration
 import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
-import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
 import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportGroupBatchRouter.Companion.TRANSPORT_GROUP_ATTRIBUTE
 import io.netty.buffer.Unpooled
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.assertArrayEquals
-import org.junit.jupiter.api.Assertions.assertNotNull
 import org.junit.jupiter.api.Assertions.assertThrows
 import org.junit.jupiter.api.Nested
 import org.junit.jupiter.api.Test
@@ -39,7 +44,10 @@ import java.time.Instant
 class TransportGroupBatchRouterTest {
     private val connectionConfiguration = ConnectionManagerConfiguration()
     private val managerMonitor: ExclusiveSubscriberMonitor = mock { }
-    private val connectionManager: ConnectionManager = mock {
+    private val publishConnectionManager: PublishConnectionManager = mock {
+        on { configuration }.thenReturn(connectionConfiguration)
+    }
+    private val consumeConnectionManager: ConsumeConnectionManager = mock {
         on { configuration }.thenReturn(connectionConfiguration)
         on { basicConsume(any(), any(), any()) }.thenReturn(managerMonitor)
     }
@@ -98,7 +106,7 @@ class TransportGroupBatchRouterTest {
             router.send(batch, "test")
 
             val captor = argumentCaptor<ByteArray>()
-            verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
+            verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
             val publishedBytes = captor.firstValue
             assertArrayEquals(batch.toByteArray(), publishedBytes) {
                 "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}"
@@ -109,7 +117,7 @@ class TransportGroupBatchRouterTest {
         fun `does not publish anything if all messages are filtered`() {
             router.send(createGroupBatch("test-message1"))
 
-            verify(connectionManager, never()).basicPublish(any(), any(), anyOrNull(), any())
+            verify(publishConnectionManager, never()).basicPublish(any(), any(), anyOrNull(), any())
         }
 
         @Test
@@ -118,7 +126,7 @@ class TransportGroupBatchRouterTest {
             router.send(batch, "test")
 
             val captor = argumentCaptor<ByteArray>()
-            verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
+            verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
             val publishedBytes = captor.firstValue
             assertArrayEquals(batch.toByteArray(), publishedBytes) {
                 "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}"
@@ -155,8 +163,8 @@ class TransportGroupBatchRouterTest {
             router.sendAll(batch)
 
             val captor = argumentCaptor<ByteArray>()
-            verify(connectionManager).basicPublish(eq("test-exchange"), eq("test"), anyOrNull(), captor.capture())
-            verify(connectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
+            verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test"), anyOrNull(), captor.capture())
+            verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("test2"), anyOrNull(), captor.capture())
             val originalBytes = batch.toByteArray()
             Assertions.assertAll(
                 Executable {
@@ -206,7 +214,7 @@ class TransportGroupBatchRouterTest {
             router.send(batch, "test")
 
             val captor = argumentCaptor<ByteArray>()
-            verify(connectionManager).basicPublish(eq("test-exchange"), eq("publish"), anyOrNull(), captor.capture())
+            verify(publishConnectionManager).basicPublish(eq("test-exchange"), eq("publish"), anyOrNull(), captor.capture())
             val publishedBytes = captor.firstValue
             assertArrayEquals(batch.toByteArray(), publishedBytes) {
                 "Unexpected batch published: ${MessageGroupBatch.parseFrom(publishedBytes)}"
@@ -283,8 +291,8 @@ class TransportGroupBatchRouterTest {
         TransportGroupBatchRouter().apply {
             init(
                 DefaultMessageRouterContext(
-                    connectionManager,
-                    connectionManager,
+                    publishConnectionManager,
+                    consumeConnectionManager,
                     mock { },
                     MessageRouterConfiguration(pins, GlobalNotificationConfiguration()),
                     BOX_CONFIGURATION
@@ -313,4 +321,4 @@ class TransportGroupBatchRouterTest {
             )
         )
     }
-}
+}
\ No newline at end of file
diff --git a/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt b/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt
index 8d5e82e01..8c28a3d63 100644
--- a/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt
+++ b/src/test/kotlin/com/exactpro/th2/common/util/RabbitTestContainerUtil.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ package com.exactpro.th2.common.util
 import kotlin.random.Random
 import org.testcontainers.containers.Container
 import org.testcontainers.containers.RabbitMQContainer
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration
 
 fun declareQueue(rabbit: RabbitMQContainer, queueName: String): Container.ExecResult? =
     execCommandWithSplit(rabbit, "rabbitmqadmin declare queue name=$queueName durable=false")
@@ -65,6 +66,14 @@ fun restartContainer(rabbit: RabbitMQContainer) {
     rabbit.start()
 }
 
+fun getRabbitMQConfiguration(container: RabbitMQContainer) = RabbitMQConfiguration(
+    host = container.host,
+    vHost = "",
+    port = container.amqpPort,
+    username = container.adminUsername,
+    password = container.adminPassword,
+)
+
 private fun execCommandWithSplit(rabbit: RabbitMQContainer, command: String): Container.ExecResult? =
     rabbit.execInContainer(*command.split(" ").toTypedArray())