Skip to content

Commit

Permalink
[ML] Include ML processor limits in _ml/info response
Browse files Browse the repository at this point in the history
The _ml/info response now includes two extra fields in its
`limits`:

1. `max_single_ml_node_processors`
2. `total_ml_processors`

These fields are _only_ included if they can be accurately
calculated. If autoscaling is enabled and the ML nodes are
not at their maximum size then these fields _cannot_
currently be accurately calculated. (This could potentially
be improved in the future with additional settings set by
the control plane.)
  • Loading branch information
droberts195 committed Oct 26, 2023
1 parent 18b1bf5 commit f89746f
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
Expand All @@ -26,14 +28,20 @@
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
import org.elasticsearch.xpack.ml.process.MlControllerHolder;
import org.elasticsearch.xpack.ml.utils.MlProcessors;
import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.TimeoutException;

public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.Request, MlInfoAction.Response> {
Expand Down Expand Up @@ -137,6 +145,39 @@ private Map<String, Object> limits() {
limits.put("effective_max_model_memory_limit", effectiveMaxModelMemoryLimit.getStringRep());
}
limits.put("total_ml_memory", NativeMemoryCalculator.calculateTotalMlMemory(clusterSettings, nodes).getStringRep());

// Add processor information _if_ known with certainty. It won't be known with certainty if autoscaling is enabled.
// If we can scale up in terms of memory, assume we can also scale up in terms of processors.
List<DiscoveryNode> mlNodes = nodes.stream().filter(MachineLearning::isMlNode).toList();
if (areMlNodesBiggestSize(clusterSettings.get(MachineLearning.MAX_ML_NODE_SIZE), mlNodes)) {
Processors singleNodeProcessors = MlProcessors.getMaxMlNodeProcessors(
nodes,
clusterSettings.get(MachineLearning.ALLOCATED_PROCESSORS_SCALE)
);
if (singleNodeProcessors.count() > 0) {
limits.put("max_single_ml_node_processors", singleNodeProcessors.roundDown());
}
Processors totalMlProcessors = MlProcessors.getTotalMlNodeProcessors(
nodes,
clusterSettings.get(MachineLearning.ALLOCATED_PROCESSORS_SCALE)
);
if (totalMlProcessors.count() > 0) {
int potentialExtraProcessors = Math.max(0, clusterSettings.get(MachineLearning.MAX_LAZY_ML_NODES) - mlNodes.size())
* singleNodeProcessors.roundDown();
limits.put("total_ml_processors", totalMlProcessors.roundDown() + potentialExtraProcessors);
}
}
return limits;
}

/**
 * Decides whether the ML nodes are already as large as they are allowed to become.
 *
 * @param maxMLNodeSize the configured maximum ML node size; a value of zero bytes short-circuits to {@code true}
 *                      (presumably meaning no maximum is configured, i.e. no autoscaling - confirm against the setting's docs)
 * @param mlNodes       the ML nodes to inspect; callers are expected to pass ML nodes only
 * @return {@code true} if the maximum size is zero, or if the smallest node whose size could be determined is at least
 *         {@code maxMLNodeSize}; {@code false} if no node size could be determined or any node is below the maximum
 */
static boolean areMlNodesBiggestSize(ByteSizeValue maxMLNodeSize, Collection<DiscoveryNode> mlNodes) {
    if (maxMLNodeSize.getBytes() == 0) {
        return true;
    }

    // Nodes whose size cannot be determined yield an empty OptionalLong and are dropped from the minimum.
    OptionalLong smallestMLNode = mlNodes.stream().map(NodeLoadDetector::getNodeSize).flatMapToLong(OptionalLong::stream).min();

    // The nodes are at their biggest size only when even the smallest one has reached the maximum.
    // If the smallest node is below the maximum, vertical scaling is still possible and the answer is false.
    return smallestMLNode.isPresent() && smallestMLNode.getAsLong() >= maxMLNodeSize.getBytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.xpack.ml.utils;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.xpack.ml.MachineLearning;

Expand Down Expand Up @@ -44,4 +46,29 @@ public static Processors get(DiscoveryNode node, Integer allocatedProcessorScale
return Processors.ZERO;
}
}

/**
 * Finds the largest processor count among the nodes that carry the ML role.
 *
 * @param nodes                   the nodes to scan; nodes without the ML role are ignored
 * @param allocatedProcessorScale scale passed through unchanged to {@link #get} for each ML node
 * @return the greatest per-node value returned by {@link #get}, or {@link Processors#ZERO} when no ML node
 *         reports more than zero
 */
public static Processors getMaxMlNodeProcessors(DiscoveryNodes nodes, Integer allocatedProcessorScale) {
    return nodes.stream()
        .filter(candidate -> candidate.getRoles().contains(DiscoveryNodeRole.ML_ROLE))
        .map(candidate -> get(candidate, allocatedProcessorScale))
        // Keep the running maximum; only a strictly greater value replaces it
        .reduce(Processors.ZERO, (best, current) -> best.compareTo(current) < 0 ? current : best);
}

/**
 * Sums the whole processors available across all nodes that carry the ML role.
 *
 * @param nodes                   the nodes to scan; nodes without the ML role are ignored
 * @param allocatedProcessorScale scale passed through unchanged to {@link #get} for each ML node
 * @return the sum of the rounded-down per-node processor counts, or {@link Processors#ZERO} when that sum is zero
 *         (for example when there are no ML nodes)
 */
public static Processors getTotalMlNodeProcessors(DiscoveryNodes nodes, Integer allocatedProcessorScale) {
    int total = 0;
    for (DiscoveryNode node : nodes) {
        if (node.getRoles().contains(DiscoveryNodeRole.ML_ROLE)) {
            Processors nodeProcessors = get(node, allocatedProcessorScale);
            // Round down before summing, because ML only uses whole processors
            total += nodeProcessors.roundDown();
        }
    }
    // Use the shared ZERO constant for an empty total instead of going through the factory method.
    // NOTE(review): Processors.of appears to reject non-positive counts - confirm; either way ZERO is the
    // value callers compare against with count() > 0.
    if (total == 0) {
        return Processors.ZERO;
    }
    return Processors.of((double) total);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.Matchers.is;

/**
 * Unit tests for the static helpers of {@link TransportMlInfoAction}.
 */
public class TransportMlInfoActionTests extends ESTestCase {

    /**
     * Builds a mixed cluster in which both ML nodes have the same memory size and checks that
     * {@code areMlNodesBiggestSize} returns {@code true} when the maximum size equals that size and
     * {@code false} when the maximum size is a multiple (2x to 5x) of it.
     */
    public void testAreMlNodesBiggestSize() {
        boolean expectedResult = randomBoolean();
        long mlNodeSize = randomLongBetween(10000000L, 10000000000L);
        long biggestSize = expectedResult ? mlNodeSize : mlNodeSize * randomLongBetween(2, 5);
        long otherNodeSize = randomLongBetween(mlNodeSize / 2, biggestSize * 2);
        var nodes = List.of(
            DiscoveryNodeUtils.builder("n1")
                .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
                .attributes(Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(mlNodeSize)))
                .build(),
            DiscoveryNodeUtils.builder("n2")
                .roles(Set.of(DiscoveryNodeRole.DATA_ROLE))
                .attributes(Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(otherNodeSize)))
                .build(),
            DiscoveryNodeUtils.builder("n3")
                .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
                .attributes(Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(mlNodeSize)))
                .build(),
            DiscoveryNodeUtils.builder("n4")
                .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE))
                .attributes(Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(otherNodeSize)))
                .build()
        );
        // The method under test takes the collection of ML nodes, so hand it only the ML-role nodes.
        // Otherwise a randomly sized non-ML node could become the smallest node and make the outcome
        // depend on the random seed rather than on the ML node size.
        var mlNodes = nodes.stream().filter(node -> node.getRoles().contains(DiscoveryNodeRole.ML_ROLE)).toList();
        assertThat(TransportMlInfoAction.areMlNodesBiggestSize(ByteSizeValue.ofBytes(biggestSize), mlNodes), is(expectedResult));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

package org.elasticsearch.xpack.ml.utils;

import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.Map;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;

Expand All @@ -34,4 +37,190 @@ public void testGetWithNull() {
var processor = MlProcessors.get(node, null);
assertThat(processor.count(), equalTo(8.0));
}

/**
 * With a scale of 1, the maximum is expected to be the larger ML node's count (8.0); the data node's
 * higher count (9.0) and the master node's count are expected not to influence the result.
 */
public void testGetMaxMlNodeProcessors() {
    var firstMlNode = DiscoveryNodeUtils.builder("n1")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0"))
        .build();
    var dataNode = DiscoveryNodeUtils.builder("n2")
        .roles(Set.of(DiscoveryNodeRole.DATA_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0"))
        .build();
    var secondMlNode = DiscoveryNodeUtils.builder("n3")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "7.0"))
        .build();
    var masterNode = DiscoveryNodeUtils.builder("n4")
        .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0"))
        .build();
    var clusterNodes = DiscoveryNodes.builder().add(firstMlNode).add(dataNode).add(secondMlNode).add(masterNode).build();

    var largest = MlProcessors.getMaxMlNodeProcessors(clusterNodes, 1);

    assertThat(largest.count(), equalTo(8.0));
}

/**
 * With a scale of 2 and ML nodes advertising 8.0 and 12.0 processors, the maximum is expected to be 6.0;
 * the data and master nodes are expected not to influence the result.
 */
public void testGetMaxMlNodeProcessorsWithScale() {
    var firstMlNode = DiscoveryNodeUtils.builder("n1")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0"))
        .build();
    var dataNode = DiscoveryNodeUtils.builder("n2")
        .roles(Set.of(DiscoveryNodeRole.DATA_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0"))
        .build();
    var secondMlNode = DiscoveryNodeUtils.builder("n3")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "12.0"))
        .build();
    var masterNode = DiscoveryNodeUtils.builder("n4")
        .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "10.0"))
        .build();
    var clusterNodes = DiscoveryNodes.builder().add(firstMlNode).add(dataNode).add(secondMlNode).add(masterNode).build();

    var largest = MlProcessors.getMaxMlNodeProcessors(clusterNodes, 2);

    assertThat(largest.count(), equalTo(6.0));
}

/**
 * With a {@code null} scale and ML nodes advertising 6.0 and 7.0 processors, the maximum is expected
 * to be 7.0; the data and master nodes are expected not to influence the result.
 */
public void testGetMaxMlNodeProcessorsWithNull() {
    var firstMlNode = DiscoveryNodeUtils.builder("n1")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0"))
        .build();
    var dataNode = DiscoveryNodeUtils.builder("n2")
        .roles(Set.of(DiscoveryNodeRole.DATA_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0"))
        .build();
    var secondMlNode = DiscoveryNodeUtils.builder("n3")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "7.0"))
        .build();
    var masterNode = DiscoveryNodeUtils.builder("n4")
        .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0"))
        .build();
    var clusterNodes = DiscoveryNodes.builder().add(firstMlNode).add(dataNode).add(secondMlNode).add(masterNode).build();

    var largest = MlProcessors.getMaxMlNodeProcessors(clusterNodes, null);

    assertThat(largest.count(), equalTo(7.0));
}

/**
 * With a scale of 1 and ML nodes advertising 8.0 and 7.0 processors, the total is expected to be 15.0;
 * the data and master nodes are expected not to contribute.
 */
public void testGetTotalMlNodeProcessors() {
    var firstMlNode = DiscoveryNodeUtils.builder("n1")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0"))
        .build();
    var dataNode = DiscoveryNodeUtils.builder("n2")
        .roles(Set.of(DiscoveryNodeRole.DATA_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0"))
        .build();
    var secondMlNode = DiscoveryNodeUtils.builder("n3")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "7.0"))
        .build();
    var masterNode = DiscoveryNodeUtils.builder("n4")
        .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0"))
        .build();
    var clusterNodes = DiscoveryNodes.builder().add(firstMlNode).add(dataNode).add(secondMlNode).add(masterNode).build();

    var sum = MlProcessors.getTotalMlNodeProcessors(clusterNodes, 1);

    assertThat(sum.count(), equalTo(15.0));
}

/**
 * With a scale of 2 and ML nodes advertising 8.0 and 7.0 processors, the total is expected to be 7.0;
 * the data and master nodes are expected not to contribute.
 */
public void testGetTotalMlNodeProcessorsWithScale() {
    var firstMlNode = DiscoveryNodeUtils.builder("n1")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0"))
        .build();
    var dataNode = DiscoveryNodeUtils.builder("n2")
        .roles(Set.of(DiscoveryNodeRole.DATA_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0"))
        .build();
    var secondMlNode = DiscoveryNodeUtils.builder("n3")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "7.0"))
        .build();
    var masterNode = DiscoveryNodeUtils.builder("n4")
        .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0"))
        .build();
    var clusterNodes = DiscoveryNodes.builder().add(firstMlNode).add(dataNode).add(secondMlNode).add(masterNode).build();

    var sum = MlProcessors.getTotalMlNodeProcessors(clusterNodes, 2);

    assertThat(sum.count(), equalTo(7.0));
}

/**
 * With a {@code null} scale and ML nodes advertising 6.5 and 7.0 processors, the total is expected to
 * be 13.0; the data and master nodes are expected not to contribute.
 */
public void testGetTotalMlNodeProcessorsWithNull() {
    var firstMlNode = DiscoveryNodeUtils.builder("n1")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.5"))
        .build();
    var dataNode = DiscoveryNodeUtils.builder("n2")
        .roles(Set.of(DiscoveryNodeRole.DATA_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0"))
        .build();
    var secondMlNode = DiscoveryNodeUtils.builder("n3")
        .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "7.0"))
        .build();
    var masterNode = DiscoveryNodeUtils.builder("n4")
        .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE))
        .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0"))
        .build();
    var clusterNodes = DiscoveryNodes.builder().add(firstMlNode).add(dataNode).add(secondMlNode).add(masterNode).build();

    var sum = MlProcessors.getTotalMlNodeProcessors(clusterNodes, null);

    assertThat(sum.count(), equalTo(13.0));
}
}

0 comments on commit f89746f

Please sign in to comment.