diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java
index 5c89c29a70cdd..4b925f678602a 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java
@@ -22,6 +22,8 @@
 import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
 import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentState;
 import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
+import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
+import org.elasticsearch.xpack.core.ml.utils.MemoryTrackedTaskState;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -130,22 +132,27 @@ static void getMemoryAndProcessors(
             autoscalingContext.modelAssignments.size()
         );
 
-        // start with `minNodes = 1` if any ML job is started, further adjustments are made for trained models below
-        int minNodes = autoscalingContext.anomalyDetectionTasks.isEmpty()
-            && autoscalingContext.dataframeAnalyticsTasks.isEmpty()
-            && autoscalingContext.modelAssignments.isEmpty() ? 0 : 1;
+        // Start with `minNodes = 0`. If any ML job is started this will be increased to 1 in the loops below,
+        // and further adjustments are made for trained models depending on allocations.
+        int minNodes = 0;
 
         // anomaly detection
         for (var task : autoscalingContext.anomalyDetectionTasks) {
+            MemoryTrackedTaskState state = MlTasks.getMemoryTrackedTaskState(task);
+            if (state != null && state.consumesMemory() == false) {
+                continue;
+            }
+
             String jobId = ((OpenJobAction.JobParams) task.getParams()).getJobId();
             Long jobMemory = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(jobId);
-
             if (jobMemory == null) {
                 logger.debug("could not find memory requirement for job [{}], returning no-scale", jobId);
                 listener.onResponse(noScaleStats(numberMlNodes));
                 return;
             }
 
+            minNodes = 1;
+
             if (AWAITING_LAZY_ASSIGNMENT.equals(task.getAssignment())) {
                 logger.debug("job [{}] lacks assignment , memory required [{}]", jobId, jobMemory);
@@ -165,15 +172,21 @@ static void getMemoryAndProcessors(
 
         // data frame analytics
         for (var task : autoscalingContext.dataframeAnalyticsTasks) {
+            MemoryTrackedTaskState state = MlTasks.getMemoryTrackedTaskState(task);
+            if (state != null && state.consumesMemory() == false) {
+                continue;
+            }
+
             String jobId = MlTasks.dataFrameAnalyticsId(task.getId());
             Long jobMemory = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(jobId);
-
             if (jobMemory == null) {
                 logger.debug("could not find memory requirement for job [{}], returning no-scale", jobId);
                 listener.onResponse(noScaleStats(numberMlNodes));
                 return;
             }
 
+            minNodes = 1;
+
             if (AWAITING_LAZY_ASSIGNMENT.equals(task.getAssignment())) {
                 logger.debug("dfa job [{}] lacks assignment , memory required [{}]", jobId, jobMemory);
@@ -192,12 +205,12 @@ static void getMemoryAndProcessors(
 
         // trained models
         for (var modelAssignment : autoscalingContext.modelAssignments.entrySet()) {
-            final int numberOfAllocations = modelAssignment.getValue().getTaskParams().getNumberOfAllocations();
-            final int numberOfThreadsPerAllocation = modelAssignment.getValue().getTaskParams().getThreadsPerAllocation();
-            final long estimatedMemoryUsage = modelAssignment.getValue().getTaskParams().estimateMemoryUsageBytes();
+            TrainedModelAssignment assignment = modelAssignment.getValue();
+            final int numberOfAllocations = assignment.getTaskParams().getNumberOfAllocations();
+            final int numberOfThreadsPerAllocation = assignment.getTaskParams().getThreadsPerAllocation();
+            final long estimatedMemoryUsage = assignment.getTaskParams().estimateMemoryUsageBytes();
 
-            if (AssignmentState.STARTING.equals(modelAssignment.getValue().getAssignmentState())
-                && modelAssignment.getValue().getNodeRoutingTable().isEmpty()) {
+            if (AssignmentState.STARTING.equals(assignment.getAssignmentState()) && assignment.getNodeRoutingTable().isEmpty()) {
                 logger.debug(
                     () -> format(
@@ -216,6 +229,9 @@ static void getMemoryAndProcessors(
                     extraSingleNodeProcessors = Math.max(extraSingleNodeProcessors, numberOfThreadsPerAllocation);
                     extraProcessors += numberOfAllocations * numberOfThreadsPerAllocation;
                 }
+            } else if (assignment.getNodeRoutingTable().values().stream().allMatch(r -> r.getState().consumesMemory() == false)) {
+                // Ignore states that don't consume memory, for example all allocations are failed
+                continue;
             } else {
                 logger.debug(
                     () -> format(
@@ -229,9 +245,6 @@ static void getMemoryAndProcessors(
                 modelMemoryBytesSum += estimatedMemoryUsage;
                 processorsSum += numberOfAllocations * numberOfThreadsPerAllocation;
 
-                // min(3, max(number of allocations over all deployed models)
-                minNodes = Math.min(3, Math.max(minNodes, numberOfAllocations));
-
                 for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
                     perNodeModelMemoryInBytes.computeIfAbsent(node, k -> new ArrayList<>())
                         .add(
@@ -244,6 +257,9 @@ static void getMemoryAndProcessors(
                         );
                 }
             }
+
+            // min(3, max(number of allocations over all deployed models)
+            minNodes = Math.min(3, Math.max(minNodes, numberOfAllocations));
         }
 
         // check for downscaling
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java
index 05478deac811c..7ea63cf7945f0 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java
@@ -14,12 +14,16 @@
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
 import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState;
 import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
+import org.elasticsearch.xpack.core.ml.job.config.JobState;
+import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -34,7 +38,9 @@
 import java.util.function.Consumer;
 
 import static org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingResourceTracker.MlJobRequirements;
+import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class MlAutoscalingResourceTrackerTests extends ESTestCase {
@@ -83,6 +89,137 @@ public void testGetMemoryAndProcessors() throws InterruptedException {
         );
     }
 
+    public void testGetMemoryAndProcessorsScaleUpGivenAwaitingLazyAssignment() throws InterruptedException {
+        long memory = 1000000000;
+        Map<String, String> nodeAttr = Map.of(
+            MachineLearning.MACHINE_MEMORY_NODE_ATTR,
+            Long.toString(memory),
+            MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
+            "400000000",
+            MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
+            "7.2.0"
+        );
+        String jobId = "lazy-job";
+        MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
+            List.of(
+                new PersistentTasksCustomMetadata.PersistentTask<>(
+                    MlTasks.jobTaskId(jobId),
+                    MlTasks.JOB_TASK_NAME,
+                    new OpenJobAction.JobParams(jobId),
+                    1,
+                    AWAITING_LAZY_ASSIGNMENT
+                )
+            ),
+            List.of(),
+            List.of(),
+            Map.of(),
+            List.of(
+                DiscoveryNodeUtils.builder("ml-1")
+                    .name("ml-1")
+                    .address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
+                    .attributes(nodeAttr)
+                    .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                    .build(),
+                DiscoveryNodeUtils.builder("ml-2")
+                    .name("ml-2")
+                    .address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
+                    .attributes(nodeAttr)
+                    .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                    .build()
+            ),
+            PersistentTasksCustomMetadata.builder().build()
+        );
+        MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
+        when(mockTracker.getAnomalyDetectorJobMemoryRequirement(jobId)).thenReturn(memory / 4);
+        this.<MlAutoscalingStats>assertAsync(
+            listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
+                mlAutoscalingContext,
+                mockTracker,
+                Map.of("ml-1", memory, "ml-2", memory),
+                memory / 2,
+                10,
+                MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
+                listener
+            ),
+            stats -> {
+                assertEquals(memory, stats.perNodeMemoryInBytes());
+                assertEquals(2, stats.nodes());
+                assertEquals(1, stats.minNodes());
+                assertEquals(0, stats.extraSingleNodeProcessors());
+                assertEquals(memory / 4, stats.extraSingleNodeModelMemoryInBytes());
+                assertEquals(memory / 4, stats.extraModelMemoryInBytes());
+                assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.perNodeMemoryOverheadInBytes());
+            }
+        );
+    }
+
+    public void testGetMemoryAndProcessorsScaleUpGivenAwaitingLazyAssignmentButFailed() throws InterruptedException {
+        long memory = 1000000000;
+        Map<String, String> nodeAttr = Map.of(
+            MachineLearning.MACHINE_MEMORY_NODE_ATTR,
+            Long.toString(memory),
+            MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
+            "400000000",
+            MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
+            "7.2.0"
+        );
+        String jobId = "lazy-job";
+        MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
+            List.of(
+                new PersistentTasksCustomMetadata.PersistentTask<>(
+                    new PersistentTasksCustomMetadata.PersistentTask<>(
+                        MlTasks.jobTaskId(jobId),
+                        MlTasks.JOB_TASK_NAME,
+                        new OpenJobAction.JobParams(jobId),
+                        1,
+                        AWAITING_LAZY_ASSIGNMENT
+                    ),
+                    new JobTaskState(JobState.FAILED, 1, "a nasty bug")
+                )
+            ),
+            List.of(),
+            List.of(),
+            Map.of(),
+            List.of(
+                DiscoveryNodeUtils.builder("ml-1")
+                    .name("ml-1")
+                    .address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
+                    .attributes(nodeAttr)
+                    .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                    .build(),
+                DiscoveryNodeUtils.builder("ml-2")
+                    .name("ml-2")
+                    .address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
+                    .attributes(nodeAttr)
+                    .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                    .build()
+            ),
+            PersistentTasksCustomMetadata.builder().build()
+        );
+        MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
+        when(mockTracker.getAnomalyDetectorJobMemoryRequirement(jobId)).thenReturn(memory / 4);
+        this.<MlAutoscalingStats>assertAsync(
+            listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
+                mlAutoscalingContext,
+                mockTracker,
+                Map.of("ml-1", memory, "ml-2", memory),
+                memory / 2,
+                10,
+                MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
+                listener
+            ),
+            stats -> {
+                assertEquals(memory, stats.perNodeMemoryInBytes());
+                assertEquals(2, stats.nodes());
+                assertEquals(0, stats.minNodes());
+                assertEquals(0, stats.extraSingleNodeProcessors());
+                assertEquals(0, stats.extraSingleNodeModelMemoryInBytes());
+                assertEquals(0, stats.extraModelMemoryInBytes());
+                assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.perNodeMemoryOverheadInBytes());
+            }
+        );
+    }
+
     public void testCheckIfJobsCanBeMovedInLeastEfficientWayMemoryOnly() {
         assertEquals(
             0L,
@@ -897,7 +1034,6 @@ public void testGetMemoryAndProcessorsScaleDown() throws InterruptedException {
                     )
                 ).addRoutingEntry("ml-node-3", new RoutingInfo(1, 1, RoutingState.STARTED, "")).build()
             ),
-
             List.of(
                 DiscoveryNodeUtils.builder("ml-node-1")
                     .name("ml-node-name-1")