Skip to content

Commit

Permalink
Fix payload lost during retry attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed Jun 2, 2024
1 parent 8c20522 commit a832f9c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,9 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
var recoveryDelay = currentValue.getDelay();
LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e);
TimeUnit.MILLISECONDS.sleep(recoveryDelay);
// cleanup after failure
publishConfirmationListener.remove(msgSeq);
redeliveryQueue.addFirst(currentPayload);

// We should not recover the channel if its connection is closed
// If we do that the channel will be also auto recovered by RabbitMQ client
Expand All @@ -862,10 +865,6 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
// so we should redeliver all inflight requests
publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
tempChannel = recreateChannel();
} else {
// cleanup after failure
publishConfirmationListener.remove(msgSeq);
redeliveryQueue.addFirst(currentPayload);
}
}
}
Expand Down Expand Up @@ -1174,7 +1173,24 @@ public void put(long deliveryTag, PublicationHolder payload) throws InterruptedE
}

public void remove(long deliveryTag) {
removeInflightRequests(deliveryTag, false);
if (!enablePublisherConfirmation) {
return;
}
lock.writeLock().lock();
try {
PublicationHolder holder = inflightRequests.remove(deliveryTag);
if (holder == null) {
return;
}
inflightBytes -= holder.size();
hasSpaceToWriteCondition.signalAll();
if (inflightRequests.isEmpty()) {
inflightBytes = 0;
allMessagesConfirmed.signalAll();
}
} finally {
lock.writeLock().unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ class TestConnectionManager {
.let {
declareQueue(rabbit, queueName)
LOGGER.info { "Started with port ${it.amqpPort}" }
val counter = AtomicInteger(0)
val counter = AtomicInteger()
val downLatch = CountDownLatch(1)
createConnectionManager(
it,
ConnectionManagerConfiguration(
Expand All @@ -298,6 +299,7 @@ class TestConnectionManager {
try {
monitor = connectionManager.basicConsume(queueName, { _, delivery, _ ->
counter.incrementAndGet()
downLatch.countDown()
LOGGER.info { "Received ${delivery.body.toString(Charsets.UTF_8)} from \"${delivery.envelope.routingKey}\"" }
}) {
LOGGER.info { "Canceled $it" }
Expand All @@ -309,19 +311,18 @@ class TestConnectionManager {
LOGGER.info { "Publication finished!" }
assertEquals(
0,
counter.get()
counter.get(),
) { "Unexpected number of messages received. The first message shouldn't be received" }
Thread.sleep(200)
LOGGER.info { "Creating the correct exchange..." }
declareFanoutExchangeWithBinding(it, exchange, queueName)
Thread.sleep(200)
LOGGER.info { "Exchange created!" }

Assertions.assertDoesNotThrow {
connectionManager.basicPublish(exchange, "", null, "Hello2".toByteArray(Charsets.UTF_8))
}

Thread.sleep(200)
downLatch.assertComplete(1L, TimeUnit.SECONDS) { "no messages were received" }

assertEquals(
1,
counter.get()
Expand Down Expand Up @@ -967,16 +968,21 @@ class TestConnectionManager {
}
}

private fun CountDownLatch.assertComplete(message: String) {
assertComplete { message }
private fun CountDownLatch.assertComplete(
message: String,
timeout: Long = 1,
timeUnit: TimeUnit = TimeUnit.SECONDS,
) {
assertComplete(timeout, timeUnit) { message }
}

private fun CountDownLatch.assertComplete(messageSupplier: () -> String) {
private fun CountDownLatch.assertComplete(
timeout: Long = 1,
timeUnit: TimeUnit = TimeUnit.SECONDS,
messageSupplier: () -> String,
) {
assertTrue(
await(
1L,
TimeUnit.SECONDS
)
await(timeout, timeUnit)
) { "${messageSupplier()}, actual count: $count" }
}

Expand Down

0 comments on commit a832f9c

Please sign in to comment.