[ML] Ignore failed jobs in serverless autoscaling
Failed jobs have persistent tasks but do not have corresponding
native processes running, so should not count towards the
memory requirements of the ML tier.

This PR filters failed jobs before calculating memory requirements
for serverless autoscaling. (This was already accounted for
correctly in stateful autoscaling.)

Also adds some missing tests.
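
The filter relies on the `MemoryTrackedTaskState.consumesMemory()` contract that shows up in the diff below. A minimal sketch of the idea, assuming FAILED is among the non-consuming states (the enum here is illustrative, not the real `JobState`):

    // The contract the autoscaler leans on: a task state reports whether
    // a live native process (and hence memory) stands behind it.
    interface MemoryTrackedTaskState {
        boolean consumesMemory();
    }

    // Illustrative implementor only; the real per-job state enums live in
    // x-pack core and their exact state sets may differ.
    enum JobStateSketch implements MemoryTrackedTaskState {
        OPENING, OPENED, CLOSING, CLOSED, FAILED;

        @Override
        public boolean consumesMemory() {
            // A failed job keeps its persistent task but has no native
            // process, so it must not count towards ML tier memory.
            return this != CLOSED && this != FAILED;
        }
    }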
droberts195 committed Nov 1, 2023
1 parent df9002a commit 47ab70c
Showing 2 changed files with 167 additions and 15 deletions.
MlAutoscalingResourceTracker.java

@@ -22,6 +22,8 @@
import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentState;
import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
import org.elasticsearch.xpack.core.ml.utils.MemoryTrackedTaskState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -130,22 +132,27 @@ static void getMemoryAndProcessors(
autoscalingContext.modelAssignments.size()
);

// start with `minNodes = 1` if any ML job is started, further adjustments are made for trained models below
int minNodes = autoscalingContext.anomalyDetectionTasks.isEmpty()
&& autoscalingContext.dataframeAnalyticsTasks.isEmpty()
&& autoscalingContext.modelAssignments.isEmpty() ? 0 : 1;
// Start with `minNodes = 0`. If any ML job is started this will be increased to 1 in the loops below,
// and further adjustments are made for trained models depending on allocations.
int minNodes = 0;

// anomaly detection
for (var task : autoscalingContext.anomalyDetectionTasks) {
MemoryTrackedTaskState state = MlTasks.getMemoryTrackedTaskState(task);
if (state != null && state.consumesMemory() == false) {
continue;
}

String jobId = ((OpenJobAction.JobParams) task.getParams()).getJobId();
Long jobMemory = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(jobId);

if (jobMemory == null) {
logger.debug("could not find memory requirement for job [{}], returning no-scale", jobId);
listener.onResponse(noScaleStats(numberMlNodes));
return;
}

minNodes = 1;

if (AWAITING_LAZY_ASSIGNMENT.equals(task.getAssignment())) {
logger.debug("job [{}] lacks assignment , memory required [{}]", jobId, jobMemory);

@@ -165,15 +172,21 @@ static void getMemoryAndProcessors(

// data frame analytics
for (var task : autoscalingContext.dataframeAnalyticsTasks) {
MemoryTrackedTaskState state = MlTasks.getMemoryTrackedTaskState(task);
if (state != null && state.consumesMemory() == false) {
continue;
}

String jobId = MlTasks.dataFrameAnalyticsId(task.getId());
Long jobMemory = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(jobId);

if (jobMemory == null) {
logger.debug("could not find memory requirement for job [{}], returning no-scale", jobId);
listener.onResponse(noScaleStats(numberMlNodes));
return;
}

minNodes = 1;

if (AWAITING_LAZY_ASSIGNMENT.equals(task.getAssignment())) {
logger.debug("dfa job [{}] lacks assignment , memory required [{}]", jobId, jobMemory);

@@ -192,12 +205,12 @@ static void getMemoryAndProcessors(

// trained models
for (var modelAssignment : autoscalingContext.modelAssignments.entrySet()) {
final int numberOfAllocations = modelAssignment.getValue().getTaskParams().getNumberOfAllocations();
final int numberOfThreadsPerAllocation = modelAssignment.getValue().getTaskParams().getThreadsPerAllocation();
final long estimatedMemoryUsage = modelAssignment.getValue().getTaskParams().estimateMemoryUsageBytes();
TrainedModelAssignment assignment = modelAssignment.getValue();
final int numberOfAllocations = assignment.getTaskParams().getNumberOfAllocations();
final int numberOfThreadsPerAllocation = assignment.getTaskParams().getThreadsPerAllocation();
final long estimatedMemoryUsage = assignment.getTaskParams().estimateMemoryUsageBytes();

if (AssignmentState.STARTING.equals(modelAssignment.getValue().getAssignmentState())
&& modelAssignment.getValue().getNodeRoutingTable().isEmpty()) {
if (AssignmentState.STARTING.equals(assignment.getAssignmentState()) && assignment.getNodeRoutingTable().isEmpty()) {

logger.debug(
() -> format(
@@ -216,6 +229,9 @@ static void getMemoryAndProcessors(
extraSingleNodeProcessors = Math.max(extraSingleNodeProcessors, numberOfThreadsPerAllocation);
extraProcessors += numberOfAllocations * numberOfThreadsPerAllocation;
}
} else if (assignment.getNodeRoutingTable().values().stream().allMatch(r -> r.getState().consumesMemory() == false)) {
// Ignore states that don't consume memory, for example all allocations are failed
continue;
} else {
logger.debug(
() -> format(
@@ -229,9 +245,6 @@ static void getMemoryAndProcessors(
modelMemoryBytesSum += estimatedMemoryUsage;
processorsSum += numberOfAllocations * numberOfThreadsPerAllocation;

// min(3, max(number of allocations over all deployed models))
minNodes = Math.min(3, Math.max(minNodes, numberOfAllocations));

for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
perNodeModelMemoryInBytes.computeIfAbsent(node, k -> new ArrayList<>())
.add(
@@ -244,6 +257,9 @@ static void getMemoryAndProcessors(
);
}
}

// min(3, max(number of allocations over all deployed models))
minNodes = Math.min(3, Math.max(minNodes, numberOfAllocations));
}

// check for downscaling
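The trained-model branch above takes the analogous route: an assignment is ignored only when every entry in its routing table is in a non-memory-consuming state. A hedged sketch of the routing-state semantics that check needs (the exact set of non-consuming `RoutingState` values is an assumption here; FAILED must be among them):

    // Illustrative only; mirrors the shape of the allMatch check added above.
    enum RoutingStateSketch {
        STARTING, STARTED, STOPPING, STOPPED, FAILED;

        boolean consumesMemory() {
            // A failed or stopped route has no native process behind it.
            return this != FAILED && this != STOPPED;
        }
    }

An assignment whose routes are all FAILED is skipped entirely, while a single STARTED route keeps it in the memory and processor calculation.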
MlAutoscalingResourceTrackerTests.java

@@ -14,12 +14,16 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState;
import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;

@@ -34,7 +38,9 @@
import java.util.function.Consumer;

import static org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingResourceTracker.MlJobRequirements;
import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MlAutoscalingResourceTrackerTests extends ESTestCase {

@@ -83,6 +89,137 @@ public void testGetMemoryAndProcessors() throws InterruptedException {
);
}

public void testGetMemoryAndProcessorsScaleUpGivenAwaitingLazyAssignment() throws InterruptedException {
long memory = 1000000000;
Map<String, String> nodeAttr = Map.of(
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
Long.toString(memory),
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
"400000000",
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
"7.2.0"
);
String jobId = "lazy-job";
MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
List.of(
new PersistentTasksCustomMetadata.PersistentTask<>(
MlTasks.jobTaskId(jobId),
MlTasks.JOB_TASK_NAME,
new OpenJobAction.JobParams(jobId),
1,
AWAITING_LAZY_ASSIGNMENT
)
),
List.of(),
List.of(),
Map.of(),
List.of(
DiscoveryNodeUtils.builder("ml-1")
.name("ml-1")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
.attributes(nodeAttr)
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
.build(),
DiscoveryNodeUtils.builder("ml-2")
.name("ml-2")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
.attributes(nodeAttr)
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
.build()
),
PersistentTasksCustomMetadata.builder().build()
);
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
when(mockTracker.getAnomalyDetectorJobMemoryRequirement(jobId)).thenReturn(memory / 4);
this.<MlAutoscalingStats>assertAsync(
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
mlAutoscalingContext,
mockTracker,
Map.of("ml-1", memory, "ml-2", memory),
memory / 2,
10,
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
listener
),
stats -> {
assertEquals(memory, stats.perNodeMemoryInBytes());
assertEquals(2, stats.nodes());
assertEquals(1, stats.minNodes());
assertEquals(0, stats.extraSingleNodeProcessors());
assertEquals(memory / 4, stats.extraSingleNodeModelMemoryInBytes());
assertEquals(memory / 4, stats.extraModelMemoryInBytes());
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.perNodeMemoryOverheadInBytes());
}
);
}

public void testGetMemoryAndProcessorsScaleUpGivenAwaitingLazyAssignmentButFailed() throws InterruptedException {
long memory = 1000000000;
Map<String, String> nodeAttr = Map.of(
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
Long.toString(memory),
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
"400000000",
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
"7.2.0"
);
String jobId = "lazy-job";
MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
List.of(
new PersistentTasksCustomMetadata.PersistentTask<>(
new PersistentTasksCustomMetadata.PersistentTask<>(
MlTasks.jobTaskId(jobId),
MlTasks.JOB_TASK_NAME,
new OpenJobAction.JobParams(jobId),
1,
AWAITING_LAZY_ASSIGNMENT
),
new JobTaskState(JobState.FAILED, 1, "a nasty bug")
)
),
List.of(),
List.of(),
Map.of(),
List.of(
DiscoveryNodeUtils.builder("ml-1")
.name("ml-1")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
.attributes(nodeAttr)
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
.build(),
DiscoveryNodeUtils.builder("ml-2")
.name("ml-2")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
.attributes(nodeAttr)
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
.build()
),
PersistentTasksCustomMetadata.builder().build()
);
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
when(mockTracker.getAnomalyDetectorJobMemoryRequirement(jobId)).thenReturn(memory / 4);
this.<MlAutoscalingStats>assertAsync(
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
mlAutoscalingContext,
mockTracker,
Map.of("ml-1", memory, "ml-2", memory),
memory / 2,
10,
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
listener
),
stats -> {
assertEquals(memory, stats.perNodeMemoryInBytes());
assertEquals(2, stats.nodes());
assertEquals(0, stats.minNodes());
assertEquals(0, stats.extraSingleNodeProcessors());
assertEquals(0, stats.extraSingleNodeModelMemoryInBytes());
assertEquals(0, stats.extraModelMemoryInBytes());
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.perNodeMemoryOverheadInBytes());
}
);
}

public void testCheckIfJobsCanBeMovedInLeastEfficientWayMemoryOnly() {
assertEquals(
0L,
Expand Down Expand Up @@ -897,7 +1034,6 @@ public void testGetMemoryAndProcessorsScaleDown() throws InterruptedException {
)
).addRoutingEntry("ml-node-3", new RoutingInfo(1, 1, RoutingState.STARTED, "")).build()
),

List.of(
DiscoveryNodeUtils.builder("ml-node-1")
.name("ml-node-name-1")
