Skip to content

Commit

Permalink
[ML] Expose some ML metrics via APM
Browse files Browse the repository at this point in the history
This change adds two types of ML metrics to the meter registry,
such that they can be collected by Elastic APM.

1. Per-node ML native memory statistics for ML nodes
2. Cluster-wide job/model statuses for master-eligible nodes

The memory metrics relate solely to the ML node they are
collected from.

The job/model metrics are cluster-wide because a key problem
we want to be able to detect is when there are jobs or models
that are not assigned to any node. The consumer of the data
needs to account for the fact that multiple master-eligible
nodes are reporting the same information. The `is_master`
attribute in the records indicates which one was actually
master, so it can be used to deduplicate.
  • Loading branch information
droberts195 committed Nov 24, 2023
1 parent e649a08 commit 2f21dc0
Show file tree
Hide file tree
Showing 9 changed files with 552 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ public static SnapshotUpgradeState getSnapshotUpgradeState(@Nullable PersistentT

/**
 * Resolves the state of a datafeed from its id.
 * <p>
 * Looks up the datafeed's persistent task via {@code getDatafeedTask} and hands the result
 * (which may be {@code null}) to the task-based {@code getDatafeedState} overload.
 *
 * @param datafeedId the id of the datafeed whose state is wanted
 * @param tasks      the persistent tasks metadata to search; may be {@code null}
 * @return the state computed by the task-based overload
 */
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetadata tasks) {
    return getDatafeedState(getDatafeedTask(datafeedId, tasks));
}

public static DatafeedState getDatafeedState(PersistentTasksCustomMetadata.PersistentTask<?> task) {
if (task == null) {
// If we haven't started a datafeed then there will be no persistent task,
// which is the same as if the datafeed wasn't started
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public int getTargetAllocations() {
return targetAllocations;
}

/**
 * Reports how many allocations count as failed for this routing entry.
 *
 * @return the target allocation count when the state is {@link RoutingState#FAILED},
 *         otherwise {@code 0}
 */
public int getFailedAllocations() {
    if (state != RoutingState.FAILED) {
        return 0;
    }
    // A failed route contributes its whole target as failed.
    return targetAllocations;
}

/**
 * @return the routing state held by this instance
 */
public RoutingState getState() {
    return this.state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ public int totalTargetAllocations() {
return nodeRoutingTable.values().stream().mapToInt(RoutingInfo::getTargetAllocations).sum();
}

/**
 * Sums the failed allocations across every entry in the node routing table.
 *
 * @return the total of {@link RoutingInfo#getFailedAllocations()} over all routing entries;
 *         {@code 0} when the table is empty
 */
public int totalFailedAllocations() {
    int failedTotal = 0;
    for (RoutingInfo routingInfo : nodeRoutingTable.values()) {
        failedTotal += routingInfo.getFailedAllocations();
    }
    return failedTotal;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,17 @@ public void testIsRoutable_GivenStartedWithNonZeroAllocations() {
RoutingInfo routingInfo = new RoutingInfo(randomIntBetween(1, 10), 1, RoutingState.STARTED, "");
assertThat(routingInfo.isRoutable(), is(true));
}

/**
 * Checks that {@code getFailedAllocations()} is zero for the STARTING, STARTED and STOPPING
 * states, and equals the target allocation count for the FAILED state.
 */
public void testGetFailedAllocations() {
    final int target = randomIntBetween(1, 10);

    // Non-failed states must never report failed allocations.
    final int firstCount = randomIntBetween(0, target);
    final RoutingState nonFailedState = randomFrom(RoutingState.STARTING, RoutingState.STARTED, RoutingState.STOPPING);
    final RoutingInfo notFailed = new RoutingInfo(firstCount, target, nonFailedState, "");
    assertThat(notFailed.getFailedAllocations(), is(0));

    // A failed route reports its full target as failed.
    final int secondCount = randomIntBetween(0, target);
    final RoutingInfo failed = new RoutingInfo(secondCount, target, RoutingState.FAILED, "");
    assertThat(failed.getFailedAllocations(), is(target));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -884,6 +885,7 @@ public Collection<?> createComponents(PluginServices services) {
Environment environment = services.environment();
NamedXContentRegistry xContentRegistry = services.xContentRegistry();
IndexNameExpressionResolver indexNameExpressionResolver = services.indexNameExpressionResolver();
TelemetryProvider telemetryProvider = services.telemetryProvider();

if (enabled == false) {
// Holders for @link(MachineLearningFeatureSetUsage) which needs access to job manager and ML extension,
Expand Down Expand Up @@ -1220,6 +1222,14 @@ public Collection<?> createComponents(PluginServices services) {
machineLearningExtension.get().isNlpEnabled()
);

MlMetrics mlMetrics = new MlMetrics(
telemetryProvider.getMeterRegistry(),
clusterService,
settings,
autodetectProcessManager,
dataFrameAnalyticsManager
);

return List.of(
mlLifeCycleService,
new MlControllerHolder(mlController),
Expand Down Expand Up @@ -1251,7 +1261,8 @@ public Collection<?> createComponents(PluginServices services) {
trainedModelAllocationClusterServiceSetOnce.get(),
deploymentManager.get(),
nodeAvailabilityZoneMapper,
new MachineLearningExtensionHolder(machineLearningExtension.get())
new MachineLearningExtensionHolder(machineLearningExtension.get()),
mlMetrics
);
}

Expand Down
Loading

0 comments on commit 2f21dc0

Please sign in to comment.