Migrate inflight limit from messages count to payload size in bytes
OptimumCode committed Jun 2, 2024
1 parent c3acdfe commit 8c20522
Showing 5 changed files with 86 additions and 43 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -516,7 +516,7 @@ dependencies {
+ Added functionality for publisher confirmations to mitigate network issues for message producers.
+ New parameters are added to connection manager configuration:
+ enablePublisherConfirmation - enables publisher confirmation. `false` by default.
+ maxInflightPublications - the max number of unconfirmed published messages per channel. `200`, by default.
+ maxInflightPublicationsBytes - the max total size in bytes of unconfirmed published messages per channel. `52428800` (50 MB) by default.
+ heartbeatIntervalSeconds - rabbitmq connection heartbeat interval in seconds.
`0` by default (that means the default interval will be set by the internal library used to communicate with RabbitMQ).
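
For illustration, a minimal sketch of enabling publisher confirmation with the byte-based inflight limit through the `ConnectionManagerConfiguration` class changed in this commit (only a few parameters are set here; the remaining ones keep their defaults):

```kotlin
import java.time.Duration

// Sketch: confirmation enabled, unconfirmed publications capped at 10 MB per channel.
val configuration = ConnectionManagerConfiguration(
    confirmationTimeout = Duration.ofMinutes(5),
    enablePublisherConfirmation = true,
    maxInflightPublicationsBytes = 10 * 1024 * 1024,
    heartbeatIntervalSeconds = 1,
)
```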

@@ -504,7 +504,7 @@ private ChannelHolderOptions configurationToOptions() {
return new ChannelHolderOptions(
configuration.getPrefetchCount(),
configuration.getEnablePublisherConfirmation(),
configuration.getMaxInflightPublications()
configuration.getMaxInflightPublicationsBytes()
);
}

@@ -718,12 +718,12 @@ public String toString() {
private static class ChannelHolderOptions {
private final int maxCount;
private final boolean enablePublisherConfirmation;
private final int maxInflightRequests;
private final int maxInflightRequestsBytes;

private ChannelHolderOptions(int maxCount, boolean enablePublisherConfirmation, int maxInflightRequests) {
private ChannelHolderOptions(int maxCount, boolean enablePublisherConfirmation, int maxInflightRequestsBytes) {
this.maxCount = maxCount;
this.enablePublisherConfirmation = enablePublisherConfirmation;
this.maxInflightRequests = maxInflightRequests;
this.maxInflightRequestsBytes = maxInflightRequestsBytes;
}

public int getMaxCount() {
@@ -734,8 +734,8 @@ public boolean isEnablePublisherConfirmation() {
return enablePublisherConfirmation;
}

public int getMaxInflightRequests() {
return maxInflightRequests;
public int getMaxInflightRequestsBytes() {
return maxInflightRequestsBytes;
}
}

@@ -780,7 +780,7 @@ public ChannelHolder(
this.subscriptionCallbacks = subscriptionCallbacks;
publishConfirmationListener = new PublisherConfirmationListener(
options.isEnablePublisherConfirmation(),
options.getMaxInflightRequests()
options.getMaxInflightRequestsBytes()
);
}

@@ -842,7 +842,6 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
publishConfirmationListener.put(msgSeq, currentPayload);
if (publishConfirmationListener.isNoConfirmationWillBeReceived()) {
// will drain message to queue on next iteration
publishConfirmationListener.remove(msgSeq);
continue;
}
currentPayload.publish(channel);
@@ -854,9 +853,6 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
var recoveryDelay = currentValue.getDelay();
LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e);
TimeUnit.MILLISECONDS.sleep(recoveryDelay);
// cleanup after failure
publishConfirmationListener.remove(msgSeq);
redeliveryQueue.addFirst(currentPayload);

// We should not recover the channel if its connection is closed
// If we do that the channel will be also auto recovered by RabbitMQ client
@@ -866,6 +862,10 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
// so we should redeliver all inflight requests
publishConfirmationListener.transferUnconfirmedTo(redeliveryQueue);
tempChannel = recreateChannel();
} else {
// cleanup after failure
publishConfirmationListener.remove(msgSeq);
redeliveryQueue.addFirst(currentPayload);
}
}
}
@@ -1098,6 +1098,10 @@ private PublicationHolder(ChannelPublisher publisher, byte[] payload) {
this.payload = payload;
}

int size() {
return payload.length;
}

boolean isConfirmed() {
return confirmed;
}
@@ -1121,17 +1125,20 @@ private static class PublisherConfirmationListener implements ConfirmListener {
private final Condition allMessagesConfirmed = lock.writeLock().newCondition();
@GuardedBy("lock")
private final NavigableMap<Long, PublicationHolder> inflightRequests = new TreeMap<>();
private final int maxInflightRequests;
private final int maxInflightRequestsBytes;
private final boolean enablePublisherConfirmation;
private final boolean hasLimit;
private volatile boolean noConfirmationWillBeReceived;
@GuardedBy("lock")
private boolean noConfirmationWillBeReceived;
@GuardedBy("lock")
private int inflightBytes;

private PublisherConfirmationListener(boolean enablePublisherConfirmation, int maxInflightRequests) {
if (maxInflightRequests <= 0 && maxInflightRequests != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS) {
throw new IllegalArgumentException("invalid maxInflightRequests: " + maxInflightRequests);
private PublisherConfirmationListener(boolean enablePublisherConfirmation, int maxInflightRequestsBytes) {
if (maxInflightRequestsBytes <= 0 && maxInflightRequestsBytes != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS) {
throw new IllegalArgumentException("invalid maxInflightRequests: " + maxInflightRequestsBytes);
}
hasLimit = maxInflightRequests != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS;
this.maxInflightRequests = maxInflightRequests;
hasLimit = maxInflightRequestsBytes != ConnectionManagerConfiguration.NO_LIMIT_INFLIGHT_REQUESTS;
this.maxInflightRequestsBytes = maxInflightRequestsBytes;
this.enablePublisherConfirmation = enablePublisherConfirmation;
}

@@ -1141,14 +1148,26 @@ public void put(long deliveryTag, PublicationHolder payload) throws InterruptedE
}
lock.writeLock().lock();
try {
// there is only one thread at a time that tries to publish message
// so it is safe to check the limit only once
if (hasLimit && inflightRequests.size() >= maxInflightRequests) {
LOGGER.warn("blocking because inflight requests size is above limit {} for publication channel", maxInflightRequests);
hasSpaceToWriteCondition.await();
LOGGER.info("unblocking because inflight requests size is below limit {} for publication channel", maxInflightRequests);
int payloadSize = payload.size();
if (hasLimit) {
int newSize = inflightBytes + payloadSize;
if (newSize > maxInflightRequestsBytes) {
LOGGER.warn("blocking because {} inflight requests bytes size is above limit {} bytes for publication channel",
newSize, maxInflightRequestsBytes);
do {
hasSpaceToWriteCondition.await();
newSize = inflightBytes + payloadSize;
} while (newSize > maxInflightRequestsBytes && !noConfirmationWillBeReceived);
if (noConfirmationWillBeReceived) {
LOGGER.warn("unblocking because no confirmation will be received and inflight request size will not change");
} else {
LOGGER.info("unblocking because {} inflight requests bytes size is below limit {} bytes for publication channel",
newSize, maxInflightRequestsBytes);
}
}
}
inflightRequests.put(deliveryTag, payload);
inflightBytes += payloadSize;
} finally {
lock.writeLock().unlock();
}
@@ -1162,6 +1181,7 @@ public void remove(long deliveryTag) {
public void handleAck(long deliveryTag, boolean multiple) {
LOGGER.trace("Delivery ack received for tag {} (multiple:{})", deliveryTag, multiple);
removeInflightRequests(deliveryTag, multiple);
LOGGER.trace("Delivery ack processed for tag {} (multiple:{})", deliveryTag, multiple);
}

@Override
@@ -1170,6 +1190,7 @@ public void handleNack(long deliveryTag, boolean multiple) {
// we cannot do match with nack because this is an internal error in rabbitmq
// we can try to republish the message but there is no guarantee that the message will be accepted
removeInflightRequests(deliveryTag, multiple);
LOGGER.warn("Delivery nack processed for tag {} (multiple:{})", deliveryTag, multiple);
}

private void removeInflightRequests(long deliveryTag, boolean multiple) {
@@ -1178,9 +1199,13 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) {
}
lock.writeLock().lock();
try {
int initialSize = inflightRequests.size();
int currentSize = inflightBytes;
if (multiple) {
inflightRequests.headMap(deliveryTag, true).clear();
Map<Long, PublicationHolder> headMap = inflightRequests.headMap(deliveryTag, true);
for (Map.Entry<Long, PublicationHolder> entry : headMap.entrySet()) {
currentSize -= entry.getValue().size();
}
headMap.clear();
} else {
long oldestPublication = Objects.requireNonNullElse(inflightRequests.firstKey(), deliveryTag);
if (oldestPublication == deliveryTag) {
@@ -1195,6 +1220,7 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) {
if (key > deliveryTag && !holder.isConfirmed()) {
break;
}
currentSize -= holder.size();
tailIterator.remove();
}
} else {
@@ -1206,12 +1232,14 @@ private void removeInflightRequests(long deliveryTag, boolean multiple) {
}
}
}
if (inflightBytes != currentSize) {
inflightBytes = currentSize;
hasSpaceToWriteCondition.signalAll();
}
if (inflightRequests.isEmpty()) {
inflightBytes = 0;
allMessagesConfirmed.signalAll();
}
if (inflightRequests.size() != initialSize) {
hasSpaceToWriteCondition.signalAll();
}
} finally {
lock.writeLock().unlock();
}
@@ -1244,6 +1272,7 @@ public void transferUnconfirmedTo(Deque<PublicationHolder> redelivery) {
redelivery.addFirst(payload);
}
inflightRequests.clear();
inflightBytes = 0;
hasSpaceToWriteCondition.signalAll();
allMessagesConfirmed.signalAll();
noConfirmationWillBeReceived = false;
@@ -1270,6 +1299,7 @@ public boolean isNoConfirmationWillBeReceived() {
public void noConfirmationWillBeReceived() {
lock.writeLock().lock();
try {
LOGGER.warn("Publication listener was notified that no confirmations will be received");
noConfirmationWillBeReceived = true;
// we need to unlock possible locked publisher so it can check that nothing will be confirmed
// and retry the publication
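
Taken together, the listener above replaces the old per-message counter with byte accounting: put() blocks the publishing thread while the total size of unconfirmed payloads would exceed the limit, and the ack/nack handlers subtract the confirmed sizes and signal blocked publishers. A minimal standalone sketch of that backpressure pattern, with illustrative names and a single lock (no confirm listener, redelivery queue, or no-confirmation handling):

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// Illustrative sketch only: limits unconfirmed publications by total payload bytes.
class InflightByteLimiter(private val maxInflightBytes: Int) {
    private val lock = ReentrantLock()
    private val hasSpace = lock.newCondition()
    private var inflightBytes = 0

    // Called before publishing: blocks while the payload would push the total over the limit.
    fun acquire(payloadSize: Int) = lock.withLock {
        while (inflightBytes + payloadSize > maxInflightBytes) {
            hasSpace.await()
        }
        inflightBytes += payloadSize
    }

    // Called when the broker confirms a publication: frees the bytes and wakes blocked publishers.
    fun release(payloadSize: Int) = lock.withLock {
        inflightBytes -= payloadSize
        hasSpace.signalAll()
    }
}
```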
@@ -44,8 +44,8 @@ data class ConnectionManagerConfiguration(
val workingThreads: Int = 1,
val confirmationTimeout: Duration = Duration.ofMinutes(5),
val enablePublisherConfirmation: Boolean = false,
// Default value is taken based on measurement done in ConnectionManualBenchmark class
val maxInflightPublications: Int = 200,
// The default value (50 MB) is based on measurements done in the ConnectionManualBenchmark class
val maxInflightPublicationsBytes: Int = 50 * 1024 * 1024,
val heartbeatIntervalSeconds: Int = DEFAULT_HB_INTERVAL_SECONDS,
) : Configuration() {
init {
@@ -46,7 +46,7 @@ object ConnectionManualBenchmark {
prefetchCount = 1000,
confirmationTimeout = Duration.ofSeconds(5),
enablePublisherConfirmation = true,
maxInflightPublications = 100,
maxInflightPublicationsBytes = 1024 * 1024 * 25,
)
)

@@ -57,7 +57,7 @@ object ConnectionManualBenchmark {
prefetchCount = 100,
confirmationTimeout = Duration.ofSeconds(5),
enablePublisherConfirmation = true,
maxInflightPublications = 200,
maxInflightPublicationsBytes = 1024 * 1024 * 1,
)
)
}
@@ -69,7 +69,7 @@ object ConnectionManualBenchmark {
prefetchCount = 100,
confirmationTimeout = Duration.ofSeconds(5),
enablePublisherConfirmation = false,
maxInflightPublications = -1,
maxInflightPublicationsBytes = -1,
)
)
}
@@ -53,6 +53,7 @@ import org.testcontainers.utility.MountableFile
import java.io.IOException
import java.time.Duration
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
@@ -74,13 +75,14 @@ class TestConnectionManager {
LOGGER.info { "Started with port ${rabbit.amqpPort}" }
val messagesCount = 10
val countDown = CountDownLatch(messagesCount)
val messageSizeBytes = 7
createConnectionManager(
rabbit, ConnectionManagerConfiguration(
subscriberName = "test",
prefetchCount = DEFAULT_PREFETCH_COUNT,
confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
enablePublisherConfirmation = true,
maxInflightPublications = 5,
maxInflightPublicationsBytes = 5 * messageSizeBytes,
heartbeatIntervalSeconds = 1,
minConnectionRecoveryTimeout = 2000,
maxConnectionRecoveryTimeout = 2000,
@@ -104,6 +106,7 @@
}


var future: CompletableFuture<*>? = null
repeat(messagesCount) { index ->
if (index == 1) {
// delay should allow ack for the first message be received
@@ -119,16 +122,22 @@
// So, we will have to deal with it on the consumer side
rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down")
} else if (index == 4) {
// More than 2 HB will be missed
// This is enough for rabbitmq server to understand the connection is lost
Thread.sleep(4_000)
// enabling network interface back
rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
future = CompletableFuture.supplyAsync {
// The interface is unblocked in a separate thread to emulate a more realistic scenario

// More than 2 HB will be missed
// This is enough for rabbitmq server to understand the connection is lost
Thread.sleep(3_000)
// enabling network interface back
rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
}
}
manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
}

countDown.assertComplete("Not all messages were received: $receivedMessages")
future?.get(30, TimeUnit.SECONDS)

countDown.assertComplete { "Not all messages were received: $receivedMessages" }
assertEquals(
(0 until messagesCount).map {
"Hello $it"
@@ -959,12 +968,16 @@ }
}

private fun CountDownLatch.assertComplete(message: String) {
assertComplete { message }
}

private fun CountDownLatch.assertComplete(messageSupplier: () -> String) {
assertTrue(
await(
1L,
TimeUnit.SECONDS
)
) { "$message, actual count: $count" }
) { "${messageSupplier()}, actual count: $count" }
}

private fun <T> assertTarget(target: T, timeout: Long = 1_000, message: String, func: () -> T) {