diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java deleted file mode 100644 index 67a5883e0..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ /dev/null @@ -1,596 +0,0 @@ -/* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection; - -import com.exactpro.th2.common.metrics.HealthMetrics; -import com.exactpro.th2.common.schema.message.SubscriberMonitor; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.BlockedListener; -import com.rabbitmq.client.CancelCallback; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Consumer; -import com.rabbitmq.client.DeliverCallback; -import com.rabbitmq.client.ExceptionHandler; -import com.rabbitmq.client.Recoverable; -import com.rabbitmq.client.RecoveryListener; -import com.rabbitmq.client.ShutdownNotifier; -import com.rabbitmq.client.TopologyRecoveryException; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.ClientParameters; -import com.rabbitmq.http.client.domain.BindingInfo; -import com.rabbitmq.http.client.domain.QueueInfo; - -import org.apache.commons.lang3.ObjectUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.function.Supplier; - -import static com.rabbitmq.http.client.domain.DestinationType.QUEUE; -import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; - -public class ConnectionManager implements AutoCloseable { - private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class); - private static final String RABBITMQ_MANAGEMENT_URL = "http://%s:15672/api/"; - - private final Connection connection; - private final Map channelsByPin = new ConcurrentHashMap<>(); - private final AtomicInteger connectionRecoveryAttempts = new AtomicInteger(0); - private final AtomicBoolean connectionIsClosed = new AtomicBoolean(false); - private final RabbitMQConfiguration rabbitMQConfiguration; - private final ConnectionManagerConfiguration connectionManagerConfiguration; - private final String subscriberName; - private final AtomicInteger nextSubscriberId = new AtomicInteger(1); - private final ExecutorService sharedExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("rabbitmq-shared-pool-%d") - .build()); - private final Client client; - private final ScheduledExecutorService sizeCheckExecutor = Executors.newScheduledThreadPool(1); - private final Map> knownExchangesToRoutingKeys = new ConcurrentHashMap<>(); - - private final HealthMetrics metrics = new HealthMetrics(this); - - private final RecoveryListener recoveryListener = new RecoveryListener() { - @Override - public void handleRecovery(Recoverable recoverable) { - LOGGER.debug("Count tries to recovery connection reset to 0"); - connectionRecoveryAttempts.set(0); - metrics.getReadinessMonitor().enable(); - LOGGER.debug("Set RabbitMQ readiness to true"); - } - - @Override - public void handleRecoveryStarted(Recoverable recoverable) {} - }; - - public ConnectionManagerConfiguration getConnectionManagerConfiguration() { - return connectionManagerConfiguration; - } - - public ConnectionManager( - @NotNull RabbitMQConfiguration rabbitMQConfiguration, - @NotNull ConnectionManagerConfiguration connectionManagerConfiguration, - Runnable onFailedRecoveryConnection - ) { - this.rabbitMQConfiguration = requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null"); - this.connectionManagerConfiguration = requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null"); - - String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), rabbitMQConfiguration.getSubscriberName()); - if (StringUtils.isBlank(subscriberNameTmp)) { - subscriberName = "rabbit_mq_subscriber." + System.currentTimeMillis(); - LOGGER.info("Subscribers will use default name: {}", subscriberName); - } else { - subscriberName = subscriberNameTmp + "." + System.currentTimeMillis(); - } - - var factory = new ConnectionFactory(); - var virtualHost = rabbitMQConfiguration.getVHost(); - var username = rabbitMQConfiguration.getUsername(); - var password = rabbitMQConfiguration.getPassword(); - - factory.setHost(rabbitMQConfiguration.getHost()); - factory.setPort(rabbitMQConfiguration.getPort()); - - if (StringUtils.isNotBlank(virtualHost)) { - factory.setVirtualHost(virtualHost); - } - - if (StringUtils.isNotBlank(username)) { - factory.setUsername(username); - } - - if (StringUtils.isNotBlank(password)) { - factory.setPassword(password); - } - - if (connectionManagerConfiguration.getConnectionTimeout() > 0) { - factory.setConnectionTimeout(connectionManagerConfiguration.getConnectionTimeout()); - } - - factory.setExceptionHandler(new ExceptionHandler() { - @Override - public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleReturnListenerException(Channel channel, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleConfirmListenerException(Channel channel, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleBlockedListenerException(Connection connection, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) { - turnOffReadiness(exception); - } - - @Override - public void handleConnectionRecoveryException(Connection conn, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleChannelRecoveryException(Channel ch, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) { - turnOffReadiness(exception); - } - - private void turnOffReadiness(Throwable exception){ - metrics.getReadinessMonitor().disable(); - LOGGER.debug("Set RabbitMQ readiness to false. RabbitMQ error", exception); - } - }); - - factory.setAutomaticRecoveryEnabled(true); - factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> { - if (connectionIsClosed.get()) { - return false; - } - - int tmpCountTriesToRecovery = connectionRecoveryAttempts.get(); - - if (tmpCountTriesToRecovery < connectionManagerConfiguration.getMaxRecoveryAttempts()) { - LOGGER.info("Try to recovery connection to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery + 1); - return true; - } - LOGGER.error("Can not connect to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery); - if (onFailedRecoveryConnection != null) { - onFailedRecoveryConnection.run(); - } else { - // TODO: we should stop the execution of the application. Don't use System.exit!!! - throw new IllegalStateException("Cannot recover connection to RabbitMQ"); - } - return false; - }); - - factory.setRecoveryDelayHandler(recoveryAttempts -> { - int tmpCountTriesToRecovery = connectionRecoveryAttempts.getAndIncrement(); - - int recoveryDelay = connectionManagerConfiguration.getMinConnectionRecoveryTimeout() - + (connectionManagerConfiguration.getMaxRecoveryAttempts() > 1 - ? (connectionManagerConfiguration.getMaxConnectionRecoveryTimeout() - connectionManagerConfiguration.getMinConnectionRecoveryTimeout()) - / (connectionManagerConfiguration.getMaxRecoveryAttempts() - 1) - * tmpCountTriesToRecovery - : 0); - - LOGGER.info("Recovery delay for '{}' try = {}", tmpCountTriesToRecovery, recoveryDelay); - return recoveryDelay; - } - ); - factory.setSharedExecutor(sharedExecutor); - - try { - this.connection = factory.newConnection(); - client = new Client( - new ClientParameters() - .url(String.format(RABBITMQ_MANAGEMENT_URL, rabbitMQConfiguration.getHost())) - .username(rabbitMQConfiguration.getUsername()) - .password(rabbitMQConfiguration.getPassword()) - ); - metrics.getReadinessMonitor().enable(); - LOGGER.debug("Set RabbitMQ readiness to true"); - } catch (IOException | TimeoutException | URISyntaxException e) { - metrics.getReadinessMonitor().disable(); - LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e); - throw new IllegalStateException("Failed to create RabbitMQ connection using configuration", e); - } - - this.connection.addBlockedListener(new BlockedListener() { - @Override - public void handleBlocked(String reason) throws IOException { - LOGGER.warn("RabbitMQ blocked connection: {}", reason); - } - - @Override - public void handleUnblocked() throws IOException { - LOGGER.warn("RabbitMQ unblocked connection"); - } - }); - - if (this.connection instanceof Recoverable) { - Recoverable recoverableConnection = (Recoverable) this.connection; - recoverableConnection.addRecoveryListener(recoveryListener); - LOGGER.debug("Recovery listener was added to connection."); - } else { - throw new IllegalStateException("Connection does not implement Recoverable. Can not add RecoveryListener to it"); - } - - sizeCheckExecutor.scheduleAtFixedRate( - this::lockSendingIfSizeLimitExceeded, - connectionManagerConfiguration.getSecondsToCheckVirtualPublishLimit(), // TODO another initial delay? - connectionManagerConfiguration.getSecondsToCheckVirtualPublishLimit(), - TimeUnit.SECONDS - ); - } - - public boolean isOpen() { - return connection.isOpen() && !connectionIsClosed.get(); - } - - @Override - public void close() { - if (connectionIsClosed.getAndSet(true)) { - return; - } - - int closeTimeout = connectionManagerConfiguration.getConnectionCloseTimeout(); - if (connection.isOpen()) { - try { - // We close the connection and don't close channels - // because when a channel's connection is closed, so is the channel - connection.close(closeTimeout); - } catch (IOException e) { - LOGGER.error("Cannot close connection", e); - } - } - - shutdownExecutor(sharedExecutor, closeTimeout); - shutdownExecutor(sizeCheckExecutor, closeTimeout); - } - - public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException { - knownExchangesToRoutingKeys.computeIfAbsent(exchange, e -> new HashSet<>()).add(routingKey); - getChannelFor(PinId.forRoutingKey(routingKey)).publishWithLocks(exchange, routingKey, props, body); - } - - public SubscriberMonitor basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - ChannelHolder holder = getChannelFor(PinId.forQueue(queue)); - String tag = holder.mapWithLock(channel -> { - return channel.basicConsume(queue, false, subscriberName + "_" + nextSubscriberId.getAndIncrement(), (tagTmp, delivery) -> { - try { - try { - deliverCallback.handle(tagTmp, delivery); - } finally { - holder.withLock(ch -> basicAck(ch, delivery.getEnvelope().getDeliveryTag())); - } - } catch (IOException | RuntimeException e) { - LOGGER.error("Cannot handle delivery for tag {}: {}", tagTmp, e.getMessage(), e); - } - }, cancelCallback); - }); - - return new RabbitMqSubscriberMonitor(holder, tag, this::basicCancel); - } - - private void basicCancel(Channel channel, String consumerTag) throws IOException { - channel.basicCancel(consumerTag); - } - - private void shutdownExecutor(ExecutorService executor, int closeTimeout) { - executor.shutdown(); - try { - if (!executor.awaitTermination(closeTimeout, TimeUnit.MILLISECONDS)) { - LOGGER.error("Executor is not terminated during {} millis", closeTimeout); - List runnables = executor.shutdownNow(); - LOGGER.error("{} task(s) was(were) not finished", runnables.size()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - private ChannelHolder getChannelFor(PinId pinId) { - return channelsByPin.computeIfAbsent(pinId, ignore -> { - LOGGER.trace("Creating channel holder for {}", pinId); - return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery); - }); - } - - private Channel createChannel() { - waitForConnectionRecovery(connection); - - try { - Channel channel = connection.createChannel(); - requireNonNull(channel, () -> "No channels are available in the connection. Max channel number: " + connection.getChannelMax()); - channel.basicQos(connectionManagerConfiguration.getPrefetchCount()); - channel.addReturnListener(ret -> - LOGGER.warn("Can not router message to exchange '{}', routing key '{}'. Reply code '{}' and text = {}", ret.getExchange(), ret.getRoutingKey(), ret.getReplyCode(), ret.getReplyText())); - return channel; - } catch (IOException e) { - throw new IllegalStateException("Can not create channel", e); - } - } - - private void waitForConnectionRecovery(ShutdownNotifier notifier) { - waitForConnectionRecovery(notifier, true); - } - - private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitForRecovery) { - if (isConnectionRecovery(notifier)) { - if (waitForRecovery) { - waitForRecovery(notifier); - } else { - LOGGER.warn("Skip waiting for connection recovery"); - } - } - - if (connectionIsClosed.get()) { - throw new IllegalStateException("Connection is already closed"); - } - } - - private void waitForRecovery(ShutdownNotifier notifier) { - LOGGER.warn("Start waiting for connection recovery"); - while (isConnectionRecovery(notifier)) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - LOGGER.error("Wait for connection recovery was interrupted", e); - break; - } - } - LOGGER.info("Stop waiting for connection recovery"); - } - - private boolean isConnectionRecovery(ShutdownNotifier notifier) { - return !notifier.isOpen() && !connectionIsClosed.get(); - } - - /** - * @param channel pass channel witch used for basicConsume, because delivery tags are scoped per channel, - * deliveries must be acknowledged on the same channel they were received on. - * @throws IOException - */ - private void basicAck(Channel channel, long deliveryTag) throws IOException { - channel.basicAck(deliveryTag, false); - } - - public void lockSendingIfSizeLimitExceeded() { - try { - Map> routingKeyToBindings = knownExchangesToRoutingKeys.entrySet().stream() - .flatMap(entry -> { - String exchange = entry.getKey(); - Set routingKeys = entry.getValue(); - return client.getBindingsBySource(rabbitMQConfiguration.getVHost(), exchange).stream() - .filter(it -> it.getDestinationType() == QUEUE && routingKeys.contains(it.getRoutingKey())); - }) - .collect(groupingBy(BindingInfo::getRoutingKey)); - Map queueNameToInfo = client.getQueues().stream() - .collect(toMap(QueueInfo::getName, Function.identity())); - routingKeyToBindings.forEach((routingKey, bindings) -> { - QueuesWithVirtualPublishLimit queuesWithVirtualPublishLimit = new QueuesWithVirtualPublishLimit( - bindings.stream().map(bindingInfo -> queueNameToInfo.get(bindingInfo.getDestination())).collect(toList()), - connectionManagerConfiguration.getVirtualPublishLimit() - ); - ChannelHolder holder = getChannelFor(PinId.forRoutingKey(routingKey)); - if (queuesWithVirtualPublishLimit.isExceeded()) { - if (!holder.sizeLimitLock.isLocked()) { - holder.sizeLimitLock.lock(); - if (LOGGER.isInfoEnabled()) { - LOGGER.info( - "Sending via routing key '{}' is paused because there are {}", - routingKey, - queuesWithVirtualPublishLimit.getSizeDetails() - ); - } - } - } else { - if (holder.sizeLimitLock.isLocked()) { - holder.sizeLimitLock.unlock(); - if (LOGGER.isInfoEnabled()) { - LOGGER.info( - "Sending via routing key '{}' is resumed. There are {}", - routingKey, - queuesWithVirtualPublishLimit.getSizeDetails() - ); - } - } - } - }); - } catch (Throwable t) { - LOGGER.error("Error during check queue sizes", t); - } - } - - private static class RabbitMqSubscriberMonitor implements SubscriberMonitor { - - private final ChannelHolder holder; - private final String tag; - private final CancelAction action; - - public RabbitMqSubscriberMonitor(ChannelHolder holder, String tag, - CancelAction action) { - this.holder = holder; - this.tag = tag; - this.action = action; - } - - @Override - public void unsubscribe() throws Exception { - holder.withLock(false, channel -> action.execute(channel, tag)); - } - } - - private interface CancelAction { - void execute(Channel channel, String tag) throws IOException; - } - - private static class PinId { - private final String routingKey; - private final String queue; - - public static PinId forRoutingKey(String routingKey) { - return new PinId(routingKey, null); - } - - public static PinId forQueue(String queue) { - return new PinId(null, queue); - } - - private PinId(String routingKey, String queue) { - if (routingKey == null && queue == null) { - throw new NullPointerException("Either routingKey or queue must be set"); - } - this.routingKey = routingKey; - this.queue = queue; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - - if (o == null || getClass() != o.getClass()) return false; - - PinId pinId = (PinId) o; - - return new EqualsBuilder().append(routingKey, pinId.routingKey).append(queue, pinId.queue).isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(17, 37).append(routingKey).append(queue).toHashCode(); - } - - @Override - public String toString() { - return new ToStringBuilder(this, ToStringStyle.JSON_STYLE) - .append("routingKey", routingKey) - .append("queue", queue) - .toString(); - } - } - - private static class ChannelHolder { - private final ReentrantLock sizeLimitLock = new ReentrantLock(); - private final Lock lock = new ReentrantLock(); - private final Supplier supplier; - private final BiConsumer reconnectionChecker; - private Channel channel; - - public ChannelHolder( - Supplier supplier, - BiConsumer reconnectionChecker - ) { - this.supplier = requireNonNull(supplier, "'Supplier' parameter"); - this.reconnectionChecker = requireNonNull(reconnectionChecker, "'Reconnection checker' parameter"); - } - - public void publishWithLocks(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException { - sizeLimitLock.lock(); - try { - withLock(true, channel -> channel.basicPublish(exchange, routingKey, props, body)); - } finally { - sizeLimitLock.unlock(); - } - } - - public void withLock(ChannelConsumer consumer) throws IOException { - withLock(true, consumer); - } - - public void withLock(boolean waitForRecovery, ChannelConsumer consumer) throws IOException { - lock.lock(); - try { - consumer.consume(getChannel(waitForRecovery)); - } finally { - lock.unlock(); - } - } - - public T mapWithLock(ChannelMapper mapper) throws IOException { - lock.lock(); - try { - return mapper.map(getChannel()); - } finally { - lock.unlock(); - } - } - private Channel getChannel() { - return getChannel(true); - } - - - private Channel getChannel(boolean waitForRecovery) { - if (channel == null) { - channel = supplier.get(); - } - reconnectionChecker.accept(channel, waitForRecovery); - return channel; - } - } - - private interface ChannelMapper { - T map(Channel channel) throws IOException; - } - - private interface ChannelConsumer { - void consume(Channel channel) throws IOException; - } -} diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.kt new file mode 100644 index 000000000..174cc5a52 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.kt @@ -0,0 +1,521 @@ +/* + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration +import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration +import java.lang.Runnable +import java.lang.AutoCloseable +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import com.exactpro.th2.common.metrics.HealthMetrics +import com.rabbitmq.client.RecoveryListener +import com.rabbitmq.client.Recoverable +import com.rabbitmq.client.ConnectionFactory +import com.rabbitmq.client.TopologyRecoveryException +import com.rabbitmq.client.ShutdownSignalException +import java.lang.IllegalStateException +import com.rabbitmq.client.RecoveryDelayHandler +import com.rabbitmq.http.client.ClientParameters +import java.io.IOException +import java.net.URISyntaxException +import com.rabbitmq.client.BlockedListener +import kotlin.Throws +import java.util.concurrent.TimeUnit +import com.rabbitmq.client.AMQP +import com.rabbitmq.client.DeliverCallback +import com.rabbitmq.client.CancelCallback +import com.exactpro.th2.common.schema.message.SubscriberMonitor +import java.lang.RuntimeException +import java.lang.InterruptedException +import java.util.function.BiConsumer +import com.rabbitmq.client.ShutdownNotifier +import com.google.common.util.concurrent.ThreadFactoryBuilder +import com.rabbitmq.client.Channel +import com.rabbitmq.client.Connection +import com.rabbitmq.client.Consumer +import com.rabbitmq.client.ExceptionHandler +import com.rabbitmq.http.client.Client +import com.rabbitmq.http.client.domain.DestinationType +import com.rabbitmq.http.client.domain.QueueInfo +import mu.KotlinLogging +import org.apache.commons.lang3.builder.EqualsBuilder +import org.apache.commons.lang3.builder.HashCodeBuilder +import org.apache.commons.lang3.builder.ToStringBuilder +import org.apache.commons.lang3.builder.ToStringStyle +import java.lang.NullPointerException +import java.util.Objects +import java.util.concurrent.TimeoutException +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock +import java.util.function.Supplier + +class ConnectionManager( + private val rabbitMQConfiguration: RabbitMQConfiguration, + val connectionManagerConfiguration: ConnectionManagerConfiguration, + onFailedRecoveryConnection: Runnable?, +) : AutoCloseable { + private val connection: Connection + private val channelsByPin: MutableMap = ConcurrentHashMap() + private val connectionRecoveryAttempts = AtomicInteger(0) + private val connectionIsClosed = AtomicBoolean(false) + + private val subscriberName: String + private val nextSubscriberId = AtomicInteger(1) + private val sharedExecutor = Executors.newSingleThreadExecutor(ThreadFactoryBuilder() + .setNameFormat("rabbitmq-shared-pool-%d") + .build()) + private val client: Client + private val sizeCheckExecutor = Executors.newScheduledThreadPool(1) + private val knownExchangesToRoutingKeys: MutableMap> = ConcurrentHashMap() + private val metrics = HealthMetrics(this) + private val recoveryListener: RecoveryListener = object : RecoveryListener { + override fun handleRecovery(recoverable: Recoverable) { + LOGGER.debug("Count tries to recovery connection reset to 0") + connectionRecoveryAttempts.set(0) + metrics.readinessMonitor.enable() + LOGGER.debug("Set RabbitMQ readiness to true") + } + + override fun handleRecoveryStarted(recoverable: Recoverable) {} + } + + init { + val subscriberNameTmp: String? = connectionManagerConfiguration.subscriberName + ?: rabbitMQConfiguration.subscriberName + if (subscriberNameTmp == null || subscriberNameTmp.isBlank()) { + subscriberName = "rabbit_mq_subscriber." + System.currentTimeMillis() + LOGGER.info("Subscribers will use default name: {}", subscriberName) + } else { + subscriberName = subscriberNameTmp + "." + System.currentTimeMillis() + } + val factory = ConnectionFactory().apply { + host = rabbitMQConfiguration.host + port = rabbitMQConfiguration.port + } + val virtualHost = rabbitMQConfiguration.vHost + if (virtualHost.isNotBlank()) { + factory.virtualHost = virtualHost + } + val username = rabbitMQConfiguration.username + if (username.isNotBlank()) { + factory.username = username + } + val password = rabbitMQConfiguration.password + if (password.isNotBlank()) { + factory.password = password + } + if (connectionManagerConfiguration.connectionTimeout > 0) { + factory.connectionTimeout = connectionManagerConfiguration.connectionTimeout + } + factory.exceptionHandler = object : ExceptionHandler { + override fun handleUnexpectedConnectionDriverException(conn: Connection, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleReturnListenerException(channel: Channel, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleConfirmListenerException(channel: Channel, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleBlockedListenerException(connection: Connection, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleConsumerException( + channel: Channel, + exception: Throwable, + consumer: Consumer, + consumerTag: String, + methodName: String, + ) { + turnOffReadiness(exception) + } + + override fun handleConnectionRecoveryException(conn: Connection, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleChannelRecoveryException(ch: Channel, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleTopologyRecoveryException( + conn: Connection, + ch: Channel, + exception: TopologyRecoveryException, + ) { + turnOffReadiness(exception) + } + + private fun turnOffReadiness(exception: Throwable) { + metrics.readinessMonitor.disable() + LOGGER.debug("Set RabbitMQ readiness to false. RabbitMQ error", exception) + } + } + factory.isAutomaticRecoveryEnabled = true + factory.setConnectionRecoveryTriggeringCondition { shutdownSignal: ShutdownSignalException? -> + if (connectionIsClosed.get()) { + return@setConnectionRecoveryTriggeringCondition false + } + val tmpCountTriesToRecovery = connectionRecoveryAttempts.get() + if (tmpCountTriesToRecovery < connectionManagerConfiguration.maxRecoveryAttempts) { + LOGGER.info("Try to recovery connection to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery + 1) + return@setConnectionRecoveryTriggeringCondition true + } + LOGGER.error("Can not connect to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery) + if (onFailedRecoveryConnection != null) { + onFailedRecoveryConnection.run() + } else { + // TODO: we should stop the execution of the application. Don't use System.exit!!! + throw IllegalStateException("Cannot recover connection to RabbitMQ") + } + false + } + factory.recoveryDelayHandler = RecoveryDelayHandler { recoveryAttempts: Int -> + val tmpCountTriesToRecovery = connectionRecoveryAttempts.getAndIncrement() + val recoveryDelay = (connectionManagerConfiguration.minConnectionRecoveryTimeout + + if (connectionManagerConfiguration.maxRecoveryAttempts > 1) (connectionManagerConfiguration.maxConnectionRecoveryTimeout - connectionManagerConfiguration.minConnectionRecoveryTimeout) + / (connectionManagerConfiguration.maxRecoveryAttempts - 1) + * tmpCountTriesToRecovery else 0) + LOGGER.info("Recovery delay for '{}' try = {}", tmpCountTriesToRecovery, recoveryDelay) + recoveryDelay.toLong() + } + factory.setSharedExecutor(sharedExecutor) + try { + connection = factory.newConnection() + client = Client( + ClientParameters() + .url(String.format(RABBITMQ_MANAGEMENT_URL, rabbitMQConfiguration.host)) + .username(rabbitMQConfiguration.username) + .password(rabbitMQConfiguration.password) + ) + metrics.readinessMonitor.enable() + LOGGER.debug("Set RabbitMQ readiness to true") + } catch (e: IOException) { + metrics.readinessMonitor.disable() + LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e) + throw IllegalStateException("Failed to create RabbitMQ connection using configuration", e) + } catch (e: TimeoutException) { + metrics.readinessMonitor.disable() + LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e) + throw IllegalStateException("Failed to create RabbitMQ connection using configuration", e) + } catch (e: URISyntaxException) { + metrics.readinessMonitor.disable() + LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e) + throw IllegalStateException("Failed to create RabbitMQ connection using configuration", e) + } + connection.addBlockedListener(object : BlockedListener { + @Throws(IOException::class) + override fun handleBlocked(reason: String) { + LOGGER.warn("RabbitMQ blocked connection: {}", reason) + } + + @Throws(IOException::class) + override fun handleUnblocked() { + LOGGER.warn("RabbitMQ unblocked connection") + } + }) + if (connection is Recoverable) { + (connection as Recoverable).apply { + addRecoveryListener(recoveryListener) + } + LOGGER.debug("Recovery listener was added to connection.") + } else { + throw IllegalStateException("Connection does not implement Recoverable. Can not add RecoveryListener to it") + } + sizeCheckExecutor.scheduleAtFixedRate( + ::lockSendingIfSizeLimitExceeded, + connectionManagerConfiguration.secondsToCheckVirtualPublishLimit.toLong(), // TODO another initial delay? + connectionManagerConfiguration.secondsToCheckVirtualPublishLimit.toLong(), + TimeUnit.SECONDS + ) + } + + override fun close() { + if (connectionIsClosed.getAndSet(true)) { + return + } + val closeTimeout = connectionManagerConfiguration.connectionCloseTimeout + if (connection.isOpen) { + try { + // We close the connection and don't close channels + // because when a channel's connection is closed, so is the channel + connection.close(closeTimeout) + } catch (e: IOException) { + LOGGER.error("Cannot close connection", e) + } + } + shutdownExecutor(sharedExecutor, closeTimeout) + shutdownExecutor(sizeCheckExecutor, closeTimeout) + } + + @Throws(IOException::class) + fun basicPublish(exchange: String, routingKey: String, props: AMQP.BasicProperties?, body: ByteArray?) { + knownExchangesToRoutingKeys.computeIfAbsent(exchange) { mutableSetOf() }.add(routingKey) + getChannelFor(PinId.forRoutingKey(routingKey)).publishWithLocks(exchange, routingKey, props, body) + } + + @Throws(IOException::class) + fun basicConsume( + queue: String, + deliverCallback: DeliverCallback, + cancelCallback: CancelCallback?, + ): SubscriberMonitor { + val holder = getChannelFor(PinId.forQueue(queue)) + val tag = holder.mapWithLock { + it.basicConsume( + queue, + false, + subscriberName + "_" + nextSubscriberId.getAndIncrement(), + { tagTmp, delivery -> + try { + try { + deliverCallback.handle(tagTmp, delivery) + } finally { + holder.withLock { channel -> + basicAck(channel, delivery.envelope.deliveryTag) + } + } + } catch (e: IOException) { + LOGGER.error("Cannot handle delivery for tag {}: {}", tagTmp, e.message, e) + } catch (e: RuntimeException) { + LOGGER.error("Cannot handle delivery for tag {}: {}", tagTmp, e.message, e) + } + }, + cancelCallback + ) + } + return SubscriberMonitor { holder.withLock(false) { it.basicCancel(tag) } } + } + + private fun shutdownExecutor(executor: ExecutorService, closeTimeout: Int) { + executor.shutdown() + try { + if (!executor.awaitTermination(closeTimeout.toLong(), TimeUnit.MILLISECONDS)) { + LOGGER.error("Executor is not terminated during {} millis", closeTimeout) + val runnables = executor.shutdownNow() + LOGGER.error("{} task(s) was(were) not finished", runnables.size) + } + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + } + } + + private fun getChannelFor(pinId: PinId) = channelsByPin.computeIfAbsent(pinId) { + LOGGER.trace("Creating channel holder for {}", pinId) + ChannelHolder(::createChannel, ::waitForConnectionRecovery) + } + + private fun createChannel(): Channel { + waitForConnectionRecovery(connection) + return try { + val channel = connection.createChannel() + Objects.requireNonNull(channel) { "No channels are available in the connection. Max channel number: " + connection.channelMax } + channel.basicQos(connectionManagerConfiguration.prefetchCount) + channel.addReturnListener { ret -> + LOGGER.warn("Can not router message to exchange '{}', routing key '{}'. Reply code '{}' and text = {}", + ret.exchange, + ret.routingKey, + ret.replyCode, + ret.replyText) + } + channel + } catch (e: IOException) { + throw IllegalStateException("Can not create channel", e) + } + } + + private fun waitForConnectionRecovery(notifier: ShutdownNotifier, waitForRecovery: Boolean = true) { + if (isConnectionRecovery(notifier)) { + if (waitForRecovery) { + waitForRecovery(notifier) + } else { + LOGGER.warn("Skip waiting for connection recovery") + } + } + check(!connectionIsClosed.get()) { "Connection is already closed" } + } + + private fun waitForRecovery(notifier: ShutdownNotifier) { + LOGGER.warn("Start waiting for connection recovery") + while (isConnectionRecovery(notifier)) { + try { + Thread.sleep(1) + } catch (e: InterruptedException) { + LOGGER.error("Wait for connection recovery was interrupted", e) + break + } + } + LOGGER.info("Stop waiting for connection recovery") + } + + private fun isConnectionRecovery(notifier: ShutdownNotifier): Boolean { + return !notifier.isOpen && !connectionIsClosed.get() + } + + /** + * @param channel pass channel witch used for basicConsume, because delivery tags are scoped per channel, + * deliveries must be acknowledged on the same channel they were received on. + * @throws IOException + */ + @Throws(IOException::class) + private fun basicAck(channel: Channel, deliveryTag: Long) { + channel.basicAck(deliveryTag, false) + } + + fun lockSendingIfSizeLimitExceeded() { + try { + val queueNameToInfo = client.queues.associateBy { it.name } + knownExchangesToRoutingKeys.entries.asSequence() + .flatMap { (exchange, routingKeys) -> + client.getBindingsBySource(rabbitMQConfiguration.vHost, exchange).asSequence() + .filter { it.destinationType == DestinationType.QUEUE && routingKeys.contains(it.routingKey) } + } + .groupBy { it.routingKey } + .forEach { (routingKey, bindings) -> + val queues = bindings.asSequence().mapNotNull { queueNameToInfo[it.destination] } + val limit = connectionManagerConfiguration.virtualPublishLimit + val holder = getChannelFor(PinId.forRoutingKey(routingKey)) + if (queues.sumOf { it.totalMessages } > limit) { + if (!holder.sizeLimitLock.isLocked) { + holder.sizeLimitLock.lock() + LOGGER.info { "Sending via routing key '$routingKey' is paused because there are ${queues.sizeDetails()}. Virtual publish limit is $limit" } + } + } else { + if (holder.sizeLimitLock.isLocked) { + holder.sizeLimitLock.unlock() + LOGGER.info { "Sending via routing key '$routingKey' is resumed. There are ${queues.sizeDetails()}. Virtual publish limit is $limit" } + } + } + } + } catch (t: Throwable) { + LOGGER.error("Error during check queue sizes", t) + } + } + + private class PinId private constructor(routingKey: String?, queue: String?) { + private val routingKey: String? + private val queue: String? + + init { + if (routingKey == null && queue == null) { + throw NullPointerException("Either routingKey or queue must be set") + } + this.routingKey = routingKey + this.queue = queue + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other == null || javaClass != other.javaClass) return false + val pinId = other as PinId + return EqualsBuilder() + .append(routingKey, pinId.routingKey) + .append(queue, pinId.queue) + .isEquals + } + + override fun hashCode(): Int = HashCodeBuilder(17, 37) + .append(routingKey) + .append(queue) + .toHashCode() + + override fun toString(): String = ToStringBuilder(this, ToStringStyle.JSON_STYLE) + .append("routingKey", routingKey) + .append("queue", queue) + .toString() + + companion object { + fun forRoutingKey(routingKey: String): PinId { + return PinId(routingKey, null) + } + + fun forQueue(queue: String): PinId { + return PinId(null, queue) + } + } + } + + private class ChannelHolder( + private val supplier: Supplier, + private val reconnectionChecker: BiConsumer, + ) { + val sizeLimitLock = ReentrantLock() + private val lock: Lock = ReentrantLock() + private var channel: Channel? = null + + @Throws(IOException::class) + fun publishWithLocks(exchange: String, routingKey: String, props: AMQP.BasicProperties?, body: ByteArray?) { + sizeLimitLock.lock() + try { + withLock(true) { it.basicPublish(exchange, routingKey, props, body) } + } finally { + sizeLimitLock.unlock() + } + } + + @Throws(IOException::class) + fun withLock(consumer: (Channel) -> Unit) { + withLock(true, consumer) + } + + @Throws(IOException::class) + fun withLock(waitForRecovery: Boolean, consumer: (Channel) -> Unit) { + lock.lock() + try { + consumer(getChannel(waitForRecovery)) + } finally { + lock.unlock() + } + } + + @Throws(IOException::class) + fun mapWithLock(mapper: (Channel) -> T): T { + lock.lock() + return try { + mapper(getChannel()) + } finally { + lock.unlock() + } + } + + private fun getChannel(): Channel { + return getChannel(true) + } + + private fun getChannel(waitForRecovery: Boolean): Channel { + if (channel == null) { + channel = supplier.get() + } + reconnectionChecker.accept(channel!!, waitForRecovery) + return channel!! + } + } + + companion object { + private val LOGGER = KotlinLogging.logger {} + private const val RABBITMQ_MANAGEMENT_URL = "http://%s:15672/api/" + private fun Sequence.sizeDetails() = joinToString { + "${it.totalMessages} message(s) in '${it.name}'" + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/QueuesWithVirtualPublishLimit.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/QueuesWithVirtualPublishLimit.kt deleted file mode 100644 index 268909a10..000000000 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/QueuesWithVirtualPublishLimit.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2022 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection - -import com.rabbitmq.http.client.domain.QueueInfo - -class QueuesWithVirtualPublishLimit(queues: List, virtualPublishLimit: Long) { - val isExceeded = queues.sumOf { it.totalMessages } > virtualPublishLimit - val sizeDetails by lazy { - queues.joinToString { "${it.totalMessages} message(s) in '${it.name}'" } + ". Virtual publish limit is $virtualPublishLimit" - } -} \ No newline at end of file diff --git a/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000..ca6ee9cea --- /dev/null +++ b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file