diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java index 6a67f942c0f19..2445c34c7fd58 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java @@ -11,11 +11,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; @@ -26,14 +28,20 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.NodeLoadDetector; import org.elasticsearch.xpack.ml.process.MlControllerHolder; +import org.elasticsearch.xpack.ml.utils.MlProcessors; import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.concurrent.TimeoutException; public class TransportMlInfoAction extends HandledTransportAction { @@ -137,6 +145,39 @@ private Map limits() { limits.put("effective_max_model_memory_limit", effectiveMaxModelMemoryLimit.getStringRep()); } limits.put("total_ml_memory", NativeMemoryCalculator.calculateTotalMlMemory(clusterSettings, nodes).getStringRep()); + + // Add processor information _if_ known with certainty. It won't be known with certainty if autoscaling is enabled. + // If we can scale up in terms of memory, assume we can also scale up in terms of processors. + List mlNodes = nodes.stream().filter(MachineLearning::isMlNode).toList(); + if (areMlNodesBiggestSize(clusterSettings.get(MachineLearning.MAX_ML_NODE_SIZE), mlNodes)) { + Processors singleNodeProcessors = MlProcessors.getMaxMlNodeProcessors( + nodes, + clusterSettings.get(MachineLearning.ALLOCATED_PROCESSORS_SCALE) + ); + if (singleNodeProcessors.count() > 0) { + limits.put("max_single_ml_node_processors", singleNodeProcessors.roundDown()); + } + Processors totalMlProcessors = MlProcessors.getTotalMlNodeProcessors( + nodes, + clusterSettings.get(MachineLearning.ALLOCATED_PROCESSORS_SCALE) + ); + if (totalMlProcessors.count() > 0) { + int potentialExtraProcessors = Math.max(0, clusterSettings.get(MachineLearning.MAX_LAZY_ML_NODES) - mlNodes.size()) + * singleNodeProcessors.roundDown(); + limits.put("total_ml_processors", totalMlProcessors.roundDown() + potentialExtraProcessors); + } + } return limits; } + + static boolean areMlNodesBiggestSize(ByteSizeValue maxMLNodeSize, Collection mlNodes) { + if (maxMLNodeSize.getBytes() == 0) { + return true; + } + + OptionalLong smallestMLNode = mlNodes.stream().map(NodeLoadDetector::getNodeSize).flatMapToLong(OptionalLong::stream).min(); + + // Can we scale vertically? + return smallestMLNode.isPresent() && smallestMLNode.getAsLong() < maxMLNodeSize.getBytes(); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java index 1c45c6da2bcc7..1769a7946ce80 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.ml.utils; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.unit.Processors; import org.elasticsearch.xpack.ml.MachineLearning; @@ -44,4 +46,29 @@ public static Processors get(DiscoveryNode node, Integer allocatedProcessorScale return Processors.ZERO; } } + + public static Processors getMaxMlNodeProcessors(DiscoveryNodes nodes, Integer allocatedProcessorScale) { + Processors answer = Processors.ZERO; + for (DiscoveryNode node : nodes) { + if (node.getRoles().contains(DiscoveryNodeRole.ML_ROLE)) { + Processors nodeProcessors = get(node, allocatedProcessorScale); + if (answer.compareTo(nodeProcessors) < 0) { + answer = nodeProcessors; + } + } + } + return answer; + } + + public static Processors getTotalMlNodeProcessors(DiscoveryNodes nodes, Integer allocatedProcessorScale) { + int total = 0; + for (DiscoveryNode node : nodes) { + if (node.getRoles().contains(DiscoveryNodeRole.ML_ROLE)) { + Processors nodeProcessors = get(node, allocatedProcessorScale); + // Round down before summing, because ML only uses whole processors + total += nodeProcessors.roundDown(); + } + } + return Processors.of((double) total); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportMlInfoActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportMlInfoActionTests.java new file mode 100644 index 0000000000000..da2eac163fb18 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportMlInfoActionTests.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.MachineLearning; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.is; + +public class TransportMlInfoActionTests extends ESTestCase { + + public void testAreMlNodesBiggestSize() { + boolean expectedResult = randomBoolean(); + long mlNodeSize = randomLongBetween(10000000L, 10000000000L); + long biggestSize = expectedResult ? mlNodeSize : mlNodeSize * randomLongBetween(2, 5); + long otherNodeSize = randomLongBetween(mlNodeSize / 2, biggestSize * 2); + var nodes = List.of( + DiscoveryNodeUtils.builder("n1") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(mlNodeSize))) + .build(), + DiscoveryNodeUtils.builder("n2") + .roles(Set.of(DiscoveryNodeRole.DATA_ROLE)) + .attributes(Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(otherNodeSize))) + .build(), + DiscoveryNodeUtils.builder("n3") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(mlNodeSize))) + .build(), + DiscoveryNodeUtils.builder("n4") + .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)) + .attributes(Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(otherNodeSize))) + .build() + ); + assertThat(TransportMlInfoAction.areMlNodesBiggestSize(ByteSizeValue.ofBytes(biggestSize), nodes), is(expectedResult)); + + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java index 2ff3196dc87e9..b1b213e2c3f15 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java @@ -7,11 +7,14 @@ package org.elasticsearch.xpack.ml.utils; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.MachineLearning; import java.util.Map; +import java.util.Set; import static org.hamcrest.Matchers.equalTo; @@ -34,4 +37,190 @@ public void testGetWithNull() { var processor = MlProcessors.get(node, null); assertThat(processor.count(), equalTo(8.0)); } + + public void testGetMaxMlNodeProcessors() { + var nodes = DiscoveryNodes.builder() + .add( + DiscoveryNodeUtils.builder("n1") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n2") + .roles(Set.of(DiscoveryNodeRole.DATA_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n3") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "7.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n4") + .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0")) + .build() + ) + .build(); + var processor = MlProcessors.getMaxMlNodeProcessors(nodes, 1); + assertThat(processor.count(), equalTo(8.0)); + } + + public void testGetMaxMlNodeProcessorsWithScale() { + var nodes = DiscoveryNodes.builder() + .add( + DiscoveryNodeUtils.builder("n1") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n2") + .roles(Set.of(DiscoveryNodeRole.DATA_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n3") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "12.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n4") + .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "10.0")) + .build() + ) + .build(); + var processor = MlProcessors.getMaxMlNodeProcessors(nodes, 2); + assertThat(processor.count(), equalTo(6.0)); + } + + public void testGetMaxMlNodeProcessorsWithNull() { + var nodes = DiscoveryNodes.builder() + .add( + DiscoveryNodeUtils.builder("n1") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n2") + .roles(Set.of(DiscoveryNodeRole.DATA_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n3") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "7.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n4") + .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0")) + .build() + ) + .build(); + var processor = MlProcessors.getMaxMlNodeProcessors(nodes, null); + assertThat(processor.count(), equalTo(7.0)); + } + + public void testGetTotalMlNodeProcessors() { + var nodes = DiscoveryNodes.builder() + .add( + DiscoveryNodeUtils.builder("n1") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n2") + .roles(Set.of(DiscoveryNodeRole.DATA_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n3") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "7.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n4") + .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0")) + .build() + ) + .build(); + var processor = MlProcessors.getTotalMlNodeProcessors(nodes, 1); + assertThat(processor.count(), equalTo(15.0)); + } + + public void testGetTotalMlNodeProcessorsWithScale() { + var nodes = DiscoveryNodes.builder() + .add( + DiscoveryNodeUtils.builder("n1") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n2") + .roles(Set.of(DiscoveryNodeRole.DATA_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n3") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "7.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n4") + .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0")) + .build() + ) + .build(); + var processor = MlProcessors.getTotalMlNodeProcessors(nodes, 2); + assertThat(processor.count(), equalTo(7.0)); + } + + public void testGetTotalMlNodeProcessorsWithNull() { + var nodes = DiscoveryNodes.builder() + .add( + DiscoveryNodeUtils.builder("n1") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.5")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n2") + .roles(Set.of(DiscoveryNodeRole.DATA_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "9.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n3") + .roles(Set.of(DiscoveryNodeRole.ML_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "7.0")) + .build() + ) + .add( + DiscoveryNodeUtils.builder("n4") + .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)) + .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "6.0")) + .build() + ) + .build(); + var processor = MlProcessors.getTotalMlNodeProcessors(nodes, null); + assertThat(processor.count(), equalTo(13.0)); + } }