Skip to content

Commit

Permalink
after review
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleg Smelov committed Jul 10, 2024
1 parent 0be84cf commit 8e20f14
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 35 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ release_version=5.14.0
kotlin_version=1.8.22
description='th2 common library (Java)'
vcs_url=https://github.com/th2-net/th2-common-j
kapt.include.compile.classpath=true
kapt.include.compile.classpath=false
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.rabbitmq.client.TopologyRecoveryException;
import com.rabbitmq.client.impl.AMQImpl;
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
Expand Down Expand Up @@ -66,7 +65,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
Expand All @@ -84,8 +82,6 @@ public abstract class ConnectionManager implements AutoCloseable {
protected final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>();
private final AtomicReference<State> connectionState = new AtomicReference<>(State.OPEN);
private final ConnectionManagerConfiguration configuration;
protected final String subscriberName;
protected final AtomicInteger nextSubscriberId = new AtomicInteger(1);
private final ExecutorService sharedExecutor;
protected final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build()
Expand Down Expand Up @@ -117,14 +113,6 @@ public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfig
Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null");
this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null");

String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), connectionManagerConfiguration.getSubscriberName());
if (StringUtils.isBlank(subscriberNameTmp)) {
subscriberName = "rabbit_mq_subscriber." + System.currentTimeMillis();
LOGGER.info("Subscribers will use default name: {}", subscriberName);
} else {
subscriberName = subscriberNameTmp + "." + System.currentTimeMillis();
}

var factory = new ConnectionFactory();
var virtualHost = rabbitMQConfiguration.getVHost();
var username = rabbitMQConfiguration.getUsername();
Expand Down Expand Up @@ -316,10 +304,10 @@ private void addRecoveryListenerToConnection(Connection conn) {
}
}

protected abstract BlockedListener getBlockedListener();
protected abstract BlockedListener createBlockedListener();

private void addBlockedListenersToConnection(Connection conn) {
conn.addBlockedListener(getBlockedListener());
conn.addBlockedListener(createBlockedListener());
}

public boolean isOpen() {
Expand Down Expand Up @@ -406,8 +394,8 @@ private void shutdownExecutor(ExecutorService executor, int closeTimeout, String
}

protected static final class SubscriptionCallbacks {
protected final ManualAckDeliveryCallback deliverCallback;
protected final CancelCallback cancelCallback;
final ManualAckDeliveryCallback deliverCallback;
final CancelCallback cancelCallback;

public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) {
this.deliverCallback = deliverCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,29 @@ import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.BlockedListener
import com.rabbitmq.client.ShutdownNotifier
import com.rabbitmq.client.ShutdownSignalException

import mu.KotlinLogging
import java.io.IOException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger

class ConsumeConnectionManager(
connectionName: String,
rabbitMQConfiguration: RabbitMQConfiguration,
connectionManagerConfiguration: ConnectionManagerConfiguration
) : ConnectionManager(connectionName, rabbitMQConfiguration, connectionManagerConfiguration) {
override fun getBlockedListener(): BlockedListener = object : BlockedListener {
private val subscriberName = if (connectionManagerConfiguration.subscriberName.isNullOrBlank()) {
(DEFAULT_SUBSCRIBER_NAME_PREFIX + System.currentTimeMillis()).also {
LOGGER.info { "Subscribers will use the default name: $it" }
}
} else {
connectionManagerConfiguration.subscriberName + "." + System.currentTimeMillis()
}
private val nextSubscriberId: AtomicInteger = AtomicInteger(1)

override fun createBlockedListener(): BlockedListener = object : BlockedListener {
override fun handleBlocked(reason: String) {
LOGGER.info { "RabbitMQ blocked consume connection: $reason" }
}
Expand All @@ -52,25 +61,18 @@ class ConsumeConnectionManager(

@Throws(IOException::class)
fun queueDeclare(): String {
val holder = ChannelHolder({ this.createChannel() },
{ notifier: ShutdownNotifier, waitForRecovery: Boolean ->
this.waitForConnectionRecovery(
notifier,
waitForRecovery
)
}, configurationToOptions()
)
return holder.mapWithLock { channel: Channel ->
val queue = channel.queueDeclare(
val holder = ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions())
return holder.mapWithLock { channel ->
channel.queueDeclare(
"", // queue name
false, // durable
true, // exclusive
false, // autoDelete
emptyMap()
).queue
LOGGER.info { "Declared exclusive '$queue' queue" }
putChannelFor(PinId.forQueue(queue), holder)
queue
).queue.also { queue ->
LOGGER.info { "Declared exclusive '$queue' queue" }
putChannelFor(PinId.forQueue(queue), holder)
}
}
}

Expand Down Expand Up @@ -107,7 +109,7 @@ class ConsumeConnectionManager(
override fun reject() {
holder.withLock { ch: Channel ->
try {
channel.basicReject(deliveryTag, false)
ch.basicReject(deliveryTag, false)
} catch (e: IOException) {
LOGGER.warn { "Error during basicReject of message with deliveryTag = $deliveryTag inside channel #${ch.channelNumber}: $e" }
throw e
Expand Down Expand Up @@ -213,5 +215,6 @@ class ConsumeConnectionManager(

companion object {
private val LOGGER = KotlinLogging.logger {}
private const val DEFAULT_SUBSCRIBER_NAME_PREFIX = "rabbit_mq_subscriber."
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class PublishConnectionManager(
}
}

override fun getBlockedListener(): BlockedListener = object : BlockedListener {
override fun createBlockedListener(): BlockedListener = object : BlockedListener {
override fun handleBlocked(reason: String) {
isPublishingBlocked = true
LOGGER.info { "RabbitMQ blocked publish connection: $reason" }
Expand Down

0 comments on commit 8e20f14

Please sign in to comment.