From b859c5927c504d8314c118b6cd35aa4d81bf41c7 Mon Sep 17 00:00:00 2001 From: Oleg Date: Mon, 3 Jun 2024 18:01:55 +0400 Subject: [PATCH] Correct work with confirmation flag. Use correct channel variable when publishing payload --- .../impl/rabbitmq/connection/ConnectionManager.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java index 1cd0c1086..f3444e72d 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java @@ -380,9 +380,12 @@ public void close() { LOGGER.warn("Some messages were not confirmed by broken in channel {} and were not republished. Try to republish messages", id); channelHolder.publishUnconfirmedMessages(); } + LOGGER.info("Waiting for messages confirmation in channel {}", id); try { channelHolder.awaitConfirmations(closeTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException ignored) { + LOGGER.warn("Waiting for messages confirmation in channel {} was interrupted", id); + Thread.currentThread().interrupt(); } } if (channelHolder.hasUnconfirmedMessages()) { @@ -844,7 +847,7 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration // will drain message to queue on next iteration continue; } - currentPayload.publish(channel); + currentPayload.publish(tempChannel); if (redeliveryQueue.isEmpty()) { break; } @@ -1076,9 +1079,6 @@ public void publishUnconfirmedMessages() throws IOException { publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue); while (!redeliveryQueue.isEmpty()) { PublicationHolder holder = redeliveryQueue.pollFirst(); - if (holder == null) { - break; - } holder.publish(channel); } } finally { @@ -1289,9 +1289,9 @@ public void transferUnconfirmedTo(Deque redelivery) { } inflightRequests.clear(); inflightBytes = 0; + noConfirmationWillBeReceived = false; hasSpaceToWriteCondition.signalAll(); allMessagesConfirmed.signalAll(); - noConfirmationWillBeReceived = false; } finally { lock.writeLock().unlock(); }