Skip to content

Commit

Permalink
Correct work with confirmation flag. Use correct channel variable whe…
Browse files Browse the repository at this point in the history
…n publishing payload
  • Loading branch information
OptimumCode committed Jun 3, 2024
1 parent a832f9c commit b859c59
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,12 @@ public void close() {
LOGGER.warn("Some messages were not confirmed by broken in channel {} and were not republished. Try to republish messages", id);
channelHolder.publishUnconfirmedMessages();
}
LOGGER.info("Waiting for messages confirmation in channel {}", id);
try {
channelHolder.awaitConfirmations(closeTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
LOGGER.warn("Waiting for messages confirmation in channel {} was interrupted", id);
Thread.currentThread().interrupt();
}
}
if (channelHolder.hasUnconfirmedMessages()) {
Expand Down Expand Up @@ -844,7 +847,7 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
// will drain message to queue on next iteration
continue;
}
currentPayload.publish(channel);
currentPayload.publish(tempChannel);
if (redeliveryQueue.isEmpty()) {
break;
}
Expand Down Expand Up @@ -1076,9 +1079,6 @@ public void publishUnconfirmedMessages() throws IOException {
publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
while (!redeliveryQueue.isEmpty()) {
PublicationHolder holder = redeliveryQueue.pollFirst();
if (holder == null) {
break;
}
holder.publish(channel);
}
} finally {
Expand Down Expand Up @@ -1289,9 +1289,9 @@ public void transferUnconfirmedTo(Deque<PublicationHolder> redelivery) {
}
inflightRequests.clear();
inflightBytes = 0;
noConfirmationWillBeReceived = false;
hasSpaceToWriteCondition.signalAll();
allMessagesConfirmed.signalAll();
noConfirmationWillBeReceived = false;
} finally {
lock.writeLock().unlock();
}
Expand Down

0 comments on commit b859c59

Please sign in to comment.