Skip to content

Commit

Permalink
Add test case for republishing on close. Correct close method to allo…
Browse files Browse the repository at this point in the history
…w republisihing
  • Loading branch information
OptimumCode committed Jun 5, 2024
1 parent 45412f7 commit 84a094b
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 9 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ dependencies {
testImplementation("org.junit-pioneer:junit-pioneer:2.2.0") {
because("system property tests")
}
testImplementation("org.awaitility:awaitility:4.2.1")

testFixturesImplementation "org.jetbrains.kotlin:kotlin-test-junit5:$kotlin_version"
testFixturesImplementation "org.junit.jupiter:junit-jupiter:$junitVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -89,7 +89,7 @@ public class ConnectionManager implements AutoCloseable {

private final Connection connection;
private final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>();
private final AtomicBoolean connectionIsClosed = new AtomicBoolean(false);
private final AtomicReference<State> connectionIsClosed = new AtomicReference<>(State.OPEN);
private final ConnectionManagerConfiguration configuration;
private final String subscriberName;
private final AtomicInteger nextSubscriberId = new AtomicInteger(1);
Expand Down Expand Up @@ -118,6 +118,8 @@ public ConnectionManagerConfiguration getConfiguration() {
return configuration;
}

private enum State { OPEN, CLOSING, CLOSED }

public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfiguration rabbitMQConfiguration, @NotNull ConnectionManagerConfiguration connectionManagerConfiguration) {
Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null");
this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null");
Expand Down Expand Up @@ -205,7 +207,7 @@ private void turnOffReadiness(Throwable exception) {
}
});

factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> !connectionIsClosed.get());
factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> connectionIsClosed.get() != State.CLOSED);

factory.setRecoveryDelayHandler(recoveryAttempts -> {
int minTime = connectionManagerConfiguration.getMinConnectionRecoveryTimeout();
Expand Down Expand Up @@ -358,12 +360,12 @@ public void handleUnblocked() {
}

public boolean isOpen() {
return connection.isOpen() && !connectionIsClosed.get();
return connection.isOpen() && connectionIsClosed.get() == State.OPEN;
}

@Override
public void close() {
if (connectionIsClosed.getAndSet(true)) {
if (!connectionIsClosed.compareAndSet(State.OPEN, State.CLOSING)) {
LOGGER.info("Connection manager already closed");
return;
}
Expand Down Expand Up @@ -397,6 +399,8 @@ public void close() {
}
}

connectionIsClosed.set(State.CLOSED);

if (connection.isOpen()) {
try {
connection.close(closeTimeout);
Expand Down Expand Up @@ -602,7 +606,7 @@ private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitFo
}
}

if (connectionIsClosed.get()) {
if (connectionIsClosed.get() == State.CLOSED) {
throw new IllegalStateException("Connection is already closed");
}
}
Expand All @@ -621,7 +625,8 @@ private void waitForRecovery(ShutdownNotifier notifier) {
}

private boolean isConnectionRecovery(ShutdownNotifier notifier) {
return !(notifier instanceof AutorecoveringChannel) && !notifier.isOpen() && !connectionIsClosed.get();
return !(notifier instanceof AutorecoveringChannel) && !notifier.isOpen()
&& connectionIsClosed.get() != State.CLOSED;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.rabbitmq.client.BuiltinExchangeType
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.Delivery
import mu.KotlinLogging
import org.awaitility.Awaitility
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions
Expand Down Expand Up @@ -110,7 +111,10 @@ class TestConnectionManager {
repeat(messagesCount) { index ->
if (index == 1) {
// delay should allow ack for the first message be received
Thread.sleep(100)
Awaitility.await("first message is confirmed")
.pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(100, TimeUnit.MILLISECONDS)
.until { countDown.count == messagesCount - 1L }
// Man pages:
// https://man7.org/linux/man-pages/man8/tc-netem.8.html
// https://man7.org/linux/man-pages/man8/ifconfig.8.html
Expand All @@ -127,7 +131,9 @@ class TestConnectionManager {

// More than 2 HB will be missed
// This is enough for rabbitmq server to understand the connection is lost
Thread.sleep(3_000)
Awaitility.await("connection is closed")
.atMost(3, TimeUnit.SECONDS)
.until { !manager.isOpen }
// enabling network interface back
rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
}
Expand All @@ -149,6 +155,99 @@ class TestConnectionManager {
}
}

@Test
fun `connection manager redelivers unconfirmed messages on close`() {
val routingKey = "routingKey1"
val queueName = "queue1"
val exchange = "test-exchange1"
rabbit
.let { rabbit ->
declareQueue(rabbit, queueName)
declareFanoutExchangeWithBinding(rabbit, exchange, queueName)
LOGGER.info { "Started with port ${rabbit.amqpPort}" }
val messagesCount = 10
val countDown = CountDownLatch(messagesCount)
val messageSizeBytes = 7
createConnectionManager(
rabbit, ConnectionManagerConfiguration(
subscriberName = "test",
prefetchCount = DEFAULT_PREFETCH_COUNT,
confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
minConnectionRecoveryTimeout = 2000,
maxConnectionRecoveryTimeout = 2000,
// to avoid unexpected delays before recovery
retryTimeDeviationPercent = 0,
)
).use { manager ->
val receivedMessages = linkedSetOf<String>()
manager.basicConsume(queueName, { _, delivery, ack ->
val message = delivery.body.toString(Charsets.UTF_8)
LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" }
if (receivedMessages.add(message)) {
// decrement only unique messages
countDown.countDown()
} else {
LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" }
}
ack.confirm()
}) {
LOGGER.info { "Canceled $it" }
}

createConnectionManager(
rabbit, ConnectionManagerConfiguration(
prefetchCount = DEFAULT_PREFETCH_COUNT,
confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
enablePublisherConfirmation = true,
maxInflightPublicationsBytes = messagesCount * 2 * messageSizeBytes,
heartbeatIntervalSeconds = 1,
minConnectionRecoveryTimeout = 2000,
maxConnectionRecoveryTimeout = 2000,
// to avoid unexpected delays before recovery
retryTimeDeviationPercent = 0,
)
).use { managerWithConfirmation ->
repeat(messagesCount) { index ->
if (index == 1) {
// delay should allow ack for the first message be received
Awaitility.await("first message is confirmed")
.pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(100, TimeUnit.MILLISECONDS)
.until { countDown.count == messagesCount - 1L }
// looks like if nothing is sent yet through the channel
// it will detekt connection lost right away and start waiting for recovery
rabbit.executeInContainerWithLogging("ifconfig", "eth0", "down")
}
managerWithConfirmation.basicPublish(
exchange,
routingKey,
null,
"Hello $index".toByteArray(Charsets.UTF_8)
)
}
// ensure connection is closed because of HB timeout
Awaitility.await("connection is closed")
.atMost(4, TimeUnit.SECONDS)
.until { !managerWithConfirmation.isOpen }
rabbit.executeInContainerWithLogging("ifconfig", "eth0", "up")
// wait for connection to recover before closing
Awaitility.await("connection is recovered")
.atMost(4, TimeUnit.SECONDS)
.until { managerWithConfirmation.isOpen }
}

countDown.assertComplete { "Not all messages were received: $receivedMessages" }
assertEquals(
(0 until messagesCount).map {
"Hello $it"
},
receivedMessages.toList(),
"messages received in unexpected order",
)
}
}
}

@Test
fun `connection manager reports unacked messages when confirmation timeout elapsed`() {
val routingKey = "routingKey1"
Expand Down

0 comments on commit 84a094b

Please sign in to comment.