Skip to content

Commit

Permalink
fixes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleg Smelov committed Oct 26, 2023
1 parent 0b4ccbf commit 767ee07
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 167 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -84,9 +85,9 @@ public class ConnectionManager implements AutoCloseable {
private final String subscriberName;
private final AtomicInteger nextSubscriberId = new AtomicInteger(1);
private final ExecutorService sharedExecutor;
private final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
.setNameFormat("channel-checker-%d")
.build());
private final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build()
);

private final HealthMetrics metrics = new HealthMetrics(this);

Expand Down Expand Up @@ -191,7 +192,6 @@ private void turnOffReadiness(Throwable exception) {
}
});

factory.setAutomaticRecoveryEnabled(configuration.isAutomaticRecoveryEnabled());
factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> !connectionIsClosed.get());

factory.setRecoveryDelayHandler(recoveryAttempts -> {
Expand All @@ -201,7 +201,7 @@ private void turnOffReadiness(Throwable exception) {
int deviationPercent = connectionManagerConfiguration.getRetryTimeDeviationPercent();

LOGGER.debug("Try to recovery connection to RabbitMQ. Count tries = {}", recoveryAttempts);
int recoveryDelay = RetryingDelay.Companion.getRecoveryDelay(recoveryAttempts, minTime, maxTime, maxRecoveryAttempts, deviationPercent);
int recoveryDelay = RetryingDelay.getRecoveryDelay(recoveryAttempts, minTime, maxTime, maxRecoveryAttempts, deviationPercent);
if (recoveryAttempts >= maxRecoveryAttempts && metrics.getLivenessMonitor().isEnabled()) {
LOGGER.info("Set RabbitMQ liveness to false. Can't recover connection");
metrics.getLivenessMonitor().disable();
Expand Down Expand Up @@ -247,23 +247,22 @@ private void addShutdownListenerToChannel(Channel channel, Boolean withRecovery)
String errorString = errorBuilder.toString();
LOGGER.warn(errorString);
if (withRecovery && errorString.contains("PRECONDITION_FAILED")) {
recoverSubscriptionsOfChannel(channel);
recoverSubscriptionsOfChannel(channel.getChannelNumber());
}
}
}
}
});
}

private void recoverSubscriptionsOfChannel(Channel channel) {
private void recoverSubscriptionsOfChannel(int channelNumber) {
channelChecker.execute(() -> {
try {
var pinToChannelHolderOptional =
channelsByPin
.entrySet()
.stream()
.filter(entry -> Objects.nonNull(entry.getValue().channel) && channel.getChannelNumber() == entry.getValue().channel.getChannelNumber())
.findAny();
var pinToChannelHolderOptional = channelsByPin
.entrySet()
.stream()
.filter(entry -> Objects.nonNull(entry.getValue().channel) && channelNumber == entry.getValue().channel.getChannelNumber())
.findAny();

var pinIdToChannelHolder = pinToChannelHolderOptional.orElse(null);
if (pinIdToChannelHolder != null) {
Expand Down Expand Up @@ -341,6 +340,10 @@ public void close() throws IOException {

LOGGER.info("Closing connection manager");

for (ChannelHolder channelHolder: channelsByPin.values()) {
channelHolder.channel.abort();
}

int closeTimeout = configuration.getConnectionCloseTimeout();
if (connection.isOpen()) {
try {
Expand All @@ -352,10 +355,6 @@ public void close() throws IOException {
}
}

for (ChannelHolder channelHolder: channelsByPin.values()) {
channelHolder.channel.abort();
}

shutdownExecutor(sharedExecutor, closeTimeout, "rabbit-shared");
shutdownExecutor(channelChecker, closeTimeout, "channel-checker");
}
Expand Down Expand Up @@ -423,7 +422,6 @@ public void confirm() throws IOException {

Confirmation confirmation = OnlyOnceConfirmation.wrap("from " + routingKey + " to " + queue, wrappedConfirmation);


holder.withLock(() -> holder.acquireAndSubmitCheck(() ->
channelChecker.schedule(() -> {
holder.withLock(() -> {
Expand Down Expand Up @@ -728,16 +726,16 @@ public void withLock(boolean waitForRecovery, ChannelConsumer consumer) throws I
}

public void retryingPublishWithLock(ChannelConsumer consumer, ConnectionManagerConfiguration configuration) throws InterruptedException {
Iterator<RetryingDelay> iterator = configuration.createRetryingDelaySequence().iterator();
lock.lock();
Iterator<RetryingDelay> iterator = configuration.createRetryingDelaySequence().iterator();
try {
var currentValue = iterator.next();
Channel tempChannel = getChannel(true);
while (true) {
try {
consumer.consume(tempChannel);
break;
} catch (IOException | ShutdownSignalException e) {
var currentValue = iterator.next();
int recoveryDelay = currentValue.getDelay();
LOGGER.warn("Retrying publishing #{}, waiting for {}ms, then recreating channel. Reason: {}", currentValue.getTryNumber(), recoveryDelay, e);
TimeUnit.MILLISECONDS.sleep(recoveryDelay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ data class ConnectionManagerConfiguration(
val messageRecursionLimit: Int = 100,
val workingThreads: Int = 1,
val confirmationTimeout: Duration = Duration.ofMinutes(5),
val isAutomaticRecoveryEnabled: Boolean = true
) : Configuration() {
init {
check(workingThreads > 0) { "expected 'workingThreads' greater than 0 but was $workingThreads" }
Expand All @@ -60,7 +59,6 @@ data class ConnectionManagerConfiguration(
))
}
}

}

data class RetryingDelay(val tryNumber: Int, val delay: Int) {
Expand All @@ -75,7 +73,9 @@ data class RetryingDelay(val tryNumber: Int, val delay: Int) {
): Int {
return if (numberOfTries <= maxRecoveryAttempts) {
getRecoveryDelayWithIncrement(numberOfTries, minTime, maxTime, maxRecoveryAttempts)
} else getRecoveryDelayWithDeviation(maxTime, deviationPercent)
} else {
getRecoveryDelayWithDeviation(maxTime, deviationPercent)
}
}

private fun getRecoveryDelayWithDeviation(maxTime: Int, deviationPercent: Int): Int {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.common.schema.message

import org.testcontainers.utility.DockerImageName
import java.time.Duration

class ContainerConstants {
companion object {
val RABBITMQ_IMAGE_NAME: DockerImageName = DockerImageName.parse("rabbitmq:3.12.7-management-alpine")
const val ROUTING_KEY = "routingKey"
const val QUEUE_NAME = "queue"
const val EXCHANGE = "test-exchange"
object ContainerConstants {
@JvmField val RABBITMQ_IMAGE_NAME: DockerImageName = DockerImageName.parse("rabbitmq:3.12.7-management-alpine")
const val ROUTING_KEY = "routingKey"
const val QUEUE_NAME = "queue"
const val EXCHANGE = "test-exchange"

const val DEFAULT_PREFETCH_COUNT = 10
val DEFAULT_CONFIRMATION_TIMEOUT: Duration = Duration.ofSeconds(1)
}
const val DEFAULT_PREFETCH_COUNT = 10
@JvmField val DEFAULT_CONFIRMATION_TIMEOUT: Duration = Duration.ofSeconds(1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq

import com.exactpro.th2.common.annotations.IntegrationTest
import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration
import com.exactpro.th2.common.schema.message.ContainerConstants.Companion.DEFAULT_CONFIRMATION_TIMEOUT
import com.exactpro.th2.common.schema.message.ContainerConstants.Companion.DEFAULT_PREFETCH_COUNT
import com.exactpro.th2.common.schema.message.ContainerConstants.Companion.EXCHANGE
import com.exactpro.th2.common.schema.message.ContainerConstants.Companion.QUEUE_NAME
import com.exactpro.th2.common.schema.message.ContainerConstants.Companion.RABBITMQ_IMAGE_NAME
import com.exactpro.th2.common.schema.message.ContainerConstants.Companion.ROUTING_KEY
import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_CONFIRMATION_TIMEOUT
import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_PREFETCH_COUNT
import com.exactpro.th2.common.schema.message.ContainerConstants.EXCHANGE
import com.exactpro.th2.common.schema.message.ContainerConstants.QUEUE_NAME
import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME
import com.exactpro.th2.common.schema.message.ContainerConstants.ROUTING_KEY
import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback
import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration
import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,20 +17,20 @@
package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection

import com.exactpro.th2.common.annotations.IntegrationTest
import com.exactpro.th2.common.schema.message.ContainerConstants.Companion.DEFAULT_CONFIRMATION_TIMEOUT
import com.exactpro.th2.common.schema.message.ContainerConstants.Companion.DEFAULT_PREFETCH_COUNT
import com.exactpro.th2.common.schema.message.ContainerConstants.Companion.RABBITMQ_IMAGE_NAME
import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_CONFIRMATION_TIMEOUT
import com.exactpro.th2.common.schema.message.ContainerConstants.DEFAULT_PREFETCH_COUNT
import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME
import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback
import com.exactpro.th2.common.schema.message.SubscriberMonitor
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration
import com.exactpro.th2.common.util.RabbitTestContainerUtil.Companion.declareFanoutExchangeWithBinding
import com.exactpro.th2.common.util.RabbitTestContainerUtil.Companion.declareQueue
import com.exactpro.th2.common.util.RabbitTestContainerUtil.Companion.getChannelsInfo
import com.exactpro.th2.common.util.RabbitTestContainerUtil.Companion.getQueuesInfo
import com.exactpro.th2.common.util.RabbitTestContainerUtil.Companion.getSubscribedChannelsCount
import com.exactpro.th2.common.util.RabbitTestContainerUtil.Companion.putMessageInQueue
import com.exactpro.th2.common.util.RabbitTestContainerUtil.Companion.restartContainer
import com.exactpro.th2.common.util.declareFanoutExchangeWithBinding
import com.exactpro.th2.common.util.declareQueue
import com.exactpro.th2.common.util.getChannelsInfo
import com.exactpro.th2.common.util.getQueuesInfo
import com.exactpro.th2.common.util.getSubscribedChannelsCount
import com.exactpro.th2.common.util.putMessageInQueue
import com.exactpro.th2.common.util.restartContainer
import com.rabbitmq.client.BuiltinExchangeType
import java.time.Duration
import java.util.concurrent.ArrayBlockingQueue
Expand All @@ -47,19 +47,12 @@ import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.testcontainers.containers.RabbitMQContainer
import org.testcontainers.utility.MountableFile
import java.io.IOException
import kotlin.test.assertFailsWith

private val LOGGER = KotlinLogging.logger { }


@IntegrationTest
class TestConnectionManager {

@Test
fun `connection manager reports unacked messages when confirmation timeout elapsed`() {
val routingKey = "routingKey1"
Expand Down Expand Up @@ -467,12 +460,10 @@ class TestConnectionManager {
LOGGER.info { "Publication finished!" }
LOGGER.info { getQueuesInfo(it) }


consume.assertComplete("Wrong number of received messages")
assertTrue(
getQueuesInfo(it).toString().contains("$queueName\t0")
) { "There should be no messages left in the queue" }

assertTrue(getQueuesInfo(it).toString().contains("$queueName\t0")) {
"There should be no messages left in the queue"
}
}
}
}
Expand Down Expand Up @@ -694,6 +685,7 @@ class TestConnectionManager {
}
for (i in 1..5) {
putMessageInQueue(it, queueName)

Thread.sleep(1000)
}

Expand Down Expand Up @@ -777,9 +769,9 @@ class TestConnectionManager {

assertEquals(0, getSubscribedChannelsCount(it, queueName))
assertEquals(2, counter.get()) { "Wrong number of received messages" }
assertTrue(
queuesListExecResult.toString().contains("$queueName\t1")
) { "There should a message left in the queue" }
assertTrue(queuesListExecResult.toString().contains("$queueName\t1")) {
"There should a message left in the queue"
}
} finally {
Assertions.assertNotNull(thread)
Assertions.assertDoesNotThrow {
Expand Down Expand Up @@ -824,29 +816,25 @@ class TestConnectionManager {
)

@Test
@Disabled
// TODO: this test is no more relevant
// TODO: we need to change test scenario or remove it
fun `connection manager exclusive queue test`() {
RabbitMQContainer(RABBITMQ_IMAGE_NAME)
.use { rabbitMQContainer ->
rabbitMQContainer.start()
LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" }

createConnectionManager(rabbitMQContainer).use { firstManager ->
createConnectionManager(rabbitMQContainer).use { secondManager ->
val queue = firstManager.queueDeclare()

assertFailsWith<IOException>("Another connection can subscribe to the $queue queue") {
secondManager.basicConsume(queue, { _, _, _ -> }, {})
}

extracted(firstManager, secondManager, queue, 3)
extracted(firstManager, secondManager, queue, 6)
}
RabbitMQContainer(RABBITMQ_IMAGE_NAME).use { rabbitMQContainer ->
rabbitMQContainer.start()
LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}" }

createConnectionManager(rabbitMQContainer).use { firstManager ->
createConnectionManager(rabbitMQContainer).use { secondManager ->
val queue = firstManager.queueDeclare()
val consumerThread = thread { secondManager.basicConsume(queue, { _, _, _ -> }, {}) }
consumerThread.join(5_000)
val isAlive = consumerThread.isAlive
consumerThread.stop()
assertTrue(isAlive) { "Another connection can subscribe to the $queue queue" }

extracted(firstManager, secondManager, queue, 3)
extracted(firstManager, secondManager, queue, 6)
}

}
}
}

private fun extracted(
Expand Down Expand Up @@ -893,7 +881,6 @@ class TestConnectionManager {
rabbitMQContainer: RabbitMQContainer,
prefetchCount: Int = DEFAULT_PREFETCH_COUNT,
confirmationTimeout: Duration = DEFAULT_CONFIRMATION_TIMEOUT,
isAutomaticRecoveryEnabled: Boolean = true
) = ConnectionManager(
RabbitMQConfiguration(
host = rabbitMQContainer.host,
Expand All @@ -905,12 +892,13 @@ class TestConnectionManager {
ConnectionManagerConfiguration(
subscriberName = "test",
prefetchCount = prefetchCount,
confirmationTimeout = confirmationTimeout,
isAutomaticRecoveryEnabled = isAutomaticRecoveryEnabled
),
confirmationTimeout = confirmationTimeout
)
)

companion object {
private val LOGGER = KotlinLogging.logger { }

private lateinit var rabbit: RabbitMQContainer

@BeforeAll
Expand Down
Loading

0 comments on commit 767ee07

Please sign in to comment.