diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java index 01a3113c..1cd0c108 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java @@ -853,6 +853,9 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration var recoveryDelay = currentValue.getDelay(); LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e); TimeUnit.MILLISECONDS.sleep(recoveryDelay); + // cleanup after failure + publishConfirmationListener.remove(msgSeq); + redeliveryQueue.addFirst(currentPayload); // We should not recover the channel if its connection is closed // If we do that the channel will be also auto recovered by RabbitMQ client @@ -862,10 +865,6 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration // so we should redeliver all inflight requests publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue); tempChannel = recreateChannel(); - } else { - // cleanup after failure - publishConfirmationListener.remove(msgSeq); - redeliveryQueue.addFirst(currentPayload); } } } @@ -1174,7 +1173,24 @@ public void put(long deliveryTag, PublicationHolder payload) throws InterruptedE } public void remove(long deliveryTag) { - removeInflightRequests(deliveryTag, false); + if (!enablePublisherConfirmation) { + return; + } + lock.writeLock().lock(); + try { + PublicationHolder holder = inflightRequests.remove(deliveryTag); + if (holder == null) { + return; + } + inflightBytes -= holder.size(); + hasSpaceToWriteCondition.signalAll(); + if (inflightRequests.isEmpty()) { + inflightBytes = 0; + allMessagesConfirmed.signalAll(); + } + } finally { + lock.writeLock().unlock(); + } } @Override diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt index da536e51..da1ecfc3 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt @@ -282,7 +282,8 @@ class TestConnectionManager { .let { declareQueue(rabbit, queueName) LOGGER.info { "Started with port ${it.amqpPort}" } - val counter = AtomicInteger(0) + val counter = AtomicInteger() + val downLatch = CountDownLatch(1) createConnectionManager( it, ConnectionManagerConfiguration( @@ -298,6 +299,7 @@ class TestConnectionManager { try { monitor = connectionManager.basicConsume(queueName, { _, delivery, _ -> counter.incrementAndGet() + downLatch.countDown() LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" } }) { LOGGER.info { "Canceled $it" } @@ -309,19 +311,18 @@ class TestConnectionManager { LOGGER.info { "Publication finished!" } assertEquals( 0, - counter.get() + counter.get(), ) { "Unexpected number of messages received. The first message shouldn't be received" } - Thread.sleep(200) LOGGER.info { "Creating the correct exchange..." } declareFanoutExchangeWithBinding(it, exchange, queueName) - Thread.sleep(200) LOGGER.info { "Exchange created!" } Assertions.assertDoesNotThrow { connectionManager.basicPublish(exchange, "", null, "Hello2".toByteArray(Charsets.UTF_8)) } - Thread.sleep(200) + downLatch.assertComplete(1L, TimeUnit.SECONDS) { "no messages were received" } + assertEquals( 1, counter.get() @@ -967,16 +968,21 @@ class TestConnectionManager { } } - private fun CountDownLatch.assertComplete(message: String) { - assertComplete { message } + private fun CountDownLatch.assertComplete( + message: String, + timeout: Long = 1, + timeUnit: TimeUnit = TimeUnit.SECONDS, + ) { + assertComplete(timeout, timeUnit) { message } } - private fun CountDownLatch.assertComplete(messageSupplier: () -> String) { + private fun CountDownLatch.assertComplete( + timeout: Long = 1, + timeUnit: TimeUnit = TimeUnit.SECONDS, + messageSupplier: () -> String, + ) { assertTrue( - await( - 1L, - TimeUnit.SECONDS - ) + await(timeout, timeUnit) ) { "${messageSupplier()}, actual count: $count" } }