Skip to content

Commit

Permalink
HealthMetrics fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleg Smelov committed Jul 4, 2024
1 parent cdfbbd0 commit 492c387
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,11 @@ public class ConnectionManager implements AutoCloseable {
public static final String EMPTY_ROUTING_KEY = "";
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);

enum ConnectionType {
PUBLISH("-publish"),
CONSUME("-consume");
private enum ConnectionType {
PUBLISH("publish"),
CONSUME("consume");

private final String nameSuffix;
private final HealthMetrics metrics = new HealthMetrics(this);

ConnectionType(String nameSuffix) {
this.nameSuffix = nameSuffix;
Expand All @@ -101,14 +100,12 @@ enum ConnectionType {
public String getNameSuffix() {
return nameSuffix;
}

public HealthMetrics getMetrics() {
return metrics;
}
}

private final Connection publishConnection;
private final Connection consumeConnection;
private final HealthMetrics publishMetrics = new HealthMetrics(this, ConnectionType.PUBLISH.getNameSuffix());
private final HealthMetrics consumeMetrics = new HealthMetrics(this, ConnectionType.CONSUME.getNameSuffix());
private final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>();
private final AtomicReference<State> connectionState = new AtomicReference<>(State.OPEN);
private final ConnectionManagerConfiguration configuration;
Expand Down Expand Up @@ -174,7 +171,7 @@ public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfig
}

private Connection createConnection(ConnectionFactory factory, String connectionName, ConnectionType connectionType) {
HealthMetrics metrics = connectionType.getMetrics();
HealthMetrics metrics = connectionType == ConnectionType.PUBLISH ? publishMetrics : consumeMetrics;

factory.setExceptionHandler(new ExceptionHandler() {
@Override
Expand Down Expand Up @@ -245,7 +242,7 @@ private void turnOffReadiness(Throwable exception) {
Connection connection;

try {
connection = factory.newConnection(connectionName + connectionType.getNameSuffix());
connection = factory.newConnection(connectionName + '-' + connectionType.getNameSuffix());
LOGGER.info("Created RabbitMQ connection {} [{}]", connection, connection.hashCode());
addShutdownListenerToConnection(connection);
addBlockedListenersToConnection(connection);
Expand Down Expand Up @@ -491,7 +488,7 @@ public void reject() throws IOException {
LOGGER.warn("Error during basicReject of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e);
throw e;
} finally {
holder.release(() -> ConnectionType.CONSUME.metrics.getReadinessMonitor().enable());
holder.release(() -> consumeMetrics.getReadinessMonitor().enable());
}
});
}
Expand All @@ -505,7 +502,7 @@ public void confirm() throws IOException {
LOGGER.warn("Error during basicAck of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e);
throw e;
} finally {
holder.release(() -> ConnectionType.CONSUME.metrics.getReadinessMonitor().enable());
holder.release(() -> consumeMetrics.getReadinessMonitor().enable());
}
});
}
Expand All @@ -519,7 +516,7 @@ public void confirm() throws IOException {
LOGGER.warn("The confirmation for delivery {} in queue={} routing_key={} was not invoked within the specified delay",
deliveryTag, queue, routingKey);
if (holder.reachedPendingLimit()) {
ConnectionType.CONSUME.metrics.getReadinessMonitor().disable();
consumeMetrics.getReadinessMonitor().disable();
}
});
return false; // to cast to Callable
Expand All @@ -536,11 +533,11 @@ public void confirm() throws IOException {
}

boolean isReady() {
return ConnectionType.PUBLISH.metrics.getReadinessMonitor().isEnabled() && ConnectionType.CONSUME.metrics.getReadinessMonitor().isEnabled();
return publishMetrics.getReadinessMonitor().isEnabled() && consumeMetrics.getReadinessMonitor().isEnabled();
}

boolean isAlive() {
return ConnectionType.PUBLISH.metrics.getLivenessMonitor().isEnabled() && ConnectionType.CONSUME.metrics.getLivenessMonitor().isEnabled();
return publishMetrics.getLivenessMonitor().isEnabled() && consumeMetrics.getLivenessMonitor().isEnabled();
}

private ChannelHolderOptions configurationToOptions() {
Expand Down
11 changes: 8 additions & 3 deletions src/main/kotlin/com/exactpro/th2/common/metrics/CommonMetrics.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -51,8 +51,12 @@ private val ALL_READINESS = CopyOnWriteArrayList(listOf(RABBITMQ_READINESS, GRPC
fun registerLiveness(name: String) = LIVENESS_ARBITER.createMonitor(name)
fun registerReadiness(name: String) = READINESS_ARBITER.createMonitor(name)

fun registerLiveness(obj: Any) = LIVENESS_ARBITER.createMonitor("${obj::class.simpleName}_liveness_${obj.hashCode()}")
fun registerReadiness(obj: Any) = READINESS_ARBITER.createMonitor("${obj::class.simpleName}_readiness_${obj.hashCode()}")
@JvmOverloads
fun registerLiveness(obj: Any, suffix: String = "") = LIVENESS_ARBITER.createMonitor(getMonitorName(obj, suffix))
@JvmOverloads
fun registerReadiness(obj: Any, suffix: String = "") = READINESS_ARBITER.createMonitor(getMonitorName(obj, suffix))

private fun getMonitorName(obj: Any, suffix: String) = "${obj::class.simpleName}_liveness_${obj.hashCode()}" + if (suffix.isEmpty()) "" else "_$suffix"

@JvmField
val LIVENESS_MONITOR = registerLiveness("user_liveness")
Expand Down Expand Up @@ -131,6 +135,7 @@ class HealthMetrics @JvmOverloads constructor(
) {
@JvmOverloads
constructor(parent: Any, attempts: Int = 10) : this(registerLiveness(parent), registerReadiness(parent), attempts)
constructor(parent: Any, suffix: String = "") : this(registerLiveness(parent, suffix), registerReadiness(parent, suffix))

private val attempts = AtomicInteger(0)

Expand Down

0 comments on commit 492c387

Please sign in to comment.