diff --git a/server/src/main/java/org/opensearch/wlm/cancellation/MaximumResourceTaskSelectionStrategy.java b/server/src/main/java/org/opensearch/wlm/cancellation/MaximumResourceTaskSelectionStrategy.java index 9407fc32114d0..7216984da8aca 100644 --- a/server/src/main/java/org/opensearch/wlm/cancellation/MaximumResourceTaskSelectionStrategy.java +++ b/server/src/main/java/org/opensearch/wlm/cancellation/MaximumResourceTaskSelectionStrategy.java @@ -17,7 +17,7 @@ import java.util.List; import java.util.stream.Collectors; -import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE; +import static org.opensearch.wlm.cancellation.TaskCancellationService.MIN_VALUE; /** * Represents the highest resource consuming task first selection strategy. diff --git a/server/src/main/java/org/opensearch/wlm/cancellation/TaskCanceller.java b/server/src/main/java/org/opensearch/wlm/cancellation/TaskCancellationService.java similarity index 94% rename from server/src/main/java/org/opensearch/wlm/cancellation/TaskCanceller.java rename to server/src/main/java/org/opensearch/wlm/cancellation/TaskCancellationService.java index 355ebfb838a05..24cf9f3746404 100644 --- a/server/src/main/java/org/opensearch/wlm/cancellation/TaskCanceller.java +++ b/server/src/main/java/org/opensearch/wlm/cancellation/TaskCancellationService.java @@ -16,6 +16,7 @@ import org.opensearch.wlm.QueryGroupTask; import org.opensearch.wlm.ResourceType; import org.opensearch.wlm.WorkloadManagementSettings; +import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService; import java.util.ArrayList; import java.util.Collection; @@ -44,28 +45,29 @@ * @see QueryGroup * @see ResourceType */ -public class TaskCanceller { +public class TaskCancellationService { public static final double MIN_VALUE = 1e-9; private final WorkloadManagementSettings workloadManagementSettings; private final TaskSelectionStrategy taskSelectionStrategy; + private final QueryGroupResourceUsageTrackerService resourceUsageTrackerService; // a map of QueryGroupId to its corresponding QueryGroupLevelResourceUsageView object - private final Map queryGroupLevelResourceUsageViews; + Map queryGroupLevelResourceUsageViews; private final Collection activeQueryGroups; private final Collection deletedQueryGroups; private BooleanSupplier isNodeInDuress; - public TaskCanceller( + public TaskCancellationService( WorkloadManagementSettings workloadManagementSettings, - MaximumResourceTaskSelectionStrategy taskSelectionStrategy, - Map queryGroupLevelResourceUsageViews, + TaskSelectionStrategy taskSelectionStrategy, + QueryGroupResourceUsageTrackerService resourceUsageTrackerService, Collection activeQueryGroups, Collection deletedQueryGroups, BooleanSupplier isNodeInDuress ) { this.workloadManagementSettings = workloadManagementSettings; this.taskSelectionStrategy = taskSelectionStrategy; - this.queryGroupLevelResourceUsageViews = queryGroupLevelResourceUsageViews; + this.resourceUsageTrackerService = resourceUsageTrackerService; this.activeQueryGroups = activeQueryGroups; this.deletedQueryGroups = deletedQueryGroups; this.isNodeInDuress = isNodeInDuress; @@ -75,6 +77,7 @@ public TaskCanceller( * Cancel tasks based on the implemented strategy. */ public final void cancelTasks() { + queryGroupLevelResourceUsageViews = resourceUsageTrackerService.constructQueryGroupLevelUsageViews(); // cancel tasks from QueryGroups that are in Enforced mode that are breaching their resource limits cancelTasks(ResiliencyMode.ENFORCED); // if the node is in duress, cancel tasks accordingly. diff --git a/server/src/main/java/org/opensearch/wlm/tracker/CpuUsageCalculator.java b/server/src/main/java/org/opensearch/wlm/tracker/CpuUsageCalculator.java index 533fbeecbe945..772e698c324b3 100644 --- a/server/src/main/java/org/opensearch/wlm/tracker/CpuUsageCalculator.java +++ b/server/src/main/java/org/opensearch/wlm/tracker/CpuUsageCalculator.java @@ -31,6 +31,7 @@ public void setNanoTimeSupplier(LongSupplier nanoTimeSupplier) { @Override public double calculateResourceUsage(List tasks) { + assert nanoTimeSupplier != null : "nanoTimeSupplier has to be set in order to calculate the resource usage"; double usage = tasks.stream().mapToDouble(this::calculateTaskResourceUsage).sum(); usage /= PROCESSOR_COUNT; diff --git a/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java b/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java index 0d254ad73f9f4..77fc6ac6e535b 100644 --- a/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java +++ b/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java @@ -14,7 +14,7 @@ import java.util.List; import java.util.Map; -import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE; +import static org.opensearch.wlm.cancellation.TaskCancellationService.MIN_VALUE; import static org.opensearch.wlm.tracker.CpuUsageCalculator.PROCESSOR_COUNT; import static org.opensearch.wlm.tracker.MemoryUsageCalculator.HEAP_SIZE_BYTES; import static org.opensearch.wlm.tracker.ResourceUsageCalculatorTests.createMockTaskWithResourceStats; diff --git a/server/src/test/java/org/opensearch/wlm/cancellation/MaximumResourceTaskSelectionStrategyTests.java b/server/src/test/java/org/opensearch/wlm/cancellation/MaximumResourceTaskSelectionStrategyTests.java index d522a71b630a5..05d080d51aa65 100644 --- a/server/src/test/java/org/opensearch/wlm/cancellation/MaximumResourceTaskSelectionStrategyTests.java +++ b/server/src/test/java/org/opensearch/wlm/cancellation/MaximumResourceTaskSelectionStrategyTests.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.stream.IntStream; -import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE; +import static org.opensearch.wlm.cancellation.TaskCancellationService.MIN_VALUE; import static org.opensearch.wlm.tracker.MemoryUsageCalculator.HEAP_SIZE_BYTES; public class MaximumResourceTaskSelectionStrategyTests extends OpenSearchTestCase { diff --git a/server/src/test/java/org/opensearch/wlm/cancellation/TaskCancellerTests.java b/server/src/test/java/org/opensearch/wlm/cancellation/TaskCancellationServiceTests.java similarity index 89% rename from server/src/test/java/org/opensearch/wlm/cancellation/TaskCancellerTests.java rename to server/src/test/java/org/opensearch/wlm/cancellation/TaskCancellationServiceTests.java index 8291b6c4b5b1f..07764b93a41ea 100644 --- a/server/src/test/java/org/opensearch/wlm/cancellation/TaskCancellerTests.java +++ b/server/src/test/java/org/opensearch/wlm/cancellation/TaskCancellationServiceTests.java @@ -20,6 +20,7 @@ import org.opensearch.wlm.QueryGroupTask; import org.opensearch.wlm.ResourceType; import org.opensearch.wlm.WorkloadManagementSettings; +import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService; import org.opensearch.wlm.tracker.ResourceUsageCalculatorTrackerServiceTests.TestClock; import org.junit.Before; @@ -30,44 +31,23 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.BooleanSupplier; import java.util.stream.Collectors; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class TaskCancellerTests extends OpenSearchTestCase { +public class TaskCancellationServiceTests extends OpenSearchTestCase { private static final String queryGroupId1 = "queryGroup1"; private static final String queryGroupId2 = "queryGroup2"; private TestClock clock; - private static class TestTaskCancellerImpl extends TaskCanceller { - - public TestTaskCancellerImpl( - WorkloadManagementSettings workloadManagementSettings, - MaximumResourceTaskSelectionStrategy highestResourceConsumingTaskFirstSelectionStrategy, - Map queryGroupLevelViews, - Set activeQueryGroups, - Set deletedQueryGroups, - BooleanSupplier isNodeInDuress - ) { - super( - workloadManagementSettings, - highestResourceConsumingTaskFirstSelectionStrategy, - queryGroupLevelViews, - activeQueryGroups, - deletedQueryGroups, - isNodeInDuress - ); - } - } - private Map queryGroupLevelViews; private Set activeQueryGroups; private Set deletedQueryGroups; - private TaskCanceller taskCancellation; + private TaskCancellationService taskCancellation; private WorkloadManagementSettings workloadManagementSettings; + private QueryGroupResourceUsageTrackerService resourceUsageTrackerService; @Before public void setup() { @@ -77,12 +57,14 @@ public void setup() { deletedQueryGroups = new HashSet<>(); clock = new TestClock(); + ResourceType.CPU.getResourceUsageCalculator().setNanoTimeSupplier(clock::getTime); when(workloadManagementSettings.getNodeLevelCpuCancellationThreshold()).thenReturn(0.9); when(workloadManagementSettings.getNodeLevelMemoryCancellationThreshold()).thenReturn(0.9); - taskCancellation = new TestTaskCancellerImpl( + resourceUsageTrackerService = mock(QueryGroupResourceUsageTrackerService.class); + taskCancellation = new TaskCancellationService( workloadManagementSettings, new MaximumResourceTaskSelectionStrategy(), - queryGroupLevelViews, + resourceUsageTrackerService, activeQueryGroups, deletedQueryGroups, () -> false @@ -106,6 +88,7 @@ public void testGetCancellableTasksFrom_setupAppropriateCancellationReasonAndSco QueryGroupLevelResourceUsageView mockView = createResourceUsageViewMock(); when(mockView.getResourceUsageData()).thenReturn(Map.of(resourceType, cpuUsage, ResourceType.MEMORY, memoryUsage)); queryGroupLevelViews.put(queryGroupId1, mockView); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; List cancellableTasksFrom = taskCancellation.getCancellableTasksFrom(queryGroup1); assertEquals(2, cancellableTasksFrom.size()); @@ -137,6 +120,7 @@ public void testGetCancellableTasksFrom_returnsTasksWhenBreachingThreshold() { QueryGroupLevelResourceUsageView mockView = createResourceUsageViewMock(); when(mockView.getResourceUsageData()).thenReturn(Map.of(resourceType, cpuUsage, ResourceType.MEMORY, memoryUsage)); queryGroupLevelViews.put(queryGroupId1, mockView); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; List cancellableTasksFrom = taskCancellation.getCancellableTasksFrom(queryGroup1); assertEquals(2, cancellableTasksFrom.size()); @@ -162,6 +146,7 @@ public void testGetCancellableTasksFrom_returnsTasksWhenBreachingThresholdForMem queryGroupLevelViews.put(queryGroupId1, mockView); activeQueryGroups.add(queryGroup1); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; List cancellableTasksFrom = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED); assertEquals(2, cancellableTasksFrom.size()); @@ -185,6 +170,7 @@ public void testGetCancellableTasksFrom_returnsNoTasksWhenNotBreachingThreshold( when(mockView.getResourceUsageData()).thenReturn(Map.of(ResourceType.CPU, cpuUsage, ResourceType.MEMORY, memoryUsage)); queryGroupLevelViews.put(queryGroupId1, mockView); activeQueryGroups.add(queryGroup1); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; List cancellableTasksFrom = taskCancellation.getCancellableTasksFrom(queryGroup1); assertTrue(cancellableTasksFrom.isEmpty()); @@ -205,11 +191,12 @@ public void testGetCancellableTasksFrom_filtersQueryGroupCorrectly() { QueryGroupLevelResourceUsageView mockView = createResourceUsageViewMock(); queryGroupLevelViews.put(queryGroupId1, mockView); activeQueryGroups.add(queryGroup1); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; - TestTaskCancellerImpl taskCancellation = new TestTaskCancellerImpl( + TaskCancellationService taskCancellation = new TaskCancellationService( workloadManagementSettings, new MaximumResourceTaskSelectionStrategy(), - queryGroupLevelViews, + resourceUsageTrackerService, activeQueryGroups, deletedQueryGroups, () -> false @@ -239,20 +226,23 @@ public void testCancelTasks_cancelsGivenTasks() { queryGroupLevelViews.put(queryGroupId1, mockView); activeQueryGroups.add(queryGroup1); - TestTaskCancellerImpl taskCancellation = new TestTaskCancellerImpl( + TaskCancellationService taskCancellation = new TaskCancellationService( workloadManagementSettings, new MaximumResourceTaskSelectionStrategy(), - queryGroupLevelViews, + resourceUsageTrackerService, activeQueryGroups, deletedQueryGroups, () -> false ); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; + List cancellableTasksFrom = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED); assertEquals(2, cancellableTasksFrom.size()); assertEquals(1234, cancellableTasksFrom.get(0).getTask().getId()); assertEquals(4321, cancellableTasksFrom.get(1).getTask().getId()); + when(resourceUsageTrackerService.constructQueryGroupLevelUsageViews()).thenReturn(queryGroupLevelViews); taskCancellation.cancelTasks(); assertTrue(cancellableTasksFrom.get(0).getTask().isCancelled()); assertTrue(cancellableTasksFrom.get(1).getTask().isCancelled()); @@ -299,15 +289,17 @@ public void testCancelTasks_cancelsTasksFromDeletedQueryGroups() { activeQueryGroups.add(activeQueryGroup); deletedQueryGroups.add(deletedQueryGroup); - TestTaskCancellerImpl taskCancellation = new TestTaskCancellerImpl( + TaskCancellationService taskCancellation = new TaskCancellationService( workloadManagementSettings, new MaximumResourceTaskSelectionStrategy(), - queryGroupLevelViews, + resourceUsageTrackerService, activeQueryGroups, deletedQueryGroups, () -> true ); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; + List cancellableTasksFrom = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED); assertEquals(2, cancellableTasksFrom.size()); assertEquals(1234, cancellableTasksFrom.get(0).getTask().getId()); @@ -320,6 +312,7 @@ public void testCancelTasks_cancelsTasksFromDeletedQueryGroups() { assertEquals(1000, cancellableTasksFromDeletedQueryGroups.get(0).getTask().getId()); assertEquals(1001, cancellableTasksFromDeletedQueryGroups.get(1).getTask().getId()); + when(resourceUsageTrackerService.constructQueryGroupLevelUsageViews()).thenReturn(queryGroupLevelViews); taskCancellation.cancelTasks(); assertTrue(cancellableTasksFrom.get(0).getTask().isCancelled()); @@ -370,14 +363,15 @@ public void testCancelTasks_does_not_cancelTasksFromDeletedQueryGroups_whenNodeN activeQueryGroups.add(activeQueryGroup); deletedQueryGroups.add(deletedQueryGroup); - TestTaskCancellerImpl taskCancellation = new TestTaskCancellerImpl( + TaskCancellationService taskCancellation = new TaskCancellationService( workloadManagementSettings, new MaximumResourceTaskSelectionStrategy(), - queryGroupLevelViews, + resourceUsageTrackerService, activeQueryGroups, deletedQueryGroups, () -> false ); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; List cancellableTasksFrom = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED); assertEquals(2, cancellableTasksFrom.size()); @@ -391,6 +385,7 @@ public void testCancelTasks_does_not_cancelTasksFromDeletedQueryGroups_whenNodeN assertEquals(1000, cancellableTasksFromDeletedQueryGroups.get(0).getTask().getId()); assertEquals(1001, cancellableTasksFromDeletedQueryGroups.get(1).getTask().getId()); + when(resourceUsageTrackerService.constructQueryGroupLevelUsageViews()).thenReturn(queryGroupLevelViews); taskCancellation.cancelTasks(); assertTrue(cancellableTasksFrom.get(0).getTask().isCancelled()); @@ -430,15 +425,17 @@ public void testCancelTasks_cancelsGivenTasks_WhenNodeInDuress() { queryGroupLevelViews.put(queryGroupId2, mockView); Collections.addAll(activeQueryGroups, queryGroup1, queryGroup2); - TestTaskCancellerImpl taskCancellation = new TestTaskCancellerImpl( + TaskCancellationService taskCancellation = new TaskCancellationService( workloadManagementSettings, new MaximumResourceTaskSelectionStrategy(), - queryGroupLevelViews, + resourceUsageTrackerService, activeQueryGroups, deletedQueryGroups, () -> true ); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; + List cancellableTasksFrom = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED); assertEquals(2, cancellableTasksFrom.size()); assertEquals(1234, cancellableTasksFrom.get(0).getTask().getId()); @@ -449,6 +446,7 @@ public void testCancelTasks_cancelsGivenTasks_WhenNodeInDuress() { assertEquals(5678, cancellableTasksFrom1.get(0).getTask().getId()); assertEquals(8765, cancellableTasksFrom1.get(1).getTask().getId()); + when(resourceUsageTrackerService.constructQueryGroupLevelUsageViews()).thenReturn(queryGroupLevelViews); taskCancellation.cancelTasks(); assertTrue(cancellableTasksFrom.get(0).getTask().isCancelled()); assertTrue(cancellableTasksFrom.get(1).getTask().isCancelled()); @@ -475,6 +473,7 @@ public void testGetAllCancellableTasks_ReturnsNoTasksWhenNotBreachingThresholds( ); queryGroupLevelViews.put(queryGroupId1, mockView); activeQueryGroups.add(queryGroup1); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; List allCancellableTasks = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED); assertTrue(allCancellableTasks.isEmpty()); @@ -497,6 +496,7 @@ public void testGetAllCancellableTasks_ReturnsTasksWhenBreachingThresholds() { when(mockView.getResourceUsageData()).thenReturn(Map.of(ResourceType.CPU, cpuUsage, ResourceType.MEMORY, memoryUsage)); queryGroupLevelViews.put(queryGroupId1, mockView); activeQueryGroups.add(queryGroup1); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; List allCancellableTasks = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED); assertEquals(2, allCancellableTasks.size()); @@ -526,6 +526,7 @@ public void testGetCancellableTasksFrom_doesNotReturnTasksWhenQueryGroupIdNotFou queryGroupLevelViews.put(queryGroupId1, mockView); activeQueryGroups.add(queryGroup1); activeQueryGroups.add(queryGroup2); + taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews; List cancellableTasksFrom = taskCancellation.getCancellableTasksFrom(queryGroup2); assertEquals(0, cancellableTasksFrom.size()); diff --git a/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTests.java b/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTests.java index ad6ce267b2452..044239e2a1ecd 100644 --- a/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTests.java +++ b/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTests.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.Map; -import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE; +import static org.opensearch.wlm.cancellation.TaskCancellationService.MIN_VALUE; import static org.opensearch.wlm.tracker.CpuUsageCalculator.PROCESSOR_COUNT; import static org.opensearch.wlm.tracker.MemoryUsageCalculator.HEAP_SIZE_BYTES; import static org.mockito.Mockito.mock; diff --git a/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java b/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java index d109f37243ee2..63913f5a8f67e 100644 --- a/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java +++ b/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java @@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.wlm.QueryGroupTask.QUERY_GROUP_ID_HEADER; -import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE; +import static org.opensearch.wlm.cancellation.TaskCancellationService.MIN_VALUE; import static org.opensearch.wlm.tracker.CpuUsageCalculator.PROCESSOR_COUNT; import static org.opensearch.wlm.tracker.MemoryUsageCalculator.HEAP_SIZE_BYTES; import static org.mockito.ArgumentMatchers.anyString;