From 8e20f14602248b0e834a0206a943a2d072874a0e Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Wed, 10 Jul 2024 20:04:55 +0400 Subject: [PATCH] after review --- gradle.properties | 2 +- .../connection/ConnectionManager.java | 20 ++-------- .../connection/ConsumeConnectionManager.kt | 37 ++++++++++--------- .../connection/PublishConnectionManager.kt | 2 +- 4 files changed, 26 insertions(+), 35 deletions(-) diff --git a/gradle.properties b/gradle.properties index 5c8ad927..f8a2b4af 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,4 +17,4 @@ release_version=5.14.0 kotlin_version=1.8.22 description='th2 common library (Java)' vcs_url=https://github.com/th2-net/th2-common-j -kapt.include.compile.classpath=true +kapt.include.compile.classpath=false diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java index 99d41622..6e006b0e 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java @@ -38,7 +38,6 @@ import com.rabbitmq.client.TopologyRecoveryException; import com.rabbitmq.client.impl.AMQImpl; import com.rabbitmq.client.impl.recovery.AutorecoveringChannel; -import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -66,7 +65,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -84,8 +82,6 @@ public abstract class ConnectionManager implements AutoCloseable { protected final Map channelsByPin = new ConcurrentHashMap<>(); private final AtomicReference connectionState = new AtomicReference<>(State.OPEN); private final ConnectionManagerConfiguration configuration; - protected final String subscriberName; - protected final AtomicInteger nextSubscriberId = new AtomicInteger(1); private final ExecutorService sharedExecutor; protected final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build() @@ -117,14 +113,6 @@ public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfig Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null"); this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null"); - String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), connectionManagerConfiguration.getSubscriberName()); - if (StringUtils.isBlank(subscriberNameTmp)) { - subscriberName = "rabbit_mq_subscriber." + System.currentTimeMillis(); - LOGGER.info("Subscribers will use default name: {}", subscriberName); - } else { - subscriberName = subscriberNameTmp + "." + System.currentTimeMillis(); - } - var factory = new ConnectionFactory(); var virtualHost = rabbitMQConfiguration.getVHost(); var username = rabbitMQConfiguration.getUsername(); @@ -316,10 +304,10 @@ private void addRecoveryListenerToConnection(Connection conn) { } } - protected abstract BlockedListener getBlockedListener(); + protected abstract BlockedListener createBlockedListener(); private void addBlockedListenersToConnection(Connection conn) { - conn.addBlockedListener(getBlockedListener()); + conn.addBlockedListener(createBlockedListener()); } public boolean isOpen() { @@ -406,8 +394,8 @@ private void shutdownExecutor(ExecutorService executor, int closeTimeout, String } protected static final class SubscriptionCallbacks { - protected final ManualAckDeliveryCallback deliverCallback; - protected final CancelCallback cancelCallback; + final ManualAckDeliveryCallback deliverCallback; + final CancelCallback cancelCallback; public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) { this.deliverCallback = deliverCallback; diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt index 00683ed3..0bf920a2 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConsumeConnectionManager.kt @@ -27,20 +27,29 @@ import com.rabbitmq.client.Channel import com.rabbitmq.client.Delivery import com.rabbitmq.client.CancelCallback import com.rabbitmq.client.BlockedListener -import com.rabbitmq.client.ShutdownNotifier import com.rabbitmq.client.ShutdownSignalException import mu.KotlinLogging import java.io.IOException import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger class ConsumeConnectionManager( connectionName: String, rabbitMQConfiguration: RabbitMQConfiguration, connectionManagerConfiguration: ConnectionManagerConfiguration ) : ConnectionManager(connectionName, rabbitMQConfiguration, connectionManagerConfiguration) { - override fun getBlockedListener(): BlockedListener = object : BlockedListener { + private val subscriberName = if (connectionManagerConfiguration.subscriberName.isNullOrBlank()) { + (DEFAULT_SUBSCRIBER_NAME_PREFIX + System.currentTimeMillis()).also { + LOGGER.info { "Subscribers will use the default name: $it" } + } + } else { + connectionManagerConfiguration.subscriberName + "." + System.currentTimeMillis() + } + private val nextSubscriberId: AtomicInteger = AtomicInteger(1) + + override fun createBlockedListener(): BlockedListener = object : BlockedListener { override fun handleBlocked(reason: String) { LOGGER.info { "RabbitMQ blocked consume connection: $reason" } } @@ -52,25 +61,18 @@ class ConsumeConnectionManager( @Throws(IOException::class) fun queueDeclare(): String { - val holder = ChannelHolder({ this.createChannel() }, - { notifier: ShutdownNotifier, waitForRecovery: Boolean -> - this.waitForConnectionRecovery( - notifier, - waitForRecovery - ) - }, configurationToOptions() - ) - return holder.mapWithLock { channel: Channel -> - val queue = channel.queueDeclare( + val holder = ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions()) + return holder.mapWithLock { channel -> + channel.queueDeclare( "", // queue name false, // durable true, // exclusive false, // autoDelete emptyMap() - ).queue - LOGGER.info { "Declared exclusive '$queue' queue" } - putChannelFor(PinId.forQueue(queue), holder) - queue + ).queue.also { queue -> + LOGGER.info { "Declared exclusive '$queue' queue" } + putChannelFor(PinId.forQueue(queue), holder) + } } } @@ -107,7 +109,7 @@ class ConsumeConnectionManager( override fun reject() { holder.withLock { ch: Channel -> try { - channel.basicReject(deliveryTag, false) + ch.basicReject(deliveryTag, false) } catch (e: IOException) { LOGGER.warn { "Error during basicReject of message with deliveryTag = $deliveryTag inside channel #${ch.channelNumber}: $e" } throw e @@ -213,5 +215,6 @@ class ConsumeConnectionManager( companion object { private val LOGGER = KotlinLogging.logger {} + private const val DEFAULT_SUBSCRIBER_NAME_PREFIX = "rabbit_mq_subscriber." } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt index 0cfb15cf..4e69e3e4 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/PublishConnectionManager.kt @@ -39,7 +39,7 @@ class PublishConnectionManager( } } - override fun getBlockedListener(): BlockedListener = object : BlockedListener { + override fun createBlockedListener(): BlockedListener = object : BlockedListener { override fun handleBlocked(reason: String) { isPublishingBlocked = true LOGGER.info { "RabbitMQ blocked publish connection: $reason" }