diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/autoscaling/MlAutoscalingStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/autoscaling/MlAutoscalingStats.java index ffadf4cafaf12..febe6e97a12aa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/autoscaling/MlAutoscalingStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/autoscaling/MlAutoscalingStats.java @@ -29,21 +29,30 @@ *

* The word "total" in an attribute name indicates that the attribute is a sum across all nodes. * - * @param currentTotalNodes the count of nodes that are currently in the cluster - * @param currentPerNodeMemoryBytes the minimum size (memory) of all nodes in the cluster - * @param currentTotalModelMemoryBytes the sum of model memory over every assignment/deployment - * @param currentTotalProcessorsInUse the sum of processors used over every assignment/deployment - * @param currentPerNodeMemoryOverheadBytes always equal to MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD - * @param wantedMinNodes the minimum number of nodes that must be provided by the autoscaler - * @param wantedExtraPerNodeMemoryBytes the amount of additional memory that must be provided on every node - * (this value must be >0 to trigger a scale up based on memory) - * @param wantedExtraPerNodeNodeProcessors the number of additional processors that must be provided on every node - * (this value must be >0 to trigger a scale up based on processors) - * @param wantedExtraModelMemoryBytes the amount of additional model memory that is newly required - * (due to a new assignment/deployment) - * @param wantedExtraProcessors the number of additional processors that are required to be added to the cluster - * @param unwantedNodeMemoryBytesToRemove the amount of memory that should be removed from the cluster. If this is equal to the amount of - * memory provided by a node, a node will be removed. + * @param currentTotalNodes The count of nodes that are currently in the cluster, + * used to confirm that both sides have same view of current state + * @param currentPerNodeMemoryBytes The minimum size (memory) of all nodes in the cluster + * used to confirm that both sides have same view of current state. 
+ * @param currentTotalModelMemoryBytes The sum of model memory over every assignment/deployment, used to calculate requirements + * @param currentTotalProcessorsInUse The sum of processors used over every assignment/deployment, not used by the autoscaler + * @param currentPerNodeMemoryOverheadBytes Always equal to MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD. + * @param wantedMinNodes The minimum number of nodes that must be provided by the autoscaler + * @param wantedExtraPerNodeMemoryBytes If there are jobs or trained models that have been started but cannot be allocated on the + * ML nodes currently within the cluster then this will be the *max* of the ML native memory + * requirements of those jobs/trained models. The metric is in terms of ML native memory, + * not container memory. + * @param wantedExtraPerNodeNodeProcessors If there are trained model allocations that have been started but cannot be allocated on the + * ML nodes currently within the cluster then this will be the *max* of the vCPU requirements of + * those allocations. Zero otherwise. + * @param wantedExtraModelMemoryBytes If there are jobs or trained models that have been started but cannot be allocated on the ML + * nodes currently within the cluster then this will be the *sum* of the ML native memory + * requirements of those jobs/trained models. The metric is in terms of ML native memory, + * not container memory. + * @param wantedExtraProcessors If there are trained model allocations that have been started but cannot be allocated on the + * ML nodes currently within the cluster then this will be the *sum* of the vCPU requirements + * of those allocations. Zero otherwise. + * @param unwantedNodeMemoryBytesToRemove The size of the ML node to be removed, in GB rounded to the nearest GB, + * or zero if no nodes could be removed.
*/ public record MlAutoscalingStats( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java index 624ef5434e2a0..8804d588988b2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java @@ -123,8 +123,8 @@ private static AssignmentPlan mergePlans( nodesByZone.values().forEach(allNodes::addAll); final List allDeployments = new ArrayList<>(); - allDeployments.addAll(planForNormalPriorityModels.models()); - allDeployments.addAll(planForLowPriorityModels.models()); + allDeployments.addAll(planForNormalPriorityModels.deployments()); + allDeployments.addAll(planForLowPriorityModels.deployments()); final Map originalNodeById = allNodes.stream() .collect(Collectors.toMap(AssignmentPlan.Node::id, Function.identity())); @@ -139,7 +139,7 @@ private static void copyAssignments( AssignmentPlan.Builder dest, Map originalNodeById ) { - for (AssignmentPlan.Deployment m : source.models()) { + for (AssignmentPlan.Deployment m : source.deployments()) { Map nodeAssignments = source.assignments(m).orElse(Map.of()); for (Map.Entry assignment : nodeAssignments.entrySet()) { AssignmentPlan.Node originalNode = originalNodeById.get(assignment.getKey().id()); @@ -328,14 +328,14 @@ private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference( private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) { TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty(); - for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) { - TrainedModelAssignment existingAssignment = 
currentMetadata.getDeploymentAssignment(deployment.id()); + for (AssignmentPlan.Deployment deployment : assignmentPlan.deployments()) { + TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.deploymentId()); TrainedModelAssignment.Builder assignmentBuilder = existingAssignment == null && createAssignmentRequest.isPresent() ? TrainedModelAssignment.Builder.empty(createAssignmentRequest.get()) : TrainedModelAssignment.Builder.empty( - currentMetadata.getDeploymentAssignment(deployment.id()).getTaskParams(), - currentMetadata.getDeploymentAssignment(deployment.id()).getAdaptiveAllocationsSettings() + currentMetadata.getDeploymentAssignment(deployment.deploymentId()).getTaskParams(), + currentMetadata.getDeploymentAssignment(deployment.deploymentId()).getAdaptiveAllocationsSettings() ); if (existingAssignment != null) { assignmentBuilder.setStartTime(existingAssignment.getStartTime()); @@ -366,7 +366,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme assignmentBuilder.calculateAndSetAssignmentState(); explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason); - builder.addNewAssignment(deployment.id(), assignmentBuilder); + builder.addNewAssignment(deployment.deploymentId(), assignmentBuilder); } return builder; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AbstractPreserveAllocations.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AbstractPreserveAllocations.java index 0151c8f5ee9c8..66b8d9e570211 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AbstractPreserveAllocations.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AbstractPreserveAllocations.java @@ -54,7 +54,7 @@ Deployment modifyModelPreservingPreviousAssignments(Deployment m) { } return new 
Deployment( - m.id(), + m.deploymentId(), m.memoryBytes(), m.allocations() - calculatePreservedAllocations(m), m.threadsPerAllocation(), @@ -71,11 +71,14 @@ AssignmentPlan mergePreservedAllocations(AssignmentPlan assignmentPlan) { // they will not match the models/nodes members we have in this class. // Therefore, we build a lookup table based on the ids, so we can merge the plan // with its preserved allocations. - final Map, Integer> plannedAssignmentsByModelNodeIdPair = new HashMap<>(); - for (Deployment m : assignmentPlan.models()) { - Map assignments = assignmentPlan.assignments(m).orElse(Map.of()); - for (Map.Entry nodeAssignment : assignments.entrySet()) { - plannedAssignmentsByModelNodeIdPair.put(Tuple.tuple(m.id(), nodeAssignment.getKey().id()), nodeAssignment.getValue()); + final Map, Integer> plannedAssignmentsByDeploymentNodeIdPair = new HashMap<>(); + for (Deployment d : assignmentPlan.deployments()) { + Map assignmentsOfDeployment = assignmentPlan.assignments(d).orElse(Map.of()); + for (Map.Entry nodeAssignment : assignmentsOfDeployment.entrySet()) { + plannedAssignmentsByDeploymentNodeIdPair.put( + Tuple.tuple(d.deploymentId(), nodeAssignment.getKey().id()), + nodeAssignment.getValue() + ); } } @@ -93,8 +96,8 @@ AssignmentPlan mergePreservedAllocations(AssignmentPlan assignmentPlan) { } } for (Deployment deploymentNewAllocations : deployments) { - int newAllocations = plannedAssignmentsByModelNodeIdPair.getOrDefault( - Tuple.tuple(deploymentNewAllocations.id(), n.id()), + int newAllocations = plannedAssignmentsByDeploymentNodeIdPair.getOrDefault( + Tuple.tuple(deploymentNewAllocations.deploymentId(), n.id()), 0 ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlan.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlan.java index 7fc16394ed85c..c294e7b2de792 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlan.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlan.java @@ -31,8 +31,22 @@ */ public class AssignmentPlan implements Comparable { + /** + * + * @param deploymentId + * @param memoryBytes + * @param allocations + * @param threadsPerAllocation + * @param currentAllocationsByNodeId + * @param maxAssignedAllocations this value is used by the ZoneAwareAssignmentPlan and AssignmentPlanner to keep track of the + * maximum number of allocations which have been assigned. It is mainly for assigning over AZs. + * @param adaptiveAllocationsSettings + * @param priority + * @param perDeploymentMemoryBytes + * @param perAllocationMemoryBytes + */ public record Deployment( - String id, + String deploymentId, long memoryBytes, int allocations, int threadsPerAllocation, @@ -44,7 +58,7 @@ public record Deployment( long perAllocationMemoryBytes ) { public Deployment( - String id, + String deploymentId, long modelBytes, int allocations, int threadsPerAllocation, @@ -55,7 +69,7 @@ public Deployment( long perAllocationMemoryBytes ) { this( - id, + deploymentId, modelBytes, allocations, threadsPerAllocation, @@ -82,7 +96,7 @@ boolean hasEverBeenAllocated() { public long estimateMemoryUsageBytes(int allocations) { return StartTrainedModelDeploymentAction.estimateMemoryUsageBytes( - id, + deploymentId, memoryBytes, perDeploymentMemoryBytes, perAllocationMemoryBytes, @@ -92,13 +106,13 @@ public long estimateMemoryUsageBytes(int allocations) { long estimateAdditionalMemoryUsageBytes(int allocationsOld, int allocationsNew) { return StartTrainedModelDeploymentAction.estimateMemoryUsageBytes( - id, + deploymentId, memoryBytes, perDeploymentMemoryBytes, perAllocationMemoryBytes, allocationsNew ) - StartTrainedModelDeploymentAction.estimateMemoryUsageBytes( - id, + deploymentId, memoryBytes, perDeploymentMemoryBytes, 
perAllocationMemoryBytes, @@ -109,7 +123,7 @@ long estimateAdditionalMemoryUsageBytes(int allocationsOld, int allocationsNew) long minimumMemoryRequiredBytes() { return StartTrainedModelDeploymentAction.estimateMemoryUsageBytes( - id, + deploymentId, memoryBytes, perDeploymentMemoryBytes, perAllocationMemoryBytes, @@ -136,7 +150,7 @@ int findExcessAllocations(int maxAllocations, long availableMemoryBytes) { @Override public String toString() { - return id + return deploymentId + " (mem = " + ByteSizeValue.ofBytes(memoryBytes) + ") (allocations = " @@ -186,7 +200,7 @@ private AssignmentPlan( this.remainingModelAllocations = Objects.requireNonNull(remainingModelAllocations); } - public Set models() { + public Set deployments() { return assignments.keySet(); } @@ -208,7 +222,7 @@ public int compareTo(AssignmentPlan o) { } public boolean satisfiesCurrentAssignments() { - return models().stream().allMatch(this::isSatisfyingCurrentAssignmentsForModel); + return deployments().stream().allMatch(this::isSatisfyingCurrentAssignmentsForModel); } private boolean isSatisfyingCurrentAssignmentsForModel(Deployment m) { @@ -225,18 +239,18 @@ public boolean satisfiesAllocations(Deployment m) { } public boolean satisfiesAllModels() { - return models().stream().allMatch(this::satisfiesAllocations); + return deployments().stream().allMatch(this::satisfiesAllocations); } public boolean arePreviouslyAssignedModelsAssigned() { - return models().stream() + return deployments().stream() .filter(Deployment::hasEverBeenAllocated) .map(this::totalAllocations) .allMatch(totalAllocations -> totalAllocations > 0); } public long countPreviouslyAssignedModelsThatAreStillAssigned() { - return models().stream() + return deployments().stream() .filter(Deployment::hasEverBeenAllocated) .map(this::totalAllocations) .filter(totalAllocations -> totalAllocations > 0) @@ -301,11 +315,11 @@ public String prettyPrint() { msg.append(" ->"); for (Tuple modelAllocations : nodeToModel.get(n) .stream() - 
.sorted(Comparator.comparing(x -> x.v1().id())) + .sorted(Comparator.comparing(x -> x.v1().deploymentId())) .toList()) { if (modelAllocations.v2() > 0) { msg.append(" "); - msg.append(modelAllocations.v1().id()); + msg.append(modelAllocations.v1().deploymentId()); msg.append(" (mem = "); msg.append(ByteSizeValue.ofBytes(modelAllocations.v1().memoryBytes())); msg.append(")"); @@ -415,7 +429,7 @@ public Builder assignModelToNode(Deployment deployment, Node node, int allocatio + "] to assign [" + allocations + "] allocations to deployment [" - + deployment.id() + + deployment.deploymentId() + "]" ); } @@ -426,7 +440,7 @@ public Builder assignModelToNode(Deployment deployment, Node node, int allocatio + "] to assign [" + allocations + "] allocations to deployment [" - + deployment.id() + + deployment.deploymentId() + "]; required threads per allocation [" + deployment.threadsPerAllocation() + "]" @@ -464,7 +478,7 @@ public void accountMemory(Deployment m, Node n) { private void accountMemory(Deployment m, Node n, long requiredMemory) { remainingNodeMemory.computeIfPresent(n, (k, v) -> v - requiredMemory); if (remainingNodeMemory.containsKey(n) && remainingNodeMemory.get(n) < 0) { - throw new IllegalArgumentException("not enough memory on node [" + n.id() + "] to assign model [" + m.id() + "]"); + throw new IllegalArgumentException("not enough memory on node [" + n.id() + "] to assign model [" + m.deploymentId() + "]"); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlanner.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlanner.java index 38279a2fd6c03..8b5f33e25e242 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlanner.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlanner.java @@ -50,7 +50,7 @@ public class 
AssignmentPlanner { public AssignmentPlanner(List nodes, List deployments) { this.nodes = nodes.stream().sorted(Comparator.comparing(Node::id)).toList(); - this.deployments = deployments.stream().sorted(Comparator.comparing(AssignmentPlan.Deployment::id)).toList(); + this.deployments = deployments.stream().sorted(Comparator.comparing(AssignmentPlan.Deployment::deploymentId)).toList(); } public AssignmentPlan computePlan() { @@ -111,7 +111,7 @@ private AssignmentPlan solveAllocatingAtLeastOnceModelsThatWerePreviouslyAllocat .filter(m -> m.hasEverBeenAllocated()) .map( m -> new AssignmentPlan.Deployment( - m.id(), + m.deploymentId(), m.memoryBytes(), 1, m.threadsPerAllocation(), @@ -130,21 +130,21 @@ private AssignmentPlan solveAllocatingAtLeastOnceModelsThatWerePreviouslyAllocat ).solvePlan(true); Map modelIdToNodeIdWithSingleAllocation = new HashMap<>(); - for (AssignmentPlan.Deployment m : planWithSingleAllocationForPreviouslyAssignedModels.models()) { + for (AssignmentPlan.Deployment m : planWithSingleAllocationForPreviouslyAssignedModels.deployments()) { Optional> assignments = planWithSingleAllocationForPreviouslyAssignedModels.assignments(m); Set nodes = assignments.orElse(Map.of()).keySet(); if (nodes.isEmpty() == false) { assert nodes.size() == 1; - modelIdToNodeIdWithSingleAllocation.put(m.id(), nodes.iterator().next().id()); + modelIdToNodeIdWithSingleAllocation.put(m.deploymentId(), nodes.iterator().next().id()); } } List planDeployments = deployments.stream().map(m -> { - Map currentAllocationsByNodeId = modelIdToNodeIdWithSingleAllocation.containsKey(m.id()) - ? Map.of(modelIdToNodeIdWithSingleAllocation.get(m.id()), 1) + Map currentAllocationsByNodeId = modelIdToNodeIdWithSingleAllocation.containsKey(m.deploymentId()) + ? 
Map.of(modelIdToNodeIdWithSingleAllocation.get(m.deploymentId()), 1) : Map.of(); return new AssignmentPlan.Deployment( - m.id(), + m.deploymentId(), m.memoryBytes(), m.allocations(), m.threadsPerAllocation(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/LinearProgrammingPlanSolver.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/LinearProgrammingPlanSolver.java index bd97680e285cc..90b3d3590a254 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/LinearProgrammingPlanSolver.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/LinearProgrammingPlanSolver.java @@ -279,24 +279,24 @@ private boolean solveLinearProgram( Map, Variable> allocationVars = new HashMap<>(); - for (AssignmentPlan.Deployment m : deployments) { + for (AssignmentPlan.Deployment d : deployments) { for (Node n : nodes) { - Variable allocationVar = model.addVariable("allocations_of_model_" + m.id() + "_on_node_" + n.id()) + Variable allocationVar = model.addVariable("allocations_of_model_" + d.deploymentId() + "_on_node_" + n.id()) .integer(false) // We relax the program to non-integer as the integer solver is much slower and can often lead to // infeasible solutions .lower(0.0) // It is important not to set an upper bound here as it impacts memory negatively - .weight(weightForAllocationVar(m, n, weights)); - allocationVars.put(Tuple.tuple(m, n), allocationVar); + .weight(weightForAllocationVar(d, n, weights)); + allocationVars.put(Tuple.tuple(d, n), allocationVar); } } - for (Deployment m : deployments) { + for (Deployment d : deployments) { // Each model should not get more allocations than is required. // Also, if the model has previous assignments, it should get at least as many allocations as it did before. 
- model.addExpression("allocations_of_model_" + m.id() + "_not_more_than_required") - .lower(m.getCurrentAssignedAllocations()) - .upper(m.allocations()) - .setLinearFactorsSimple(varsForModel(m, allocationVars)); + model.addExpression("allocations_of_model_" + d.deploymentId() + "_not_more_than_required") + .lower(d.getCurrentAssignedAllocations()) + .upper(d.allocations()) + .setLinearFactorsSimple(varsForModel(d, allocationVars)); } double[] threadsPerAllocationPerModel = deployments.stream().mapToDouble(m -> m.threadsPerAllocation()).toArray(); @@ -374,18 +374,18 @@ private String prettyPrintSolverResult( for (int i = 0; i < nodes.size(); i++) { Node n = nodes.get(i); msg.append(n + " ->"); - for (Deployment m : deployments) { - if (threadValues.get(Tuple.tuple(m, n)) > 0) { + for (Deployment d : deployments) { + if (threadValues.get(Tuple.tuple(d, n)) > 0) { msg.append(" "); - msg.append(m.id()); + msg.append(d.deploymentId()); msg.append(" (mem = "); - msg.append(ByteSizeValue.ofBytes(m.memoryBytes())); + msg.append(ByteSizeValue.ofBytes(d.memoryBytes())); msg.append(") (allocations = "); - msg.append(threadValues.get(Tuple.tuple(m, n))); + msg.append(threadValues.get(Tuple.tuple(d, n))); msg.append("/"); - msg.append(m.allocations()); + msg.append(d.allocations()); msg.append(") (y = "); - msg.append(assignmentValues.get(Tuple.tuple(m, n))); + msg.append(assignmentValues.get(Tuple.tuple(d, n))); msg.append(")"); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/ZoneAwareAssignmentPlanner.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/ZoneAwareAssignmentPlanner.java index 1f0857391598f..c5b750f91014f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/ZoneAwareAssignmentPlanner.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/ZoneAwareAssignmentPlanner.java @@ 
-80,22 +80,22 @@ private AssignmentPlan computePlan(boolean tryAssigningPreviouslyAssignedModels) // allocated on the first per zone assignment plans. int remainingZones = nodesByZone.size(); - Map modelIdToRemainingAllocations = deployments.stream() - .collect(Collectors.toMap(AssignmentPlan.Deployment::id, AssignmentPlan.Deployment::allocations)); + Map deploymentIdToRemainingAllocations = deployments.stream() + .collect(Collectors.toMap(AssignmentPlan.Deployment::deploymentId, AssignmentPlan.Deployment::allocations)); List plans = new ArrayList<>(); for (var zoneToNodes : nodesByZone.entrySet()) { logger.debug(() -> format("computing plan for availability zone %s", zoneToNodes.getKey())); AssignmentPlan plan = computeZonePlan( zoneToNodes.getValue(), - modelIdToRemainingAllocations, + deploymentIdToRemainingAllocations, remainingZones, tryAssigningPreviouslyAssignedModels ); - plan.models() + plan.deployments() .forEach( - m -> modelIdToRemainingAllocations.computeIfPresent( - m.id(), - (modelId, remainingAllocations) -> remainingAllocations - plan.totalAllocations(m) + d -> deploymentIdToRemainingAllocations.computeIfPresent( + d.deploymentId(), + (deploymentId, remainingAllocations) -> remainingAllocations - plan.totalAllocations(d) ) ); plans.add(plan); @@ -108,56 +108,69 @@ private AssignmentPlan computePlan(boolean tryAssigningPreviouslyAssignedModels) private AssignmentPlan computeZonePlan( List nodes, - Map modelIdToRemainingAllocations, + Map deploymentIdToRemainingAllocations, int remainingZones, boolean tryAssigningPreviouslyAssignedModels ) { - Map modelIdToTargetAllocations = modelIdToRemainingAllocations.entrySet() + Map deploymentIdToTargetAllocationsPerZone = deploymentIdToRemainingAllocations.entrySet() .stream() .filter(e -> e.getValue() > 0) - .collect(Collectors.toMap(e -> e.getKey(), e -> (e.getValue() - 1) / remainingZones + 1)); + .collect( + Collectors.toMap(Map.Entry::getKey, e -> 1 + 
remainingAllocationsPerZoneAfterAssigningOne(remainingZones, e.getValue())) + ); + // If there was at least one allocation for a deployment, we will apply it to each zone List modifiedDeployments = deployments.stream() - .filter(m -> modelIdToTargetAllocations.getOrDefault(m.id(), 0) > 0) + .filter(d -> deploymentIdToTargetAllocationsPerZone.getOrDefault(d.deploymentId(), 0) > 0) + // filter out deployments with no allocations .map( - m -> new AssignmentPlan.Deployment( - m.id(), - m.memoryBytes(), - modelIdToTargetAllocations.get(m.id()), - m.threadsPerAllocation(), - m.currentAllocationsByNodeId(), - (tryAssigningPreviouslyAssignedModels && modelIdToRemainingAllocations.get(m.id()) == m.allocations()) - ? m.maxAssignedAllocations() + d -> new AssignmentPlan.Deployment( + // replace each deployment with a new deployment + d.deploymentId(), + d.memoryBytes(), + deploymentIdToTargetAllocationsPerZone.get(d.deploymentId()), + d.threadsPerAllocation(), + d.currentAllocationsByNodeId(), + // (below) Only force assigning at least once previously assigned models that have not had any allocation yet + (tryAssigningPreviouslyAssignedModels && deploymentIdToRemainingAllocations.get(d.deploymentId()) == d.allocations()) + ? 
d.maxAssignedAllocations() : 0, - m.getAdaptiveAllocationsSettings(), - // Only force assigning at least once previously assigned models that have not had any allocation yet - m.perDeploymentMemoryBytes(), - m.perAllocationMemoryBytes() + d.getAdaptiveAllocationsSettings(), + d.perDeploymentMemoryBytes(), + d.perAllocationMemoryBytes() ) ) .toList(); return new AssignmentPlanner(nodes, modifiedDeployments).computePlan(tryAssigningPreviouslyAssignedModels); } + private static int remainingAllocationsPerZoneAfterAssigningOne(int remainingZones, Integer remainingAllocations) { + if (remainingAllocations == null || remainingZones == 0) { + // should never happen + return 0; + } + return (remainingAllocations - 1) / remainingZones; + } + private AssignmentPlan computePlanAcrossAllNodes(List plans) { logger.debug(() -> "computing plan across all nodes"); final List allNodes = new ArrayList<>(); nodesByZone.values().forEach(allNodes::addAll); - Map> allocationsByNodeIdByModelId = mergeAllocationsByNodeIdByModelId(plans); + Map> allocationsByNodeIdByDeploymentId = mergeAllocationsByNodeIdByDeploymentId(plans); List modelsAccountingPlans = deployments.stream() .map( - m -> new AssignmentPlan.Deployment( - m.id(), - m.memoryBytes(), - m.allocations(), - m.threadsPerAllocation(), - allocationsByNodeIdByModelId.get(m.id()), - m.maxAssignedAllocations(), - m.getAdaptiveAllocationsSettings(), - m.perDeploymentMemoryBytes(), - m.perAllocationMemoryBytes() + d -> new AssignmentPlan.Deployment( + d.deploymentId(), + d.memoryBytes(), + d.allocations(), + d.threadsPerAllocation(), + allocationsByNodeIdByDeploymentId.get(d.deploymentId()), + d.maxAssignedAllocations(), + d.getAdaptiveAllocationsSettings(), + d.perDeploymentMemoryBytes(), + d.perAllocationMemoryBytes() ) ) .toList(); @@ -176,11 +189,11 @@ private AssignmentPlan swapOriginalModelsInPlan( List planDeployments ) { final Map originalModelById = deployments.stream() - .collect(Collectors.toMap(AssignmentPlan.Deployment::id, 
Function.identity())); + .collect(Collectors.toMap(AssignmentPlan.Deployment::deploymentId, Function.identity())); final Map originalNodeById = allNodes.stream().collect(Collectors.toMap(Node::id, Function.identity())); AssignmentPlan.Builder planBuilder = AssignmentPlan.builder(allNodes, deployments); for (AssignmentPlan.Deployment m : planDeployments) { - AssignmentPlan.Deployment originalDeployment = originalModelById.get(m.id()); + AssignmentPlan.Deployment originalDeployment = originalModelById.get(m.deploymentId()); Map nodeAssignments = plan.assignments(m).orElse(Map.of()); for (Map.Entry assignment : nodeAssignments.entrySet()) { Node originalNode = originalNodeById.get(assignment.getKey().id()); @@ -193,12 +206,12 @@ private AssignmentPlan swapOriginalModelsInPlan( return planBuilder.build(); } - private Map> mergeAllocationsByNodeIdByModelId(List plans) { - Map> allocationsByNodeIdByModelId = new HashMap<>(); - deployments.forEach(m -> allocationsByNodeIdByModelId.put(m.id(), new HashMap<>())); + private Map> mergeAllocationsByNodeIdByDeploymentId(List plans) { + Map> allocationsByNodeIdByDeploymentId = new HashMap<>(); + deployments.forEach(d -> allocationsByNodeIdByDeploymentId.put(d.deploymentId(), new HashMap<>())); for (AssignmentPlan plan : plans) { - for (AssignmentPlan.Deployment m : plan.models()) { - Map nodeIdToAllocations = allocationsByNodeIdByModelId.get(m.id()); + for (AssignmentPlan.Deployment m : plan.deployments()) { + Map nodeIdToAllocations = allocationsByNodeIdByDeploymentId.get(m.deploymentId()); Optional> assignments = plan.assignments(m); if (assignments.isPresent()) { for (Map.Entry nodeAssignments : assignments.get().entrySet()) { @@ -212,6 +225,6 @@ private Map> mergeAllocationsByNodeIdByModelId(List } } } - return allocationsByNodeIdByModelId; + return allocationsByNodeIdByDeploymentId; } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlanTests.java 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlanTests.java index d84c04f0c41f1..3f93c3431d891 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlanTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlanTests.java @@ -69,7 +69,7 @@ public void testAssignModelToNode_GivenNoPreviousAssignment() { AssignmentPlan plan = builder.build(); - assertThat(plan.models(), contains(m)); + assertThat(plan.deployments(), contains(m)); assertThat(plan.satisfiesCurrentAssignments(), is(true)); assertThat(plan.assignments(m).get(), equalTo(Map.of(n, 1))); } @@ -102,7 +102,7 @@ public void testAssignModelToNode_GivenNoPreviousAssignment() { AssignmentPlan plan = builder.build(); - assertThat(plan.models(), contains(m)); + assertThat(plan.deployments(), contains(m)); assertThat(plan.satisfiesCurrentAssignments(), is(true)); assertThat(plan.assignments(m).get(), equalTo(Map.of(n, 1))); } @@ -134,7 +134,7 @@ public void testAssignModelToNode_GivenNewPlanSatisfiesCurrentAssignment() { AssignmentPlan plan = builder.build(); - assertThat(plan.models(), contains(m)); + assertThat(plan.deployments(), contains(m)); assertThat(plan.satisfiesCurrentAssignments(), is(true)); assertThat(plan.assignments(m).get(), equalTo(Map.of(n, 1))); } @@ -162,7 +162,7 @@ public void testAssignModelToNode_GivenNewPlanSatisfiesCurrentAssignment() { AssignmentPlan plan = builder.build(); - assertThat(plan.models(), contains(m)); + assertThat(plan.deployments(), contains(m)); assertThat(plan.satisfiesCurrentAssignments(), is(true)); assertThat(plan.assignments(m).get(), equalTo(Map.of(n, 1))); @@ -186,7 +186,7 @@ public void testAssignModelToNode_GivenNewPlanDoesNotSatisfyCurrentAssignment() AssignmentPlan plan = builder.build(); - assertThat(plan.models(), contains(m)); + assertThat(plan.deployments(), contains(m)); 
assertThat(plan.satisfiesCurrentAssignments(), is(false)); assertThat(plan.assignments(m).get(), equalTo(Map.of(n, 1))); } @@ -215,7 +215,7 @@ public void testAssignModelToNode_GivenNewPlanDoesNotSatisfyCurrentAssignment() AssignmentPlan plan = builder.build(); - assertThat(plan.models(), contains(m)); + assertThat(plan.deployments(), contains(m)); assertThat(plan.satisfiesCurrentAssignments(), is(false)); assertThat(plan.assignments(m).get(), equalTo(Map.of(n, 1))); } @@ -251,7 +251,7 @@ public void testAssignModelToNode_GivenPreviouslyAssignedModelDoesNotFit() { builder.assignModelToNode(m, n, 2); AssignmentPlan plan = builder.build(); - assertThat(plan.models(), contains(m)); + assertThat(plan.deployments(), contains(m)); assertThat(plan.satisfiesCurrentAssignments(), is(true)); assertThat(plan.assignments(m).get(), equalTo(Map.of(n, 2))); } @@ -274,7 +274,7 @@ public void testAssignModelToNode_GivenPreviouslyAssignedModelDoesNotFit() { builder.assignModelToNode(m, n, 2); AssignmentPlan plan = builder.build(); - assertThat(plan.models(), contains(m)); + assertThat(plan.deployments(), contains(m)); assertThat(plan.satisfiesCurrentAssignments(), is(true)); assertThat(plan.assignments(m).get(), equalTo(Map.of(n, 2))); } @@ -355,7 +355,7 @@ public void testAssignModelToNode_GivenSameModelAssignedTwice() { AssignmentPlan plan = builder.build(); - assertThat(plan.models(), contains(m)); + assertThat(plan.deployments(), contains(m)); assertThat(plan.satisfiesCurrentAssignments(), is(true)); assertThat(plan.assignments(m).get(), equalTo(Map.of(n, 3))); } @@ -511,7 +511,7 @@ public void testCompareTo_GivenDifferenceInMemory() { assertThat(planUsingMoreMemory.compareTo(planUsingLessMemory), lessThan(0)); } - public void testSatisfiesAllModels_GivenAllModelsAreSatisfied() { + public void testSatisfiesAllModels_GivenAllDeploymentsAreSatisfied() { Node node1 = new Node("n_1", ByteSizeValue.ofMb(1000).getBytes(), 4); Node node2 = new Node("n_2", 
ByteSizeValue.ofMb(1000).getBytes(), 4); { @@ -602,7 +602,7 @@ public void testSatisfiesAllModels_GivenAllModelsAreSatisfied() { } } - public void testSatisfiesAllModels_GivenOneModelHasOneAllocationLess() { + public void testSatisfiesAllDeployments_GivenOneModelHasOneAllocationLess() { Node node1 = new Node("n_1", ByteSizeValue.ofMb(1000).getBytes(), 4); Node node2 = new Node("n_2", ByteSizeValue.ofMb(1000).getBytes(), 4); Deployment deployment1 = new Deployment("m_1", ByteSizeValue.ofMb(50).getBytes(), 1, 2, Map.of(), 0, null, 0, 0); @@ -617,7 +617,7 @@ public void testSatisfiesAllModels_GivenOneModelHasOneAllocationLess() { assertThat(plan.satisfiesAllModels(), is(false)); } - public void testArePreviouslyAssignedModelsAssigned_GivenTrue() { + public void testArePreviouslyAssignedDeploymentsAssigned_GivenTrue() { Node node1 = new Node("n_1", ByteSizeValue.ofMb(1000).getBytes(), 4); Node node2 = new Node("n_2", ByteSizeValue.ofMb(1000).getBytes(), 4); Deployment deployment1 = new Deployment("m_1", ByteSizeValue.ofMb(50).getBytes(), 1, 2, Map.of(), 3, null, 0, 0); @@ -630,7 +630,7 @@ public void testArePreviouslyAssignedModelsAssigned_GivenTrue() { assertThat(plan.arePreviouslyAssignedModelsAssigned(), is(true)); } - public void testArePreviouslyAssignedModelsAssigned_GivenFalse() { + public void testArePreviouslyAssignedDeploymentsAssigned_GivenFalse() { Node node1 = new Node("n_1", ByteSizeValue.ofMb(1000).getBytes(), 4); Node node2 = new Node("n_2", ByteSizeValue.ofMb(1000).getBytes(), 4); Deployment deployment1 = new Deployment("m_1", ByteSizeValue.ofMb(50).getBytes(), 1, 2, Map.of(), 3, null, 0, 0); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlannerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlannerTests.java index ef76c388b81a1..24095600c42d0 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlannerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlannerTests.java @@ -261,7 +261,7 @@ public void testModelWithMoreAllocationsThanAvailableCores_GivenSingleThreadPerA } } - public void testMultipleModelsAndNodesWithSingleSolution() { + public void testMultipleDeploymentsAndNodesWithSingleSolution() { Node node1 = new Node("n_1", 2 * scaleNodeSize(50), 7); Node node2 = new Node("n_2", 2 * scaleNodeSize(50), 7); Node node3 = new Node("n_3", 2 * scaleNodeSize(50), 2); @@ -316,7 +316,7 @@ public void testMultipleModelsAndNodesWithSingleSolution() { } } - public void testMultipleModelsAndNodesWithSingleSolution_NewMemoryFields() { + public void testMultipleDeploymentsAndNodesWithSingleSolution_NewMemoryFields() { Node node1 = new Node("n_1", ByteSizeValue.ofMb(800).getBytes(), 7); Node node2 = new Node("n_2", ByteSizeValue.ofMb(800).getBytes(), 7); Node node3 = new Node("n_3", ByteSizeValue.ofMb(900).getBytes(), 2); @@ -508,7 +508,7 @@ public void testModelWithPreviousAssignmentAndNoMoreCoresAvailable() { assertThat(plan.assignments(deployment).get(), equalTo(Map.of(node, 4))); } - public void testFullCoreUtilization_GivenModelsWithSingleThreadPerAllocation() { + public void testFullCoreUtilization_GivenDeploymentsWithSingleThreadPerAllocation() { List nodes = List.of( new Node("n_1", ByteSizeValue.ofGb(18).getBytes(), 8), new Node("n_2", ByteSizeValue.ofGb(18).getBytes(), 8), @@ -544,7 +544,7 @@ public void testFullCoreUtilization_GivenModelsWithSingleThreadPerAllocation() { assertPreviousAssignmentsAreSatisfied(deployments, assignmentPlan); } - public void testFullCoreUtilization_GivenModelsWithSingleThreadPerAllocation_NewMemoryFields() { + public void testFullCoreUtilization_GivenDeploymentsWithSingleThreadPerAllocation_NewMemoryFields() { List nodes = List.of( new Node("n_1", 
ByteSizeValue.ofGb(18).getBytes(), 8), new Node("n_2", ByteSizeValue.ofGb(18).getBytes(), 8), @@ -641,32 +641,32 @@ public void testFullCoreUtilization_GivenModelsWithSingleThreadPerAllocation_New assertPreviousAssignmentsAreSatisfied(deployments, assignmentPlan); } - public void testTooManyNodesAndModels_DoesNotThrowOOM_GivenNodesJustUnderLimit() { - runTooManyNodesAndModels(3161, 1); + public void testTooManyNodesAndDeployments_DoesNotThrowOOM_GivenNodesJustUnderLimit() { + runTooManyNodesAndDeployments(3161, 1); } - public void testTooManyNodesAndModels_DoesNotThrowOOM_GivenNodesJustOverLimit() { - runTooManyNodesAndModels(3162, 1); + public void testTooManyNodesAndDeployments_DoesNotThrowOOM_GivenNodesJustOverLimit() { + runTooManyNodesAndDeployments(3162, 1); } - public void testTooManyNodesAndModels_DoesNotThrowOOM_GivenModelsJustUnderLimit() { - runTooManyNodesAndModels(1, 3161); + public void testTooManyNodesAndModels_DoesNotThrowOOM_GivenDeploymentsJustUnderLimit() { + runTooManyNodesAndDeployments(1, 3161); } - public void testTooManyNodesAndModels_DoesNotThrowOOM_GivenModelsJustOverLimit() { - runTooManyNodesAndModels(1, 3162); + public void testTooManyNodesAndModels_DoesNotThrowOOM_GivenDeploymentsJustOverLimit() { + runTooManyNodesAndDeployments(1, 3162); } - public void testTooManyNodesAndModels_DoesNotThrowOOM_GivenComboJustUnderLimit() { - runTooManyNodesAndModels(170, 171); + public void testTooManyNodesAndDeployments_DoesNotThrowOOM_GivenComboJustUnderLimit() { + runTooManyNodesAndDeployments(170, 171); } - public void testTooManyNodesAndModels_DoesNotThrowOOM_GivenComboJustOverLimit() { - runTooManyNodesAndModels(171, 171); + public void testTooManyNodesAndDeployments_DoesNotThrowOOM_GivenComboJustOverLimit() { + runTooManyNodesAndDeployments(171, 171); } - public void testTooManyNodesAndModels_DoesNotThrowOOM_GivenComboWayOverLimit() { - runTooManyNodesAndModels(1000, 1000); + public void 
testTooManyNodesAndDeployments_DoesNotThrowOOM_GivenComboWayOverLimit() { + runTooManyNodesAndDeployments(1000, 1000); } public void testRandomBenchmark() { @@ -679,7 +679,7 @@ public void testRandomBenchmark() { int scale = randomIntBetween(0, 10); double load = randomDoubleBetween(0.1, 1.0, true); List nodes = randomNodes(scale); - List deployments = randomModels(scale, load); + List deployments = randomDeployments(scale, load); nodeSizes.add(nodes.size()); modelSizes.add(deployments.size()); logger.debug("Nodes = " + nodes.size() + "; Models = " + deployments.size()); @@ -719,7 +719,7 @@ public void testPreviousAssignmentsGetAtLeastAsManyAllocationsAfterAddingNewMode int scale = randomIntBetween(0, 10); double load = randomDoubleBetween(0.1, 1.0, true); List nodes = randomNodes(scale); - List deployments = randomModels(scale, load); + List deployments = randomDeployments(scale, load); AssignmentPlan originalPlan = new AssignmentPlanner(nodes, deployments).computePlan(); List previousModelsPlusNew = new ArrayList<>(deployments.size() + 1); @@ -730,7 +730,7 @@ public void testPreviousAssignmentsGetAtLeastAsManyAllocationsAfterAddingNewMode .collect(Collectors.toMap(e -> e.getKey().id(), Map.Entry::getValue)); previousModelsPlusNew.add( new AssignmentPlan.Deployment( - m.id(), + m.deploymentId(), m.memoryBytes(), m.allocations(), m.threadsPerAllocation(), @@ -827,7 +827,7 @@ public void testModelWithoutCurrentAllocationsGetsAssignedIfAllocatedPreviously( assertThat(assignmentPlan.getRemainingNodeMemory("n_2"), greaterThanOrEqualTo(0L)); } - public void testGivenPreviouslyAssignedModels_CannotAllBeAllocated() { + public void testGivenPreviouslyAssignedDeployments_CannotAllBeAllocated() { Node node1 = new Node("n_1", scaleNodeSize(ByteSizeValue.ofGb(2).getMb()), 2); AssignmentPlan.Deployment deployment1 = new Deployment("m_1", ByteSizeValue.ofMb(1200).getBytes(), 1, 1, Map.of(), 1, null, 0, 0); AssignmentPlan.Deployment deployment2 = new Deployment("m_2", 
ByteSizeValue.ofMb(1100).getBytes(), 1, 1, Map.of(), 1, null, 0, 0); @@ -854,7 +854,7 @@ public void testGivenClusterResize_AllocationShouldNotExceedMemoryConstraints() // Then start m_2 assignmentPlan = new AssignmentPlanner( List.of(node1, node2), - Stream.concat(createModelsFromPlan(assignmentPlan).stream(), Stream.of(deployment2)).toList() + Stream.concat(createDeploymentsFromPlan(assignmentPlan).stream(), Stream.of(deployment2)).toList() ).computePlan(); indexedBasedPlan = convertToIdIndexed(assignmentPlan); @@ -865,7 +865,7 @@ public void testGivenClusterResize_AllocationShouldNotExceedMemoryConstraints() // Then start m_3 assignmentPlan = new AssignmentPlanner( List.of(node1, node2), - Stream.concat(createModelsFromPlan(assignmentPlan).stream(), Stream.of(deployment3)).toList() + Stream.concat(createDeploymentsFromPlan(assignmentPlan).stream(), Stream.of(deployment3)).toList() ).computePlan(); indexedBasedPlan = convertToIdIndexed(assignmentPlan); @@ -875,7 +875,7 @@ public void testGivenClusterResize_AllocationShouldNotExceedMemoryConstraints() assertThat(indexedBasedPlan.get("m_3"), equalTo(Map.of("n_2", 1))); // First, one node goes away. 
- assignmentPlan = new AssignmentPlanner(List.of(node1), createModelsFromPlan(assignmentPlan)).computePlan(); + assignmentPlan = new AssignmentPlanner(List.of(node1), createDeploymentsFromPlan(assignmentPlan)).computePlan(); assertThat(assignmentPlan.getRemainingNodeMemory("n_1"), greaterThanOrEqualTo(0L)); } @@ -896,7 +896,7 @@ public void testGivenClusterResize_ShouldAllocateEachModelAtLeastOnce() { // Then start m_2 assignmentPlan = new AssignmentPlanner( List.of(node1, node2), - Stream.concat(createModelsFromPlan(assignmentPlan).stream(), Stream.of(deployment2)).toList() + Stream.concat(createDeploymentsFromPlan(assignmentPlan).stream(), Stream.of(deployment2)).toList() ).computePlan(); indexedBasedPlan = convertToIdIndexed(assignmentPlan); @@ -907,7 +907,7 @@ public void testGivenClusterResize_ShouldAllocateEachModelAtLeastOnce() { // Then start m_3 assignmentPlan = new AssignmentPlanner( List.of(node1, node2), - Stream.concat(createModelsFromPlan(assignmentPlan).stream(), Stream.of(deployment3)).toList() + Stream.concat(createDeploymentsFromPlan(assignmentPlan).stream(), Stream.of(deployment3)).toList() ).computePlan(); indexedBasedPlan = convertToIdIndexed(assignmentPlan); @@ -921,20 +921,20 @@ public void testGivenClusterResize_ShouldAllocateEachModelAtLeastOnce() { Node node4 = new Node("n_4", ByteSizeValue.ofMb(2600).getBytes(), 2); // First, one node goes away. - assignmentPlan = new AssignmentPlanner(List.of(node1), createModelsFromPlan(assignmentPlan)).computePlan(); + assignmentPlan = new AssignmentPlanner(List.of(node1), createDeploymentsFromPlan(assignmentPlan)).computePlan(); assertThat(assignmentPlan.getRemainingNodeMemory(node1.id()), greaterThanOrEqualTo(0L)); // Then, a node double in memory size is added. 
- assignmentPlan = new AssignmentPlanner(List.of(node1, node3), createModelsFromPlan(assignmentPlan)).computePlan(); + assignmentPlan = new AssignmentPlanner(List.of(node1, node3), createDeploymentsFromPlan(assignmentPlan)).computePlan(); assertThat(assignmentPlan.getRemainingNodeMemory(node1.id()), greaterThanOrEqualTo(0L)); assertThat(assignmentPlan.getRemainingNodeMemory(node3.id()), greaterThanOrEqualTo(0L)); // And another. - assignmentPlan = new AssignmentPlanner(List.of(node1, node3, node4), createModelsFromPlan(assignmentPlan)).computePlan(); + assignmentPlan = new AssignmentPlanner(List.of(node1, node3, node4), createDeploymentsFromPlan(assignmentPlan)).computePlan(); assertThat(assignmentPlan.getRemainingNodeMemory(node1.id()), greaterThanOrEqualTo(0L)); assertThat(assignmentPlan.getRemainingNodeMemory(node3.id()), greaterThanOrEqualTo(0L)); assertThat(assignmentPlan.getRemainingNodeMemory(node4.id()), greaterThanOrEqualTo(0L)); // Finally, the remaining smaller node is removed - assignmentPlan = new AssignmentPlanner(List.of(node3, node4), createModelsFromPlan(assignmentPlan)).computePlan(); + assignmentPlan = new AssignmentPlanner(List.of(node3, node4), createDeploymentsFromPlan(assignmentPlan)).computePlan(); assertThat(assignmentPlan.getRemainingNodeMemory(node3.id()), greaterThanOrEqualTo(0L)); assertThat(assignmentPlan.getRemainingNodeMemory(node4.id()), greaterThanOrEqualTo(0L)); @@ -949,7 +949,7 @@ public void testGivenClusterResize_ShouldAllocateEachModelAtLeastOnce() { assertThat(assignmentPlan.getRemainingNodeCores("n_2"), equalTo(0)); } - public void testGivenClusterResize_ShouldRemoveAllocatedModels() { + public void testGivenClusterResize_ShouldRemoveAllocatedDeployments() { // Ensure that plan is removing previously allocated models if not enough memory is available Node node1 = new Node("n_1", ByteSizeValue.ofMb(1840).getBytes(), 2); Node node2 = new Node("n_2", ByteSizeValue.ofMb(2580).getBytes(), 2); @@ -969,14 +969,14 @@ public void 
testGivenClusterResize_ShouldRemoveAllocatedModels() { assertThat(assignmentPlan.getRemainingNodeMemory(node2.id()), greaterThanOrEqualTo(0L)); // Now the cluster starts getting resized. Ensure that resources are not over-allocated. - assignmentPlan = new AssignmentPlanner(List.of(node1), createModelsFromPlan(assignmentPlan)).computePlan(); + assignmentPlan = new AssignmentPlanner(List.of(node1), createDeploymentsFromPlan(assignmentPlan)).computePlan(); assertThat(indexedBasedPlan.get("m_1"), equalTo(Map.of("n_1", 2))); assertThat(assignmentPlan.getRemainingNodeMemory(node1.id()), greaterThanOrEqualTo(0L)); assertThat(assignmentPlan.getRemainingNodeCores(node1.id()), greaterThanOrEqualTo(0)); } - public void testGivenClusterResize_ShouldRemoveAllocatedModels_NewMemoryFields() { + public void testGivenClusterResize_ShouldRemoveAllocatedDeployments_NewMemoryFields() { // Ensure that plan is removing previously allocated models if not enough memory is available Node node1 = new Node("n_1", ByteSizeValue.ofMb(700).getBytes(), 2); Node node2 = new Node("n_2", ByteSizeValue.ofMb(1000).getBytes(), 2); @@ -1026,16 +1026,16 @@ public void testGivenClusterResize_ShouldRemoveAllocatedModels_NewMemoryFields() assertThat(assignmentPlan.getRemainingNodeMemory(node2.id()), greaterThanOrEqualTo(0L)); // Now the cluster starts getting resized. Ensure that resources are not over-allocated. 
- assignmentPlan = new AssignmentPlanner(List.of(node1), createModelsFromPlan(assignmentPlan)).computePlan(); + assignmentPlan = new AssignmentPlanner(List.of(node1), createDeploymentsFromPlan(assignmentPlan)).computePlan(); assertThat(indexedBasedPlan.get("m_1"), equalTo(Map.of("n_1", 2))); assertThat(assignmentPlan.getRemainingNodeMemory(node1.id()), greaterThanOrEqualTo(0L)); assertThat(assignmentPlan.getRemainingNodeCores(node1.id()), greaterThanOrEqualTo(0)); } - public static List createModelsFromPlan(AssignmentPlan plan) { + public static List createDeploymentsFromPlan(AssignmentPlan plan) { List deployments = new ArrayList<>(); - for (Deployment m : plan.models()) { + for (Deployment m : plan.deployments()) { Optional> assignments = plan.assignments(m); Map currentAllocations = Map.of(); if (assignments.isPresent()) { @@ -1047,7 +1047,7 @@ public static List createModelsFromPlan(AssignmentPlan plan) { int totalAllocations = currentAllocations.values().stream().mapToInt(Integer::intValue).sum(); deployments.add( new Deployment( - m.id(), + m.deploymentId(), m.memoryBytes(), m.allocations(), m.threadsPerAllocation(), @@ -1064,13 +1064,13 @@ public static List createModelsFromPlan(AssignmentPlan plan) { public static Map> convertToIdIndexed(AssignmentPlan plan) { Map> result = new HashMap<>(); - for (AssignmentPlan.Deployment m : plan.models()) { + for (AssignmentPlan.Deployment m : plan.deployments()) { Optional> assignments = plan.assignments(m); Map allocationsPerNodeId = assignments.isPresent() ? 
new HashMap<>() : Map.of(); for (Map.Entry nodeAssignments : assignments.orElse(Map.of()).entrySet()) { allocationsPerNodeId.put(nodeAssignments.getKey().id(), nodeAssignments.getValue()); } - result.put(m.id(), allocationsPerNodeId); + result.put(m.deploymentId(), allocationsPerNodeId); } return result; } @@ -1103,7 +1103,7 @@ public static List randomNodes(int scale, String nodeIdPrefix) { return nodes; } - public static List randomModels(int scale, double load) { + public static List randomDeployments(int scale, double load) { List deployments = new ArrayList<>(); for (int i = 0; i < Math.max(2, Math.round(load * (1 + 8 * scale))); i++) { deployments.add(randomModel(String.valueOf(i))); @@ -1158,7 +1158,7 @@ public static void assertPreviousAssignmentsAreSatisfied(List nodes = new ArrayList<>(); for (int i = 0; i < nodesSize; i++) { nodes.add(new Node("n_" + i, ByteSizeValue.ofGb(6).getBytes(), 100)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/PreserveAllAllocationsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/PreserveAllAllocationsTests.java index 9885c4d583198..7499470cc8d6f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/PreserveAllAllocationsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/PreserveAllAllocationsTests.java @@ -83,13 +83,13 @@ public void testGivenPreviousAssignments() { List modelsPreservingAllocations = preserveAllAllocations.modelsPreservingAllocations(); assertThat(modelsPreservingAllocations, hasSize(2)); - assertThat(modelsPreservingAllocations.get(0).id(), equalTo("m_1")); + assertThat(modelsPreservingAllocations.get(0).deploymentId(), equalTo("m_1")); assertThat(modelsPreservingAllocations.get(0).memoryBytes(), equalTo(ByteSizeValue.ofMb(30).getBytes())); assertThat(modelsPreservingAllocations.get(0).allocations(), 
equalTo(1)); assertThat(modelsPreservingAllocations.get(0).threadsPerAllocation(), equalTo(1)); assertThat(modelsPreservingAllocations.get(0).currentAllocationsByNodeId(), equalTo(Map.of("n_1", 0))); - assertThat(modelsPreservingAllocations.get(1).id(), equalTo("m_2")); + assertThat(modelsPreservingAllocations.get(1).deploymentId(), equalTo("m_2")); assertThat(modelsPreservingAllocations.get(1).memoryBytes(), equalTo(ByteSizeValue.ofMb(50).getBytes())); assertThat(modelsPreservingAllocations.get(1).allocations(), equalTo(3)); assertThat(modelsPreservingAllocations.get(1).threadsPerAllocation(), equalTo(4)); @@ -166,7 +166,7 @@ public void testGivenPreviousAssignments() { List modelsPreservingAllocations = preserveAllAllocations.modelsPreservingAllocations(); assertThat(modelsPreservingAllocations, hasSize(2)); - assertThat(modelsPreservingAllocations.get(0).id(), equalTo("m_1")); + assertThat(modelsPreservingAllocations.get(0).deploymentId(), equalTo("m_1")); assertThat(modelsPreservingAllocations.get(0).memoryBytes(), equalTo(ByteSizeValue.ofMb(30).getBytes())); assertThat(modelsPreservingAllocations.get(0).perDeploymentMemoryBytes(), equalTo(ByteSizeValue.ofMb(300).getBytes())); assertThat(modelsPreservingAllocations.get(0).perAllocationMemoryBytes(), equalTo(ByteSizeValue.ofMb(10).getBytes())); @@ -174,7 +174,7 @@ public void testGivenPreviousAssignments() { assertThat(modelsPreservingAllocations.get(0).threadsPerAllocation(), equalTo(1)); assertThat(modelsPreservingAllocations.get(0).currentAllocationsByNodeId(), equalTo(Map.of("n_1", 0))); - assertThat(modelsPreservingAllocations.get(1).id(), equalTo("m_2")); + assertThat(modelsPreservingAllocations.get(1).deploymentId(), equalTo("m_2")); assertThat(modelsPreservingAllocations.get(1).memoryBytes(), equalTo(ByteSizeValue.ofMb(50).getBytes())); assertThat(modelsPreservingAllocations.get(1).perDeploymentMemoryBytes(), equalTo(ByteSizeValue.ofMb(300).getBytes())); 
assertThat(modelsPreservingAllocations.get(1).perAllocationMemoryBytes(), equalTo(ByteSizeValue.ofMb(10).getBytes())); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/PreserveOneAllocationTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/PreserveOneAllocationTests.java index 50ba8763c690d..bc95fb1e0339e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/PreserveOneAllocationTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/PreserveOneAllocationTests.java @@ -77,7 +77,7 @@ public void testGivenPreviousAssignments() { List modelsPreservingAllocations = preserveOneAllocation.modelsPreservingAllocations(); assertThat(modelsPreservingAllocations, hasSize(2)); - assertThat(modelsPreservingAllocations.get(0).id(), equalTo("m_1")); + assertThat(modelsPreservingAllocations.get(0).deploymentId(), equalTo("m_1")); assertThat(modelsPreservingAllocations.get(0).memoryBytes(), equalTo(ByteSizeValue.ofMb(30).getBytes())); assertThat(modelsPreservingAllocations.get(0).perDeploymentMemoryBytes(), equalTo(ByteSizeValue.ofMb(0).getBytes())); assertThat(modelsPreservingAllocations.get(0).perAllocationMemoryBytes(), equalTo(ByteSizeValue.ofMb(0).getBytes())); @@ -85,7 +85,7 @@ public void testGivenPreviousAssignments() { assertThat(modelsPreservingAllocations.get(0).threadsPerAllocation(), equalTo(1)); assertThat(modelsPreservingAllocations.get(0).currentAllocationsByNodeId(), equalTo(Map.of("n_1", 0))); - assertThat(modelsPreservingAllocations.get(1).id(), equalTo("m_2")); + assertThat(modelsPreservingAllocations.get(1).deploymentId(), equalTo("m_2")); assertThat(modelsPreservingAllocations.get(1).memoryBytes(), equalTo(ByteSizeValue.ofMb(50).getBytes())); assertThat(modelsPreservingAllocations.get(1).perDeploymentMemoryBytes(), equalTo(ByteSizeValue.ofMb(0).getBytes())); 
assertThat(modelsPreservingAllocations.get(1).perAllocationMemoryBytes(), equalTo(ByteSizeValue.ofMb(0).getBytes())); @@ -165,7 +165,7 @@ public void testGivenPreviousAssignments() { List modelsPreservingAllocations = preserveOneAllocation.modelsPreservingAllocations(); assertThat(modelsPreservingAllocations, hasSize(2)); - assertThat(modelsPreservingAllocations.get(0).id(), equalTo("m_1")); + assertThat(modelsPreservingAllocations.get(0).deploymentId(), equalTo("m_1")); assertThat(modelsPreservingAllocations.get(0).memoryBytes(), equalTo(ByteSizeValue.ofMb(30).getBytes())); assertThat(modelsPreservingAllocations.get(0).perDeploymentMemoryBytes(), equalTo(ByteSizeValue.ofMb(300).getBytes())); assertThat(modelsPreservingAllocations.get(0).perAllocationMemoryBytes(), equalTo(ByteSizeValue.ofMb(10).getBytes())); @@ -173,7 +173,7 @@ public void testGivenPreviousAssignments() { assertThat(modelsPreservingAllocations.get(0).threadsPerAllocation(), equalTo(1)); assertThat(modelsPreservingAllocations.get(0).currentAllocationsByNodeId(), equalTo(Map.of("n_1", 0))); - assertThat(modelsPreservingAllocations.get(1).id(), equalTo("m_2")); + assertThat(modelsPreservingAllocations.get(1).deploymentId(), equalTo("m_2")); assertThat(modelsPreservingAllocations.get(1).memoryBytes(), equalTo(ByteSizeValue.ofMb(50).getBytes())); assertThat(modelsPreservingAllocations.get(1).perDeploymentMemoryBytes(), equalTo(ByteSizeValue.ofMb(300).getBytes())); assertThat(modelsPreservingAllocations.get(1).perAllocationMemoryBytes(), equalTo(ByteSizeValue.ofMb(10).getBytes())); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/ZoneAwareAssignmentPlannerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/ZoneAwareAssignmentPlannerTests.java index 4993600d0d3b3..7005ad959577b 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/ZoneAwareAssignmentPlannerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/planning/ZoneAwareAssignmentPlannerTests.java @@ -22,9 +22,9 @@ import static org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlannerTests.assertModelFullyAssignedToNode; import static org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlannerTests.assertPreviousAssignmentsAreSatisfied; import static org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlannerTests.convertToIdIndexed; -import static org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlannerTests.createModelsFromPlan; +import static org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlannerTests.createDeploymentsFromPlan; +import static org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlannerTests.randomDeployments; import static org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlannerTests.randomModel; -import static org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlannerTests.randomModels; import static org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlannerTests.randomNodes; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -138,6 +138,33 @@ public void testGivenOneModel_OneNodePerZone_TwoZones_FullyFits() { assertThat(indexedBasedPlan.get("m_1"), equalTo(Map.of("n_1", 1, "n_2", 1))); } + public void testGivenOneModel_OneLargeNodePerZone_TwoZones_FullyFits() { + Node node1 = new Node("n_1", ByteSizeValue.ofGb(16).getBytes(), 8); + Node node2 = new Node("n_2", ByteSizeValue.ofGb(16).getBytes(), 8); + AssignmentPlan.Deployment deployment = new AssignmentPlan.Deployment( + "m_1", + ByteSizeValue.ofMb(100).getBytes(), + 4, + 2, + Map.of(), + 0, + null, + 0, + 0 + ); + + AssignmentPlan 
plan = new ZoneAwareAssignmentPlanner( + Map.of(List.of("z_1"), List.of(node1), List.of("z_2"), List.of(node2)), + List.of(deployment) + ).computePlan(); + + assertThat(plan.satisfiesAllModels(), is(true)); + + Map> indexedBasedPlan = convertToIdIndexed(plan); + assertThat(indexedBasedPlan.keySet(), hasItems("m_1")); + assertThat(indexedBasedPlan.get("m_1"), equalTo(Map.of("n_1", 2, "n_2", 2))); + } + public void testGivenOneModel_OneNodePerZone_TwoZones_PartiallyFits() { Node node1 = new Node("n_1", ByteSizeValue.ofMb(440).getBytes(), 4); Node node2 = new Node("n_2", ByteSizeValue.ofMb(440).getBytes(), 4); @@ -166,7 +193,7 @@ public void testGivenOneModel_OneNodePerZone_TwoZones_PartiallyFits() { assertThat(plan.getRemainingNodeMemory("n_2"), equalTo(0L)); } - public void testGivenThreeModels_TwoNodesPerZone_ThreeZones_FullyFit() { + public void testGivenThreeDeployments_TwoNodesPerZone_ThreeZones_FullyFit() { Node node1 = new Node("n_1", ByteSizeValue.ofMb(1000).getBytes(), 4); Node node2 = new Node("n_2", ByteSizeValue.ofMb(1000).getBytes(), 4); Node node3 = new Node("n_3", ByteSizeValue.ofMb(1000).getBytes(), 4); @@ -217,7 +244,7 @@ public void testGivenThreeModels_TwoNodesPerZone_ThreeZones_FullyFit() { } } - public void testGivenTwoModelsWithSingleAllocation_OneNode_ThreeZones() { + public void testGivenTwoDeploymentsWithSingleAllocation_OneNode_ThreeZones() { Node node1 = new Node("n_1", ByteSizeValue.ofMb(1000).getBytes(), 4); Node node2 = new Node("n_2", ByteSizeValue.ofMb(1000).getBytes(), 4); Node node3 = new Node("n_3", ByteSizeValue.ofMb(1000).getBytes(), 4); @@ -243,7 +270,7 @@ public void testPreviousAssignmentsGetAtLeastAsManyAllocationsAfterAddingNewMode List.of("z_3"), randomNodes(scale, "z_3_") ); - List deployments = randomModels(scale, load); + List deployments = randomDeployments(scale, load); AssignmentPlan originalPlan = new ZoneAwareAssignmentPlanner(nodesByZone, deployments).computePlan(); List previousModelsPlusNew = new 
ArrayList<>(deployments.size() + 1); @@ -254,7 +281,7 @@ public void testPreviousAssignmentsGetAtLeastAsManyAllocationsAfterAddingNewMode .collect(Collectors.toMap(e -> e.getKey().id(), Map.Entry::getValue)); previousModelsPlusNew.add( new AssignmentPlan.Deployment( - m.id(), + m.deploymentId(), m.memoryBytes(), m.allocations(), m.threadsPerAllocation(), @@ -291,7 +318,7 @@ public void testGivenClusterResize_GivenOneZone_ShouldAllocateEachModelAtLeastOn // Then start m_2 assignmentPlan = new ZoneAwareAssignmentPlanner( Map.of(List.of(), List.of(node1, node2)), - Stream.concat(createModelsFromPlan(assignmentPlan).stream(), Stream.of(deployment2)).toList() + Stream.concat(createDeploymentsFromPlan(assignmentPlan).stream(), Stream.of(deployment2)).toList() ).computePlan(); indexedBasedPlan = convertToIdIndexed(assignmentPlan); @@ -302,7 +329,7 @@ public void testGivenClusterResize_GivenOneZone_ShouldAllocateEachModelAtLeastOn // Then start m_3 assignmentPlan = new ZoneAwareAssignmentPlanner( Map.of(List.of(), List.of(node1, node2)), - Stream.concat(createModelsFromPlan(assignmentPlan).stream(), Stream.of(deployment3)).toList() + Stream.concat(createDeploymentsFromPlan(assignmentPlan).stream(), Stream.of(deployment3)).toList() ).computePlan(); indexedBasedPlan = convertToIdIndexed(assignmentPlan); @@ -316,19 +343,19 @@ public void testGivenClusterResize_GivenOneZone_ShouldAllocateEachModelAtLeastOn Node node4 = new Node("n_4", ByteSizeValue.ofMb(5160).getBytes(), 2); // First, one node goes away. - assignmentPlan = new ZoneAwareAssignmentPlanner(Map.of(List.of(), List.of(node1)), createModelsFromPlan(assignmentPlan)) + assignmentPlan = new ZoneAwareAssignmentPlanner(Map.of(List.of(), List.of(node1)), createDeploymentsFromPlan(assignmentPlan)) .computePlan(); // Then, a node double in memory size is added. 
- assignmentPlan = new ZoneAwareAssignmentPlanner(Map.of(List.of(), List.of(node1, node3)), createModelsFromPlan(assignmentPlan)) + assignmentPlan = new ZoneAwareAssignmentPlanner(Map.of(List.of(), List.of(node1, node3)), createDeploymentsFromPlan(assignmentPlan)) .computePlan(); // And another. assignmentPlan = new ZoneAwareAssignmentPlanner( Map.of(List.of(), List.of(node1, node3, node4)), - createModelsFromPlan(assignmentPlan) + createDeploymentsFromPlan(assignmentPlan) ).computePlan(); // Finally, the remaining smaller node is removed - assignmentPlan = new ZoneAwareAssignmentPlanner(Map.of(List.of(), List.of(node3, node4)), createModelsFromPlan(assignmentPlan)) + assignmentPlan = new ZoneAwareAssignmentPlanner(Map.of(List.of(), List.of(node3, node4)), createDeploymentsFromPlan(assignmentPlan)) .computePlan(); indexedBasedPlan = convertToIdIndexed(assignmentPlan);