Skip to content

Commit

Permalink
connection manager receives messages when publishing is blocked log…
Browse files Browse the repository at this point in the history
…ic rollback

isPublishingBlocked property
Awaitility used
  • Loading branch information
Oleg Smelov committed Jul 8, 2024
1 parent b041ca6 commit 2acea6e
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -108,6 +109,7 @@ public String getNameSuffix() {
private final HealthMetrics consumeMetrics = new HealthMetrics(this, ConnectionType.CONSUME.getNameSuffix());
private final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>();
private final AtomicReference<State> connectionState = new AtomicReference<>(State.OPEN);
private final AtomicBoolean isPublishingBlocked = new AtomicBoolean(false);
private final ConnectionManagerConfiguration configuration;
private final String subscriberName;
private final AtomicInteger nextSubscriberId = new AtomicInteger(1);
Expand Down Expand Up @@ -245,7 +247,7 @@ private void turnOffReadiness(Throwable exception) {
connection = factory.newConnection(connectionName + '-' + connectionType.getNameSuffix());
LOGGER.info("Created RabbitMQ connection {} [{}]", connection, connection.hashCode());
addShutdownListenerToConnection(connection);
addBlockedListenersToConnection(connection);
addBlockedListenersToConnection(connection, connectionType);
addRecoveryListenerToConnection(connection, metrics);
metrics.getReadinessMonitor().enable();
LOGGER.debug("Set RabbitMQ readiness to true");
Expand Down Expand Up @@ -373,18 +375,34 @@ private void addRecoveryListenerToConnection(Connection conn, HealthMetrics metr
}
}

private void addBlockedListenersToConnection(Connection conn) {
conn.addBlockedListener(new BlockedListener() {
@Override
public void handleBlocked(String reason) {
LOGGER.warn("RabbitMQ blocked connection: {}", reason);
}
private void addBlockedListenersToConnection(Connection conn, ConnectionType type) {
BlockedListener listener = (type == ConnectionType.PUBLISH) ?
new BlockedListener() {
@Override
public void handleBlocked(String reason) {
isPublishingBlocked.set(true);
LOGGER.warn("RabbitMQ blocked publish connection: {}", reason);
}

@Override
public void handleUnblocked() {
LOGGER.warn("RabbitMQ unblocked connection");
}
});
@Override
public void handleUnblocked() {
isPublishingBlocked.set(false);
LOGGER.warn("RabbitMQ unblocked publish connection");
}
} :
new BlockedListener() {
@Override
public void handleBlocked(String reason) {
LOGGER.error("RabbitMQ blocked consumer connection: {}", reason);
}

@Override
public void handleUnblocked() {
LOGGER.error("RabbitMQ unblocked consumer connection");
}
};

conn.addBlockedListener(listener);
}

public boolean isOpen() {
Expand Down Expand Up @@ -449,7 +467,12 @@ private void closeConnection(Connection connection, int timeout) {
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException, InterruptedException {
ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey), publishConnection);
holder.retryingPublishWithLock(configuration, body,
(channel, payload) -> channel.basicPublish(exchange, routingKey, props, payload));
(channel, payload) -> {
long start = System.currentTimeMillis();
channel.basicPublish(exchange, routingKey, props, payload);
long delay = System.currentTimeMillis() - start;
LOGGER.error("delay = {}", delay);
});
}

public String queueDeclare() throws IOException {
Expand Down Expand Up @@ -540,6 +563,10 @@ boolean isAlive() {
return publishMetrics.getLivenessMonitor().isEnabled() && consumeMetrics.getLivenessMonitor().isEnabled();
}

boolean isPublishingBlocked() {
return isPublishingBlocked.get();
}

private ChannelHolderOptions configurationToOptions() {
return new ChannelHolderOptions(
configuration.getPrefetchCount(),
Expand Down Expand Up @@ -892,7 +919,7 @@ public void retryingPublishWithLock(ConnectionManagerConfiguration configuration
} catch (IOException | ShutdownSignalException e) {
var currentValue = iterator.next();
var recoveryDelay = currentValue.getDelay();
LOGGER.warn("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e);
LOGGER.error("Retrying publishing #{}, waiting for {}ms. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e);
TimeUnit.MILLISECONDS.sleep(recoveryDelay);
// cleanup after failure
publishConfirmationListener.remove(msgSeq);
Expand Down Expand Up @@ -1353,7 +1380,7 @@ public void noConfirmationWillBeReceived() {
try {
LOGGER.warn("Publication listener was notified that no confirmations will be received");
noConfirmationWillBeReceived = true;
// we need to unlock possible locked publisher so it can check that nothing will be confirmed
// we need to unlock possible locked publisher, so it can check that nothing will be confirmed
// and retry the publication
hasSpaceToWriteCondition.signalAll();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,7 @@ class TestConnectionManager {
LOGGER.info { "Started with port ${rabbit.amqpPort}" }
LOGGER.info { "Started with port ${rabbit.amqpPort}" }
val messagesCount = 10
val blockAfter = 3
val countDown = CountDownLatch(messagesCount)
val messageSizeBytes = 7
createConnectionManager(
Expand All @@ -1221,13 +1222,24 @@ class TestConnectionManager {
)
).use { manager ->
repeat(messagesCount) { index ->
if (index == blockAfter) {
assertFalse(manager.isPublishingBlocked)

// blocks all publishers ( https://www.rabbitmq.com/docs/memory )
rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0")
}

manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
LOGGER.info("Published $index")
}

// blocks all publishers ( https://www.rabbitmq.com/docs/memory )
rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0")
manager.basicPublish(exchange, routingKey, null, "Final message.".toByteArray(Charsets.UTF_8)) // this message initiates publishers blocking
if (index == blockAfter) {
// wait for blocking of publishing connection
Awaitility.await("publishing blocked")
.pollInterval(10L, TimeUnit.MILLISECONDS)
.atMost(100L, TimeUnit.MILLISECONDS)
.until { manager.isPublishingBlocked }
}
}

val receivedMessages = linkedSetOf<String>()
LOGGER.info { "creating consumer" }
Expand All @@ -1254,6 +1266,19 @@ class TestConnectionManager {
subscribeFuture.cancel(true)
}

Awaitility.await("receive messages sent before blocking")
.pollInterval(10L, TimeUnit.MILLISECONDS)
.atMost(100L, TimeUnit.MILLISECONDS)
.until { blockAfter.toLong() == messagesCount - countDown.count }

Thread.sleep(100) // ensure no more messages received
assertEquals(blockAfter.toLong(), messagesCount - countDown.count)
assertTrue(manager.isPublishingBlocked)

// unblocks publishers
rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0.4")
assertFalse(manager.isPublishingBlocked)

// delay receiving all messages
Awaitility.await("all messages received")
.pollInterval(10L, TimeUnit.MILLISECONDS)
Expand Down

0 comments on commit 2acea6e

Please sign in to comment.