diff --git a/server/src/main/java/org/opensearch/wlm/ResourceType.java b/server/src/main/java/org/opensearch/wlm/ResourceType.java index b8e9ba8d02429..a560268a66853 100644 --- a/server/src/main/java/org/opensearch/wlm/ResourceType.java +++ b/server/src/main/java/org/opensearch/wlm/ResourceType.java @@ -8,16 +8,15 @@ package org.opensearch.wlm; -import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.wlm.tracker.CpuUsageCalculator; import org.opensearch.wlm.tracker.MemoryUsageCalculator; -import org.opensearch.wlm.tracker.QueryGroupUsageHelper; import org.opensearch.wlm.tracker.ResourceUsageCalculator; import java.io.IOException; import java.util.List; +import java.util.function.Function; /** * Enum to hold the resource type @@ -26,35 +25,25 @@ */ @PublicApi(since = "2.17.0") public enum ResourceType { - CPU("cpu", true, CpuUsageCalculator.INSTANCE, new QueryGroupUsageHelper() { - @Override - protected double getNormalisedThreshold(QueryGroup queryGroup) { - return queryGroup.getResourceLimits().get(ResourceType.CPU) * getSettings().getNodeLevelCpuCancellationThreshold(); - } - }), - MEMORY("memory", true, MemoryUsageCalculator.INSTANCE, new QueryGroupUsageHelper() { - @Override - protected double getNormalisedThreshold(QueryGroup queryGroup) { - return queryGroup.getResourceLimits().get(ResourceType.MEMORY) * getSettings().getNodeLevelMemoryCancellationThreshold(); - } - }); + CPU("cpu", true, CpuUsageCalculator.INSTANCE, WorkloadManagementSettings::getNodeLevelCpuCancellationThreshold), + MEMORY("memory", true, MemoryUsageCalculator.INSTANCE, WorkloadManagementSettings::getNodeLevelMemoryCancellationThreshold); private final String name; private final boolean statsEnabled; private final ResourceUsageCalculator resourceUsageCalculator; - private final QueryGroupUsageHelper queryGroupUsageHelper; + private final Function 
nodeLevelThresholdSupplier; private static List sortedValues = List.of(CPU, MEMORY); ResourceType( String name, boolean statsEnabled, ResourceUsageCalculator resourceUsageCalculator, - QueryGroupUsageHelper queryGroupUsageHelper + Function nodeLevelThresholdSupplier ) { this.name = name; this.statsEnabled = statsEnabled; this.resourceUsageCalculator = resourceUsageCalculator; - this.queryGroupUsageHelper = queryGroupUsageHelper; + this.nodeLevelThresholdSupplier = nodeLevelThresholdSupplier; } /** @@ -87,8 +76,8 @@ public ResourceUsageCalculator getResourceUsageCalculator() { return resourceUsageCalculator; } - public QueryGroupUsageHelper getQueryGroupUsage() { - return queryGroupUsageHelper; + public double getNodeLevelThreshold(WorkloadManagementSettings settings) { + return nodeLevelThresholdSupplier.apply(settings); } public static List getSortedValues() { diff --git a/server/src/main/java/org/opensearch/wlm/cancellation/HighestResourceConsumingTaskFirstSelectionStrategy.java b/server/src/main/java/org/opensearch/wlm/cancellation/HighestResourceConsumingTaskFirstSelectionStrategy.java index c32272f1b8492..3bceae9c8ffb1 100644 --- a/server/src/main/java/org/opensearch/wlm/cancellation/HighestResourceConsumingTaskFirstSelectionStrategy.java +++ b/server/src/main/java/org/opensearch/wlm/cancellation/HighestResourceConsumingTaskFirstSelectionStrategy.java @@ -15,7 +15,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.function.Supplier; import java.util.stream.Collectors; import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE; @@ -25,15 +24,7 @@ */ public class HighestResourceConsumingTaskFirstSelectionStrategy implements TaskSelectionStrategy { - private final Supplier nanoTimeSupplier; - - public HighestResourceConsumingTaskFirstSelectionStrategy() { - this(System::nanoTime); - } - - public HighestResourceConsumingTaskFirstSelectionStrategy(Supplier nanoTimeSupplier) { - this.nanoTimeSupplier = 
nanoTimeSupplier; - } + public HighestResourceConsumingTaskFirstSelectionStrategy() {} /** * Returns a comparator that defines the sorting condition for tasks. @@ -43,9 +34,7 @@ public HighestResourceConsumingTaskFirstSelectionStrategy(Supplier nanoTim * @return The comparator */ private Comparator sortingCondition(ResourceType resourceType) { - return Comparator.comparingDouble( - task -> resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task, nanoTimeSupplier) - ); + return Comparator.comparingDouble(task -> resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task)); } /** @@ -72,7 +61,7 @@ public List selectTasksForCancellation(List task double accumulated = 0; for (QueryGroupTask task : sortedTasks) { selectedTasks.add(task); - accumulated += resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task, nanoTimeSupplier); + accumulated += resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task); if ((accumulated - limit) > MIN_VALUE) { break; } diff --git a/server/src/main/java/org/opensearch/wlm/cancellation/TaskCanceller.java b/server/src/main/java/org/opensearch/wlm/cancellation/TaskCanceller.java index 60656fcdb0828..f86e4291eb181 100644 --- a/server/src/main/java/org/opensearch/wlm/cancellation/TaskCanceller.java +++ b/server/src/main/java/org/opensearch/wlm/cancellation/TaskCanceller.java @@ -16,7 +16,6 @@ import org.opensearch.wlm.QueryGroupTask; import org.opensearch.wlm.ResourceType; import org.opensearch.wlm.WorkloadManagementSettings; -import org.opensearch.wlm.tracker.QueryGroupUsageHelper; import java.util.ArrayList; import java.util.Collection; @@ -70,7 +69,6 @@ public TaskCanceller( this.activeQueryGroups = activeQueryGroups; this.deletedQueryGroups = deletedQueryGroups; this.isNodeInDuress = isNodeInDuress; - TRACKED_RESOURCES.forEach(resourceType -> resourceType.getQueryGroupUsage().setSettings(workloadManagementSettings)); } /** @@ -132,13 +130,9 @@ private List 
getQueryGroupsToCancelFrom(ResiliencyMode resiliencyMod if (queryGroup.getResiliencyMode() != resiliencyMode) { continue; } - Map queryGroupResourcesUsage = queryGroupLevelResourceUsageViews.get(queryGroup.get_id()) - .getResourceUsageData(); - for (ResourceType resourceType : TRACKED_RESOURCES) { if (queryGroup.getResourceLimits().containsKey(resourceType)) { - final double currentUsage = queryGroupResourcesUsage.get(resourceType); - if (resourceType.getQueryGroupUsage().isBreachingThresholdFor(queryGroup, currentUsage)) { + if (shouldCancelTasks(queryGroup, resourceType)) { queryGroupsToCancelFrom.add(queryGroup); break; } @@ -231,8 +225,17 @@ private double getExcessUsage(QueryGroup queryGroup, ResourceType resourceType) final QueryGroupLevelResourceUsageView queryGroupResourceUsageView = queryGroupLevelResourceUsageViews.get(queryGroup.get_id()); final double currentUsage = queryGroupResourceUsageView.getResourceUsageData().get(resourceType); - QueryGroupUsageHelper queryGroupUsageHelper = resourceType.getQueryGroupUsage(); - return queryGroupUsageHelper.getExcessUsage(queryGroup, currentUsage); + return currentUsage - getNormalisedThreshold(queryGroup, resourceType); + } + + /** + * normalises configured value with respect to node level cancellation thresholds + * @param queryGroup instance + * @return normalised value with respect to node level cancellation thresholds + */ + private double getNormalisedThreshold(QueryGroup queryGroup, ResourceType resourceType) { + double nodeLevelCancellationThreshold = resourceType.getNodeLevelThreshold(workloadManagementSettings); + return queryGroup.getResourceLimits().get(resourceType) * nodeLevelCancellationThreshold; } private void callbackOnCancel() { diff --git a/server/src/main/java/org/opensearch/wlm/tracker/CpuUsageCalculator.java b/server/src/main/java/org/opensearch/wlm/tracker/CpuUsageCalculator.java index 3cbab3db010c6..533fbeecbe945 100644 --- 
a/server/src/main/java/org/opensearch/wlm/tracker/CpuUsageCalculator.java +++ b/server/src/main/java/org/opensearch/wlm/tracker/CpuUsageCalculator.java @@ -12,28 +12,39 @@ import org.opensearch.wlm.QueryGroupTask; import java.util.List; -import java.util.function.Supplier; +import java.util.function.LongSupplier; /** * class to help make cpu usage calculations for the query group */ -public class CpuUsageCalculator implements ResourceUsageCalculator { +public class CpuUsageCalculator extends ResourceUsageCalculator { // This value should be initialised at the start time of the process and be used throughout the codebase public static final int PROCESSOR_COUNT = Runtime.getRuntime().availableProcessors(); public static final CpuUsageCalculator INSTANCE = new CpuUsageCalculator(); + /** Time source for elapsed-time computation; defaults to the system clock so the singleton is usable before a supplier is injected */ + private LongSupplier nanoTimeSupplier = System::nanoTime; private CpuUsageCalculator() {} + /** + * Replaces the time source used when computing how long a task has been running + * @param nanoTimeSupplier supplier of the current time in nanoseconds + */ + @Override + public void setNanoTimeSupplier(LongSupplier nanoTimeSupplier) { + this.nanoTimeSupplier = nanoTimeSupplier; + } + @Override - public double calculateResourceUsage(List tasks, Supplier timeSupplier) { - double usage = tasks.stream().mapToDouble(task -> calculateTaskResourceUsage(task, timeSupplier)).sum(); + public double calculateResourceUsage(List tasks) { + double usage = tasks.stream().mapToDouble(this::calculateTaskResourceUsage).sum(); usage /= PROCESSOR_COUNT; return usage; } @Override - public double calculateTaskResourceUsage(QueryGroupTask task, Supplier nanoTimeSupplier) { - return (1.0f * task.getTotalResourceUtilization(ResourceStats.CPU)) / (nanoTimeSupplier.get() - task.getStartTimeNanos()); + public double calculateTaskResourceUsage(QueryGroupTask task) { + return (1.0f * task.getTotalResourceUtilization(ResourceStats.CPU)) / (nanoTimeSupplier.getAsLong() - task.getStartTimeNanos()); } } diff --git a/server/src/main/java/org/opensearch/wlm/tracker/MemoryUsageCalculator.java b/server/src/main/java/org/opensearch/wlm/tracker/MemoryUsageCalculator.java index dd72ac9007705..fb66ff47f58d0 100644 --- 
a/server/src/main/java/org/opensearch/wlm/tracker/MemoryUsageCalculator.java +++ b/server/src/main/java/org/opensearch/wlm/tracker/MemoryUsageCalculator.java @@ -13,24 +13,23 @@ import org.opensearch.wlm.QueryGroupTask; import java.util.List; -import java.util.function.Supplier; /** * class to help make memory usage calculations for the query group */ -public class MemoryUsageCalculator implements ResourceUsageCalculator { +public class MemoryUsageCalculator extends ResourceUsageCalculator { public static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); public static final MemoryUsageCalculator INSTANCE = new MemoryUsageCalculator(); private MemoryUsageCalculator() {} @Override - public double calculateResourceUsage(List tasks, Supplier timeSupplier) { - return tasks.stream().mapToDouble(task -> calculateTaskResourceUsage(task, timeSupplier)).sum(); + public double calculateResourceUsage(List tasks) { + return tasks.stream().mapToDouble(this::calculateTaskResourceUsage).sum(); } @Override - public double calculateTaskResourceUsage(QueryGroupTask task, Supplier timeSupplier) { + public double calculateTaskResourceUsage(QueryGroupTask task) { return (1.0f * task.getTotalResourceUtilization(ResourceStats.MEMORY)) / HEAP_SIZE_BYTES; } } diff --git a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java index c7d9ff00929f9..f616f29a4d031 100644 --- a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java +++ b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java @@ -17,7 +17,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Supplier; +import java.util.function.LongSupplier; import java.util.stream.Collectors; /** @@ -26,16 +26,15 @@ public class QueryGroupResourceUsageTrackerService { 
public static final EnumSet TRACKED_RESOURCES = EnumSet.allOf(ResourceType.class); private final TaskResourceTrackingService taskResourceTrackingService; - private final Supplier nanoTimeSupplier; /** * QueryGroupResourceTrackerService constructor * * @param taskResourceTrackingService Service that helps track resource usage of tasks running on a node. */ - public QueryGroupResourceUsageTrackerService(TaskResourceTrackingService taskResourceTrackingService, Supplier nanoTimeSupplier) { + public QueryGroupResourceUsageTrackerService(TaskResourceTrackingService taskResourceTrackingService, LongSupplier nanoTimeSupplier) { this.taskResourceTrackingService = taskResourceTrackingService; - this.nanoTimeSupplier = nanoTimeSupplier; + ResourceType.CPU.getResourceUsageCalculator().setNanoTimeSupplier(nanoTimeSupplier); } /** @@ -52,8 +51,7 @@ public Map constructQueryGroupLevelUsa // Compute the QueryGroup resource usage final Map resourceUsage = new HashMap<>(); for (ResourceType resourceType : TRACKED_RESOURCES) { - double usage = resourceType.getResourceUsageCalculator() - .calculateResourceUsage(queryGroupEntry.getValue(), nanoTimeSupplier); + double usage = resourceType.getResourceUsageCalculator().calculateResourceUsage(queryGroupEntry.getValue()); resourceUsage.put(resourceType, usage); } diff --git a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupUsageHelper.java b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupUsageHelper.java deleted file mode 100644 index 0053c21e3b78b..0000000000000 --- a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupUsageHelper.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.wlm.tracker; - -import org.opensearch.cluster.metadata.QueryGroup; -import org.opensearch.common.annotation.PublicApi; -import org.opensearch.wlm.WorkloadManagementSettings; - -/** - * Utility class to provide utility methods at query group level - */ -@PublicApi(since = "2.18.0") -public abstract class QueryGroupUsageHelper { - private WorkloadManagementSettings settings; - - public WorkloadManagementSettings getSettings() { - return settings; - } - - /** - * WorkloadManagementSettings setter - * @param settings - */ - public void setSettings(WorkloadManagementSettings settings) { - this.settings = settings; - } - - /** - * Determines whether {@link QueryGroup} is breaching its threshold for the resource - * @param queryGroup - * @param currentUsage - * @return whether the query group is breaching threshold for this resource - */ - public boolean isBreachingThresholdFor(QueryGroup queryGroup, double currentUsage) { - return getExcessUsage(queryGroup, currentUsage) > 0; - } - - /** - * returns the value by which the resource usage is beyond the configured limit for the query group - * @param queryGroup instance - * @return the overshooting limit for the resource - */ - public double getExcessUsage(QueryGroup queryGroup, double currentUsage) { - return currentUsage - getNormalisedThreshold(queryGroup); - } - - /** - * normalises configured value with respect to node level cancellation thresholds - * @param queryGroup instance - * @return normalised value with respect to node level cancellation thresholds - */ - protected abstract double getNormalisedThreshold(QueryGroup queryGroup); -} diff --git a/server/src/main/java/org/opensearch/wlm/tracker/ResourceUsageCalculator.java b/server/src/main/java/org/opensearch/wlm/tracker/ResourceUsageCalculator.java index 2c30687b3d259..e68693b9a6433 100644 --- a/server/src/main/java/org/opensearch/wlm/tracker/ResourceUsageCalculator.java +++ 
b/server/src/main/java/org/opensearch/wlm/tracker/ResourceUsageCalculator.java @@ -12,26 +12,32 @@ import org.opensearch.wlm.QueryGroupTask; import java.util.List; -import java.util.function.Supplier; +import java.util.function.LongSupplier; /** * This class is used to track query group level resource usage */ @PublicApi(since = "2.18.0") -public interface ResourceUsageCalculator { +public abstract class ResourceUsageCalculator { /** * calculates the current resource usage for the query group * * @param tasks list of tasks in the query group - * @param timeSupplier nano time supplier */ - double calculateResourceUsage(List tasks, Supplier timeSupplier); + public abstract double calculateResourceUsage(List tasks); /** * calculates the task level resource usage * @param task QueryGroupTask - * @param timeSupplier in nano seconds unit * @return task level resource usage */ - double calculateTaskResourceUsage(QueryGroupTask task, Supplier timeSupplier); + public abstract double calculateTaskResourceUsage(QueryGroupTask task); + + /** + * Sets the nano-time supplier; this default implementation does nothing, and calculators whose usage depends on elapsed time override it + * @param nanoTimeSupplier supplier of the current time in nanoseconds + */ + public void setNanoTimeSupplier(LongSupplier nanoTimeSupplier) { + + } } diff --git a/server/src/test/java/org/opensearch/wlm/cancellation/HighestResourceConsumingTaskFirstSelectionStrategyTests.java b/server/src/test/java/org/opensearch/wlm/cancellation/HighestResourceConsumingTaskFirstSelectionStrategyTests.java index 1582a45897db3..96fe0f0462c77 100644 --- a/server/src/test/java/org/opensearch/wlm/cancellation/HighestResourceConsumingTaskFirstSelectionStrategyTests.java +++ b/server/src/test/java/org/opensearch/wlm/cancellation/HighestResourceConsumingTaskFirstSelectionStrategyTests.java @@ -17,7 +17,6 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.wlm.QueryGroupTask; import org.opensearch.wlm.ResourceType; -import org.opensearch.wlm.tracker.ResourceUsageCalculatorTrackerServiceTests.TestClock; import java.util.ArrayList; 
import java.util.Collections; @@ -28,12 +27,10 @@ import static org.opensearch.wlm.tracker.MemoryUsageCalculator.HEAP_SIZE_BYTES; public class HighestResourceConsumingTaskFirstSelectionStrategyTests extends OpenSearchTestCase { - private TestClock clock; public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsGreaterThanZero() { - clock = new TestClock(); HighestResourceConsumingTaskFirstSelectionStrategy testHighestResourceConsumingTaskFirstSelectionStrategy = - new HighestResourceConsumingTaskFirstSelectionStrategy(clock::getTime); + new HighestResourceConsumingTaskFirstSelectionStrategy(); double reduceBy = 50000.0 / HEAP_SIZE_BYTES; ResourceType resourceType = ResourceType.MEMORY; List tasks = getListOfTasks(100); @@ -46,8 +43,8 @@ public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsGrea boolean sortedInDescendingResourceUsage = IntStream.range(0, selectedTasks.size() - 1) .noneMatch( index -> ResourceType.MEMORY.getResourceUsageCalculator() - .calculateTaskResourceUsage(selectedTasks.get(index), null) < ResourceType.MEMORY.getResourceUsageCalculator() - .calculateTaskResourceUsage(selectedTasks.get(index + 1), null) + .calculateTaskResourceUsage(selectedTasks.get(index)) < ResourceType.MEMORY.getResourceUsageCalculator() + .calculateTaskResourceUsage(selectedTasks.get(index + 1)) ); assertTrue(sortedInDescendingResourceUsage); assertTrue(tasksUsageMeetsThreshold(selectedTasks, reduceBy)); @@ -84,7 +81,7 @@ public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsEqua private boolean tasksUsageMeetsThreshold(List selectedTasks, double threshold) { double memory = 0; for (QueryGroupTask task : selectedTasks) { - memory += ResourceType.MEMORY.getResourceUsageCalculator().calculateTaskResourceUsage(task, clock::getTime); + memory += ResourceType.MEMORY.getResourceUsageCalculator().calculateTaskResourceUsage(task); if ((memory - threshold) > MIN_VALUE) { return true; } diff --git 
a/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTests.java b/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTests.java index ebc9f366d369a..ad6ce267b2452 100644 --- a/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTests.java +++ b/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTests.java @@ -39,25 +39,23 @@ public void testQueryGroupCpuUsage() { double expectedQueryGroupCpuUsage = 1.0 / PROCESSOR_COUNT; QueryGroupTask mockTask = createMockTaskWithResourceStats(QueryGroupTask.class, fastForwardTime, 200, 0, 123); - double actualUsage = ResourceType.CPU.getResourceUsageCalculator().calculateResourceUsage(List.of(mockTask), clock::getTime); + ResourceType.CPU.getResourceUsageCalculator().setNanoTimeSupplier(clock::getTime); + double actualUsage = ResourceType.CPU.getResourceUsageCalculator().calculateResourceUsage(List.of(mockTask)); assertEquals(expectedQueryGroupCpuUsage, actualUsage, MIN_VALUE); - double taskResourceUsage = ResourceType.CPU.getResourceUsageCalculator().calculateTaskResourceUsage(mockTask, clock::getTime); + double taskResourceUsage = ResourceType.CPU.getResourceUsageCalculator().calculateTaskResourceUsage(mockTask); assertEquals(1.0, taskResourceUsage, MIN_VALUE); } public void testQueryGroupMemoryUsage() { - TestClock clock = new TestClock(); - QueryGroupTask mockTask = createMockTaskWithResourceStats(QueryGroupTask.class, 100, 200, 0, 123); - double actualMemoryUsage = ResourceType.MEMORY.getResourceUsageCalculator() - .calculateResourceUsage(List.of(mockTask), clock::getTime); + double actualMemoryUsage = ResourceType.MEMORY.getResourceUsageCalculator().calculateResourceUsage(List.of(mockTask)); double expectedMemoryUsage = 200.0 / HEAP_SIZE_BYTES; assertEquals(expectedMemoryUsage, actualMemoryUsage, MIN_VALUE); assertEquals( 200.0 / HEAP_SIZE_BYTES, - ResourceType.MEMORY.getResourceUsageCalculator().calculateTaskResourceUsage(mockTask, 
clock::getTime), + ResourceType.MEMORY.getResourceUsageCalculator().calculateTaskResourceUsage(mockTask), MIN_VALUE ); }