diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlMetrics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlMetrics.java
index e1e41763808f9..5daf4426d8b61 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlMetrics.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlMetrics.java
@@ -7,12 +7,15 @@
package org.elasticsearch.xpack.ml;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
@@ -28,6 +31,8 @@
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -40,24 +45,28 @@
/**
* This class adds two types of ML metrics to the meter registry, such that they can be collected by Elastic APM.
- *
+ * <p>
* 1. Per-node ML native memory statistics for ML nodes
* 2. Cluster-wide job/model statuses for master-eligible nodes
- *
+ * <p>
* The memory metrics relate solely to the ML node they are collected from.
- *
+ * <p>
* The job/model metrics are cluster-wide because a key problem we want to be able to detect is when there are
* jobs or models that are not assigned to any node. The consumer of the data needs to account for the fact that
* multiple master-eligible nodes are reporting the same information. The is_master attribute in the records
* indicates which one was actually master, so can be used to deduplicate.
*/
-public class MlMetrics implements ClusterStateListener {
+public class MlMetrics extends AbstractLifecycleComponent implements ClusterStateListener {
+ private static final Logger logger = LogManager.getLogger(MlMetrics.class);
+
+ private final MeterRegistry meterRegistry;
private final ClusterService clusterService;
private final AutodetectProcessManager autodetectProcessManager;
private final DataFrameAnalyticsManager dataFrameAnalyticsManager;
private final boolean hasMasterRole;
private final boolean hasMlRole;
+ // Handles returned by meterRegistry.registerLongGauge(...); kept so that doClose() can close each one.
+ // The element type must be AutoCloseable (not a raw List) for metric.close() in doClose() to compile.
+ private final List<AutoCloseable> metrics = new ArrayList<>();
private static final Map<String, Object> MASTER_TRUE_MAP = Map.of("is_master", Boolean.TRUE);
private static final Map<String, Object> MASTER_FALSE_MAP = Map.of("is_master", Boolean.FALSE);
@@ -80,156 +89,216 @@ public MlMetrics(
AutodetectProcessManager autodetectProcessManager,
DataFrameAnalyticsManager dataFrameAnalyticsManager
) {
+ this.meterRegistry = meterRegistry;
this.clusterService = clusterService;
this.autodetectProcessManager = autodetectProcessManager;
this.dataFrameAnalyticsManager = dataFrameAnalyticsManager;
hasMasterRole = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE);
- if (hasMasterRole) {
- registerMasterNodeMetrics(meterRegistry);
- }
hasMlRole = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.ML_ROLE);
- if (hasMlRole) {
- registerMlNodeMetrics(meterRegistry);
- }
if (hasMasterRole || hasMlRole) {
clusterService.addListener(this);
}
}
+ /**
+ * Registers the per-node ML native memory gauges: the limit, the usage by anomaly detectors,
+ * data frame analytics and trained models, and the free memory. All are reported in "bytes"
+ * with an empty attribute map, and each supplier reads the corresponding nativeMem* field
+ * at the moment the gauge is polled.
+ * Each handle returned by the registry is added to {@code metrics}, which doClose() iterates to close them.
+ * Invoked from doStart() only when hasMlRole is true.
+ *
+ * @param meterRegistry the registry the gauges are registered with
+ */
private void registerMlNodeMetrics(MeterRegistry meterRegistry) {
- // Ignore the AutoCloseable warnings here - the registry is responsible for closing these gauges
- meterRegistry.registerLongGauge(
- "es.ml.native_memory.limit",
- "ML native memory limit on this node.",
- "bytes",
- () -> new LongWithAttributes(nativeMemLimit, Map.of())
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.native_memory.limit",
+ "ML native memory limit on this node.",
+ "bytes",
+ () -> new LongWithAttributes(nativeMemLimit, Map.of())
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.native_memory.usage.anomaly_detectors",
- "ML native memory used by anomaly detection jobs on this node.",
- "bytes",
- () -> new LongWithAttributes(nativeMemAdUsage, Map.of())
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.native_memory.usage.anomaly_detectors",
+ "ML native memory used by anomaly detection jobs on this node.",
+ "bytes",
+ () -> new LongWithAttributes(nativeMemAdUsage, Map.of())
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.native_memory.usage.data_frame_analytics",
- "ML native memory used by data frame analytics jobs on this node.",
- "bytes",
- () -> new LongWithAttributes(nativeMemDfaUsage, Map.of())
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.native_memory.usage.data_frame_analytics",
+ "ML native memory used by data frame analytics jobs on this node.",
+ "bytes",
+ () -> new LongWithAttributes(nativeMemDfaUsage, Map.of())
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.native_memory.usage.trained_models",
- "ML native memory used by trained models on this node.",
- "bytes",
- () -> new LongWithAttributes(nativeMemTrainedModelUsage, Map.of())
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.native_memory.usage.trained_models",
+ "ML native memory used by trained models on this node.",
+ "bytes",
+ () -> new LongWithAttributes(nativeMemTrainedModelUsage, Map.of())
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.native_memory.free",
- "Free ML native memory on this node.",
- "bytes",
- () -> new LongWithAttributes(nativeMemFree, Map.of())
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.native_memory.free",
+ "Free ML native memory on this node.",
+ "bytes",
+ () -> new LongWithAttributes(nativeMemFree, Map.of())
+ )
);
}
+ /**
+ * Registers the cluster-wide gauges: anomaly detection job counts by state, datafeed counts by state,
+ * data frame analytics job counts by state, and the sums of target, current and failed trained model
+ * allocations. Every gauge attaches {@code isMasterMap} as its attributes, and each supplier reads the
+ * cached mlTaskStatusCounts / trainedModelAllocationCounts value at the moment the gauge is polled.
+ * Each handle returned by the registry is added to {@code metrics}, which doClose() iterates to close them.
+ * Invoked from doStart() only when hasMasterRole is true.
+ *
+ * NOTE(review): the three datafeed gauges are registered as es.ml.anomaly_detectors.starting.count,
+ * .started.count and .stopping.count although their descriptions and unit refer to datafeeds -
+ * confirm whether an es.ml.datafeeds.* name was intended before consumers start depending on these.
+ *
+ * @param meterRegistry the registry the gauges are registered with
+ */
private void registerMasterNodeMetrics(MeterRegistry meterRegistry) {
- // Ignore the AutoCloseable warnings here - the registry is responsible for closing these gauges
- meterRegistry.registerLongGauge(
- "es.ml.anomaly_detectors.opening.count",
- "Count of anomaly detection jobs in the opening state cluster-wide.",
- "jobs",
- () -> new LongWithAttributes(mlTaskStatusCounts.adOpeningCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.anomaly_detectors.opening.count",
+ "Count of anomaly detection jobs in the opening state cluster-wide.",
+ "jobs",
+ () -> new LongWithAttributes(mlTaskStatusCounts.adOpeningCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.anomaly_detectors.opened.count",
- "Count of anomaly detection jobs in the opened state cluster-wide.",
- "jobs",
- () -> new LongWithAttributes(mlTaskStatusCounts.adOpenedCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.anomaly_detectors.opened.count",
+ "Count of anomaly detection jobs in the opened state cluster-wide.",
+ "jobs",
+ () -> new LongWithAttributes(mlTaskStatusCounts.adOpenedCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.anomaly_detectors.closing.count",
- "Count of anomaly detection jobs in the closing state cluster-wide.",
- "jobs",
- () -> new LongWithAttributes(mlTaskStatusCounts.adClosingCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.anomaly_detectors.closing.count",
+ "Count of anomaly detection jobs in the closing state cluster-wide.",
+ "jobs",
+ () -> new LongWithAttributes(mlTaskStatusCounts.adClosingCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.anomaly_detectors.failed.count",
- "Count of anomaly detection jobs in the failed state cluster-wide.",
- "jobs",
- () -> new LongWithAttributes(mlTaskStatusCounts.adFailedCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.anomaly_detectors.failed.count",
+ "Count of anomaly detection jobs in the failed state cluster-wide.",
+ "jobs",
+ () -> new LongWithAttributes(mlTaskStatusCounts.adFailedCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.anomaly_detectors.starting.count",
- "Count of datafeeds in the starting state cluster-wide.",
- "datafeeds",
- () -> new LongWithAttributes(mlTaskStatusCounts.datafeedStartingCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.anomaly_detectors.starting.count",
+ "Count of datafeeds in the starting state cluster-wide.",
+ "datafeeds",
+ () -> new LongWithAttributes(mlTaskStatusCounts.datafeedStartingCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.anomaly_detectors.started.count",
- "Count of datafeeds in the started state cluster-wide.",
- "datafeeds",
- () -> new LongWithAttributes(mlTaskStatusCounts.datafeedStartedCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.anomaly_detectors.started.count",
+ "Count of datafeeds in the started state cluster-wide.",
+ "datafeeds",
+ () -> new LongWithAttributes(mlTaskStatusCounts.datafeedStartedCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.anomaly_detectors.stopping.count",
- "Count of datafeeds in the stopping state cluster-wide.",
- "datafeeds",
- () -> new LongWithAttributes(mlTaskStatusCounts.datafeedStoppingCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.anomaly_detectors.stopping.count",
+ "Count of datafeeds in the stopping state cluster-wide.",
+ "datafeeds",
+ () -> new LongWithAttributes(mlTaskStatusCounts.datafeedStoppingCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.data_frame_analytics.starting.count",
- "Count of data frame analytics jobs in the starting state cluster-wide.",
- "jobs",
- () -> new LongWithAttributes(mlTaskStatusCounts.dfaStartingCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.data_frame_analytics.starting.count",
+ "Count of data frame analytics jobs in the starting state cluster-wide.",
+ "jobs",
+ () -> new LongWithAttributes(mlTaskStatusCounts.dfaStartingCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.data_frame_analytics.started.count",
- "Count of data frame analytics jobs in the started state cluster-wide.",
- "jobs",
- () -> new LongWithAttributes(mlTaskStatusCounts.dfaStartedCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.data_frame_analytics.started.count",
+ "Count of data frame analytics jobs in the started state cluster-wide.",
+ "jobs",
+ () -> new LongWithAttributes(mlTaskStatusCounts.dfaStartedCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.data_frame_analytics.reindexing.count",
- "Count of data frame analytics jobs in the reindexing state cluster-wide.",
- "jobs",
- () -> new LongWithAttributes(mlTaskStatusCounts.dfaReindexingCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.data_frame_analytics.reindexing.count",
+ "Count of data frame analytics jobs in the reindexing state cluster-wide.",
+ "jobs",
+ () -> new LongWithAttributes(mlTaskStatusCounts.dfaReindexingCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.data_frame_analytics.analyzing.count",
- "Count of data frame analytics jobs in the analyzing state cluster-wide.",
- "jobs",
- () -> new LongWithAttributes(mlTaskStatusCounts.dfaAnalyzingCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.data_frame_analytics.analyzing.count",
+ "Count of data frame analytics jobs in the analyzing state cluster-wide.",
+ "jobs",
+ () -> new LongWithAttributes(mlTaskStatusCounts.dfaAnalyzingCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.data_frame_analytics.stopping.count",
- "Count of data frame analytics jobs in the stopping state cluster-wide.",
- "jobs",
- () -> new LongWithAttributes(mlTaskStatusCounts.dfaStoppingCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.data_frame_analytics.stopping.count",
+ "Count of data frame analytics jobs in the stopping state cluster-wide.",
+ "jobs",
+ () -> new LongWithAttributes(mlTaskStatusCounts.dfaStoppingCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.data_frame_analytics.failed.count",
- "Count of data frame analytics jobs in the failed state cluster-wide.",
- "jobs",
- () -> new LongWithAttributes(mlTaskStatusCounts.dfaFailedCount, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.data_frame_analytics.failed.count",
+ "Count of data frame analytics jobs in the failed state cluster-wide.",
+ "jobs",
+ () -> new LongWithAttributes(mlTaskStatusCounts.dfaFailedCount, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.trained_models.deployment.target_allocations.count",
- "Sum of target trained model allocations across all deployments cluster-wide.",
- "allocations",
- () -> new LongWithAttributes(trainedModelAllocationCounts.trainedModelsTargetAllocations, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.trained_models.deployment.target_allocations.count",
+ "Sum of target trained model allocations across all deployments cluster-wide.",
+ "allocations",
+ () -> new LongWithAttributes(trainedModelAllocationCounts.trainedModelsTargetAllocations, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.trained_models.deployment.current_allocations.count",
- "Sum of current trained model allocations across all deployments cluster-wide.",
- "allocations",
- () -> new LongWithAttributes(trainedModelAllocationCounts.trainedModelsCurrentAllocations, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.trained_models.deployment.current_allocations.count",
+ "Sum of current trained model allocations across all deployments cluster-wide.",
+ "allocations",
+ () -> new LongWithAttributes(trainedModelAllocationCounts.trainedModelsCurrentAllocations, isMasterMap)
+ )
);
- meterRegistry.registerLongGauge(
- "es.ml.trained_models.deployment.failed_allocations.count",
- "Sum of failed trained model allocations across all deployments cluster-wide.",
- "allocations",
- () -> new LongWithAttributes(trainedModelAllocationCounts.trainedModelsFailedAllocations, isMasterMap)
+ metrics.add(
+ meterRegistry.registerLongGauge(
+ "es.ml.trained_models.deployment.failed_allocations.count",
+ "Sum of failed trained model allocations across all deployments cluster-wide.",
+ "allocations",
+ () -> new LongWithAttributes(trainedModelAllocationCounts.trainedModelsFailedAllocations, isMasterMap)
+ )
);
}
+ /**
+  * Registers the gauges that apply to this node's roles: the cluster-wide gauges when
+  * hasMasterRole is set, then the per-node memory gauges when hasMlRole is set.
+  * The handle list is emptied (nothing is closed) before any gauge is registered.
+  */
+ @Override
+ protected void doStart() {
+     metrics.clear();
+     if (hasMasterRole) {
+         registerMasterNodeMetrics(meterRegistry);
+     }
+     if (hasMlRole) {
+         registerMlNodeMetrics(meterRegistry);
+     }
+ }
+
+ @Override
+ protected void doStop() {
+     // Intentionally empty: the gauge handles are only released in doClose().
+ }
+
+ /**
+  * Closes every gauge handle collected in {@code metrics}. A handle whose close() throws is
+  * logged at warn level and skipped, so the remaining handles are still closed.
+  */
+ @Override
+ protected void doClose() {
+     for (AutoCloseable gauge : metrics) {
+         try {
+             gauge.close();
+         } catch (Exception e) {
+             logger.warn("metrics close() method should not throw Exception", e);
+         }
+     }
+ }
+
/**
* Metric values are recalculated in response to cluster state changes and then cached.
* This means that the telemetry provider can poll the metrics registry as often as it
@@ -314,11 +383,11 @@ private void memoryLimitClusterSettingUpdated() {
* - Anomaly detection jobs
* - Datafeeds
* - Data frame analytics jobs
- *
+ * <p>
* In the future it could possibly also include model snapshot upgrade tasks.
- *
+ * <p>
* These stats relate to the whole cluster and not just the current node.
- *
+ * <p>
* The caller is expected to cache the returned stats to avoid unnecessary recalculation.
*/
static MlTaskStatusCounts findTaskStatuses(PersistentTasksCustomMetadata tasks) {
@@ -406,9 +475,9 @@ static long findDfaMemoryUsage(DataFrameAnalyticsManager dataFrameAnalyticsManag
/**
* Returns up-to-date stats about the numbers of allocations of ML trained models.
- *
+ * <p>
* These stats relate to the whole cluster and not just the current node.
- *
+ * <p>
* The caller is expected to cache the returned stats to avoid unnecessary recalculation.
*/
static TrainedModelAllocationCounts findTrainedModelAllocationCounts(TrainedModelAssignmentMetadata metadata) {