diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index fb7d73f599670..e75e22f30431d 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -11,7 +11,7 @@ # 3. Use the command palette to run the CODEOWNERS: Show owners of current file command, which will display all code owners for the current file. # Default ownership for all repo files -* @anasalkouz @andrross @ashking94 @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah +* @anasalkouz @andrross @ashking94 @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @kotwanikunal @linuxpi @mch2 @msfroh @nknize @owaiskazi19 @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah /modules/lang-painless/ @anasalkouz @andrross @ashking94 @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @jed326 @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah /modules/parent-join/ @anasalkouz @andrross @ashking94 @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @jed326 @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah diff --git a/.github/workflows/benchmark-pull-request.yml b/.github/workflows/benchmark-pull-request.yml index 1096014e4a291..0173b7e35c64d 100644 --- a/.github/workflows/benchmark-pull-request.yml +++ b/.github/workflows/benchmark-pull-request.yml @@ -62,6 +62,7 @@ jobs: } if (benchmarkConfigs[configId].hasOwnProperty('baseline_cluster_config')) { core.exportVariable('BASELINE_CLUSTER_CONFIG', benchmarkConfigs[configId]['baseline_cluster_config']); + } - name: Post invalid format comment if: steps.check_comment.outputs.invalid == 'true' uses: actions/github-script@v7 diff --git a/CHANGELOG-3.0.md b/CHANGELOG-3.0.md index 78e93eed0158a..48d978bede420 100644 --- a/CHANGELOG-3.0.md +++ b/CHANGELOG-3.0.md @@ -13,7 +13,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - GHA to verify checklist items completion in PR descriptions ([#10800](https://github.com/opensearch-project/OpenSearch/pull/10800)) - Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625)) - Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957)) -- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054)) ### Dependencies diff --git a/CHANGELOG.md b/CHANGELOG.md index be5e5598b09c2..f44949bf38511 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add ThreadContextPermission for stashAndMergeHeaders and stashWithOrigin ([#15039](https://github.com/opensearch-project/OpenSearch/pull/15039)) - [Concurrent Segment Search] Support composite aggregations with scripting ([#15072](https://github.com/opensearch-project/OpenSearch/pull/15072)) - Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711)) +- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054)) +- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/MAINTAINERS.md b/MAINTAINERS.md index f77c69ddeff2a..7be19fbc877d7 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -5,7 +5,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje ## Current Maintainers | Maintainer | GitHub ID | Affiliation | -| ------------------------ | ------------------------------------------------------- | ----------- | +|--------------------------|---------------------------------------------------------|-------------| | Anas Alkouz | [anasalkouz](https://github.com/anasalkouz) | Amazon | | Andrew Ross | [andrross](https://github.com/andrross) | Amazon | | Andriy Redko | [reta](https://github.com/reta) | Aiven | @@ -18,6 +18,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje | Gaurav Bafna | [gbbafna](https://github.com/gbbafna) | Amazon | | Jay Deng | [jed326](https://github.com/jed326) | Amazon | | Kunal Kotwani | [kotwanikunal](https://github.com/kotwanikunal) | Amazon | +| Varun Bansal | [linuxpi](https://github.com/linuxpi) | Amazon | | Marc Handalian | [mch2](https://github.com/mch2) | Amazon | | Michael Froh | [msfroh](https://github.com/msfroh) | Amazon | | Nick Knize | [nknize](https://github.com/nknize) | Amazon | diff --git a/TRIAGING.md b/TRIAGING.md index 53ef77de49159..dddcbc15394ab 100644 --- a/TRIAGING.md +++ b/TRIAGING.md @@ -31,8 +31,8 @@ Meeting structure may vary slightly, but the general structure is as follows: - [Search](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3Auntriaged+label%3A%22Search%22%2C%22Search%3ARemote+Search%22%2C%22Search%3AResiliency%22%2C%22Search%3APerformance%22%2C%22Search%3ARelevance%22%2C%22Search%3AAggregations%22%2C%22Search%3AQuery+Capabilities%22%2C%22Search%3AQuery+Insights%22%2C%22Search%3ASearchable+Snapshots%22%2C%22Search%3AUser+Behavior+Insights%22) - [Indexing](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3Auntriaged+label%3A%22Indexing%3AReplication%22%2C%22Indexing%22%2C%22Indexing%3APerformance%22%2C%22Indexing+%26+Search%22%2C) - [Storage](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3Auntriaged+label%3AStorage%2C%22Storage%3AResiliency%22%2C%22Storage%3APerformance%22%2C%22Storage%3ASnapshots%22%2C%22Storage%3ARemote%22%2C%22Storage%3ADurability%22) - - [Cluster Manager](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3Auntriaged+label%3A%22Cluster+Manager%22%2C%22ClusterManager%3ARemoteState%22) - - [Core](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3Auntriaged+-label%3A%22Search%22%2C%22Search%3ARemote+Search%22%2C%22Search%3AResiliency%22%2C%22Search%3APerformance%22%2C%22Search%3ARelevance%22%2C%22Search%3AAggregations%22%2C%22Search%3AQuery+Capabilities%22%2C%22Search%3AQuery+Insights%22%2C%22Search%3ASearchable+Snapshots%22%2C%22Search%3AUser+Behavior+Insights%22%2C%22Storage%22%2C%22Storage%3AResiliency%22%2C%22Storage%3APerformance%22%2C%22Storage%3ASnapshots%22%2C%22Storage%3ARemote%22%2C%22Storage%3ADurability%22%2C%22Cluster+Manager%22%2C%22ClusterManager%3ARemoteState%22%2C%22Indexing%3AReplication%22%2C%22Indexing%22%2C%22Indexing%3APerformance%22%2C%22Indexing+%26+Search%22) + - [Cluster Manager](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3Auntriaged+label%3A%22Cluster+Manager%22%2C%22ClusterManager%3ARemoteState%22%2C%22ShardManagement%3AResiliency%22%2C%22ShardManagement%3AInsights%22%2C%22ShardManagement%3ASizing%22%2C%22ShardManagement%3APerformance%22%2C%22ShardManagement%3APlacement%22%2C%22ShardManagement%3ARouting%22) + - [Core](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3Auntriaged+-label%3A%22Search%22%2C%22Search%3ARemote+Search%22%2C%22Search%3AResiliency%22%2C%22Search%3APerformance%22%2C%22Search%3ARelevance%22%2C%22Search%3AAggregations%22%2C%22Search%3AQuery+Capabilities%22%2C%22Search%3AQuery+Insights%22%2C%22Search%3ASearchable+Snapshots%22%2C%22Search%3AUser+Behavior+Insights%22%2C%22Storage%22%2C%22Storage%3AResiliency%22%2C%22Storage%3APerformance%22%2C%22Storage%3ASnapshots%22%2C%22Storage%3ARemote%22%2C%22Storage%3ADurability%22%2C%22Cluster+Manager%22%2C%22ClusterManager%3ARemoteState%22%2C%22ShardManagement%3AResiliency%22%2C%22ShardManagement%3AInsights%22%2C%22ShardManagement%3ASizing%22%2C%22ShardManagement%3APerformance%22%2C%22ShardManagement%3APlacement%22%2C%22ShardManagement%3ARouting%22%2C%22Indexing%3AReplication%22%2C%22Indexing%22%2C%22Indexing%3APerformance%22%2C%22Indexing+%26+Search%22) 5. **Attendee Requests:** An opportunity for any meeting member to request consideration of an issue or pull request. 6. **Open Discussion:** Attendees can bring up any topics not already covered by filed issues or pull requests. 7. **Review of Old Untriaged Issues:** Look at all [untriaged issues older than 14 days](https://peternied.github.io/redirect/issue_search.html?owner=opensearch-project&repo=OpenSearch&tag=untriaged&created-since-days=14) to prevent issues from falling through the cracks. diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 440b9e267cf0a..09bef2ddf9ee6 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -1391,7 +1391,7 @@ public Builder put(final QueryGroup queryGroup) { return queryGroups(existing); } - private Map getQueryGroups() { + public Map getQueryGroups() { return Optional.ofNullable(this.customs.get(QueryGroupMetadata.TYPE)) .map(o -> (QueryGroupMetadata) o) .map(QueryGroupMetadata::queryGroups) diff --git a/server/src/main/java/org/opensearch/search/ResourceType.java b/server/src/main/java/org/opensearch/search/ResourceType.java index fe5ce4dd2bb50..0cba2222a6e20 100644 --- a/server/src/main/java/org/opensearch/search/ResourceType.java +++ b/server/src/main/java/org/opensearch/search/ResourceType.java @@ -10,21 +10,26 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.tasks.resourcetracker.ResourceStats; +import org.opensearch.tasks.Task; import java.io.IOException; +import java.util.function.Function; /** * Enum to hold the resource type */ @PublicApi(since = "2.x") public enum ResourceType { - CPU("cpu"), - MEMORY("memory"); + CPU("cpu", task -> task.getTotalResourceUtilization(ResourceStats.CPU)), + MEMORY("memory", task -> task.getTotalResourceUtilization(ResourceStats.MEMORY)); private final String name; + private final Function getResourceUsage; - ResourceType(String name) { + ResourceType(String name, Function getResourceUsage) { this.name = name; + this.getResourceUsage = getResourceUsage; } /** @@ -48,4 +53,14 @@ public static void writeTo(StreamOutput out, ResourceType resourceType) throws I public String getName() { return name; } + + /** + * Gets the resource usage for a given resource type and task. + * + * @param task the task for which to calculate resource usage + * @return the resource usage + */ + public long getResourceUsage(Task task) { + return getResourceUsage.apply(task); + } } diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupLevelResourceUsageView.java b/server/src/main/java/org/opensearch/wlm/QueryGroupLevelResourceUsageView.java new file mode 100644 index 0000000000000..2fd743dc3f83f --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupLevelResourceUsageView.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import org.opensearch.search.ResourceType; +import org.opensearch.tasks.Task; + +import java.util.List; +import java.util.Map; + +/** + * Represents the point in time view of resource usage of a QueryGroup and + * has a 1:1 relation with a QueryGroup. + * This class holds the resource usage data and the list of active tasks. + */ +public class QueryGroupLevelResourceUsageView { + // resourceUsage holds the resource usage data for a QueryGroup at a point in time + private final Map resourceUsage; + // activeTasks holds the list of active tasks for a QueryGroup at a point in time + private final List activeTasks; + + public QueryGroupLevelResourceUsageView(Map resourceUsage, List activeTasks) { + this.resourceUsage = resourceUsage; + this.activeTasks = activeTasks; + } + + /** + * Returns the resource usage data. + * + * @return The map of resource usage data + */ + public Map getResourceUsageData() { + return resourceUsage; + } + + /** + * Returns the list of active tasks. + * + * @return The list of active tasks + */ + public List getActiveTasks() { + return activeTasks; + } +} diff --git a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java new file mode 100644 index 0000000000000..bfbf5d8a452d1 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.tracker; + +import org.opensearch.search.ResourceType; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.wlm.QueryGroupLevelResourceUsageView; +import org.opensearch.wlm.QueryGroupTask; + +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * This class tracks resource usage per QueryGroup + */ +public class QueryGroupResourceUsageTrackerService { + + public static final EnumSet TRACKED_RESOURCES = EnumSet.allOf(ResourceType.class); + private final TaskResourceTrackingService taskResourceTrackingService; + + /** + * QueryGroupResourceTrackerService constructor + * + * @param taskResourceTrackingService Service that helps track resource usage of tasks running on a node. + */ + public QueryGroupResourceUsageTrackerService(TaskResourceTrackingService taskResourceTrackingService) { + this.taskResourceTrackingService = taskResourceTrackingService; + } + + /** + * Constructs a map of QueryGroupLevelResourceUsageView instances for each QueryGroup. + * + * @return Map of QueryGroup views + */ + public Map constructQueryGroupLevelUsageViews() { + final Map> tasksByQueryGroup = getTasksGroupedByQueryGroup(); + final Map queryGroupViews = new HashMap<>(); + + // Iterate over each QueryGroup entry + for (Map.Entry> queryGroupEntry : tasksByQueryGroup.entrySet()) { + // Compute the QueryGroup usage + final EnumMap queryGroupUsage = new EnumMap<>(ResourceType.class); + for (ResourceType resourceType : TRACKED_RESOURCES) { + long queryGroupResourceUsage = 0; + for (Task task : queryGroupEntry.getValue()) { + queryGroupResourceUsage += resourceType.getResourceUsage(task); + } + queryGroupUsage.put(resourceType, queryGroupResourceUsage); + } + + // Add to the QueryGroup View + queryGroupViews.put( + queryGroupEntry.getKey(), + new QueryGroupLevelResourceUsageView(queryGroupUsage, queryGroupEntry.getValue()) + ); + } + return queryGroupViews; + } + + /** + * Groups tasks by their associated QueryGroup. + * + * @return Map of tasks grouped by QueryGroup + */ + private Map> getTasksGroupedByQueryGroup() { + return taskResourceTrackingService.getResourceAwareTasks() + .values() + .stream() + .filter(QueryGroupTask.class::isInstance) + .map(QueryGroupTask.class::cast) + .collect(Collectors.groupingBy(QueryGroupTask::getQueryGroupId, Collectors.mapping(task -> (Task) task, Collectors.toList()))); + } +} diff --git a/server/src/main/java/org/opensearch/wlm/tracker/package-info.java b/server/src/main/java/org/opensearch/wlm/tracker/package-info.java new file mode 100644 index 0000000000000..86efc99355d3d --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/tracker/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * QueryGroup resource tracking artifacts + */ +package org.opensearch.wlm.tracker; diff --git a/server/src/test/java/org/opensearch/search/ResourceTypeTests.java b/server/src/test/java/org/opensearch/search/ResourceTypeTests.java new file mode 100644 index 0000000000000..78827b8b1bdad --- /dev/null +++ b/server/src/test/java/org/opensearch/search/ResourceTypeTests.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.core.tasks.resourcetracker.ResourceStats; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.test.OpenSearchTestCase; + +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ResourceTypeTests extends OpenSearchTestCase { + + public void testFromName() { + assertSame(ResourceType.CPU, ResourceType.fromName("cpu")); + assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("CPU"); }); + assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Cpu"); }); + + assertSame(ResourceType.MEMORY, ResourceType.fromName("memory")); + assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Memory"); }); + assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("MEMORY"); }); + assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("JVM"); }); + assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Heap"); }); + assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Disk"); }); + } + + public void testGetName() { + assertEquals("cpu", ResourceType.CPU.getName()); + assertEquals("memory", ResourceType.MEMORY.getName()); + } + + public void testGetResourceUsage() { + SearchShardTask mockTask = createMockTask(SearchShardTask.class, 100, 200); + assertEquals(100, ResourceType.CPU.getResourceUsage(mockTask)); + assertEquals(200, ResourceType.MEMORY.getResourceUsage(mockTask)); + } + + private T createMockTask(Class type, long cpuUsage, long heapUsage) { + T task = mock(type); + when(task.getTotalResourceUtilization(ResourceStats.CPU)).thenReturn(cpuUsage); + when(task.getTotalResourceUtilization(ResourceStats.MEMORY)).thenReturn(heapUsage); + return task; + } +} diff --git a/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java b/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java new file mode 100644 index 0000000000000..7f6419505fec2 --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import org.opensearch.action.search.SearchAction; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.search.ResourceType; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class QueryGroupLevelResourceUsageViewTests extends OpenSearchTestCase { + Map resourceUsage; + List activeTasks; + + public void setUp() throws Exception { + super.setUp(); + resourceUsage = Map.of(ResourceType.fromName("memory"), 34L, ResourceType.fromName("cpu"), 12L); + activeTasks = List.of(getRandomTask(4321)); + } + + public void testGetResourceUsageData() { + QueryGroupLevelResourceUsageView queryGroupLevelResourceUsageView = new QueryGroupLevelResourceUsageView( + resourceUsage, + activeTasks + ); + Map resourceUsageData = queryGroupLevelResourceUsageView.getResourceUsageData(); + assertTrue(assertResourceUsageData(resourceUsageData)); + } + + public void testGetActiveTasks() { + QueryGroupLevelResourceUsageView queryGroupLevelResourceUsageView = new QueryGroupLevelResourceUsageView( + resourceUsage, + activeTasks + ); + List activeTasks = queryGroupLevelResourceUsageView.getActiveTasks(); + assertEquals(1, activeTasks.size()); + assertEquals(4321, activeTasks.get(0).getId()); + } + + private boolean assertResourceUsageData(Map resourceUsageData) { + return resourceUsageData.get(ResourceType.fromName("memory")) == 34L && resourceUsageData.get(ResourceType.fromName("cpu")) == 12L; + } + + private Task getRandomTask(long id) { + return new Task( + id, + "transport", + SearchAction.NAME, + "test description", + new TaskId(randomLong() + ":" + randomLong()), + Collections.emptyMap() + ); + } +} diff --git a/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerServiceTests.java b/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerServiceTests.java new file mode 100644 index 0000000000000..967119583c25f --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerServiceTests.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.tracker; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchTask; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.tasks.resourcetracker.ResourceStats; +import org.opensearch.search.ResourceType; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.QueryGroupLevelResourceUsageView; +import org.opensearch.wlm.QueryGroupTask; +import org.junit.After; +import org.junit.Before; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.wlm.QueryGroupTask.QUERY_GROUP_ID_HEADER; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class QueryGroupResourceUsageTrackerServiceTests extends OpenSearchTestCase { + TestThreadPool threadPool; + TaskResourceTrackingService mockTaskResourceTrackingService; + QueryGroupResourceUsageTrackerService queryGroupResourceUsageTrackerService; + + @Before + public void setup() { + threadPool = new TestThreadPool(getTestName()); + mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); + queryGroupResourceUsageTrackerService = new QueryGroupResourceUsageTrackerService(mockTaskResourceTrackingService); + } + + @After + public void cleanup() { + ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); + } + + public void testConstructQueryGroupLevelViews_CreatesQueryGroupLevelUsageView_WhenTasksArePresent() { + List queryGroupIds = List.of("queryGroup1", "queryGroup2", "queryGroup3"); + + Map activeSearchShardTasks = createActiveSearchShardTasks(queryGroupIds); + when(mockTaskResourceTrackingService.getResourceAwareTasks()).thenReturn(activeSearchShardTasks); + Map stringQueryGroupLevelResourceUsageViewMap = queryGroupResourceUsageTrackerService + .constructQueryGroupLevelUsageViews(); + + for (String queryGroupId : queryGroupIds) { + assertEquals( + 400, + (long) stringQueryGroupLevelResourceUsageViewMap.get(queryGroupId).getResourceUsageData().get(ResourceType.MEMORY) + ); + assertEquals(2, stringQueryGroupLevelResourceUsageViewMap.get(queryGroupId).getActiveTasks().size()); + } + } + + public void testConstructQueryGroupLevelViews_CreatesQueryGroupLevelUsageView_WhenTasksAreNotPresent() { + Map stringQueryGroupLevelResourceUsageViewMap = queryGroupResourceUsageTrackerService + .constructQueryGroupLevelUsageViews(); + assertTrue(stringQueryGroupLevelResourceUsageViewMap.isEmpty()); + } + + public void testConstructQueryGroupLevelUsageViews_WithTasksHavingDifferentResourceUsage() { + Map activeSearchShardTasks = new HashMap<>(); + activeSearchShardTasks.put(1L, createMockTask(SearchShardTask.class, 100, 200, "queryGroup1")); + activeSearchShardTasks.put(2L, createMockTask(SearchShardTask.class, 200, 400, "queryGroup1")); + when(mockTaskResourceTrackingService.getResourceAwareTasks()).thenReturn(activeSearchShardTasks); + + Map queryGroupViews = queryGroupResourceUsageTrackerService + .constructQueryGroupLevelUsageViews(); + + assertEquals(600, (long) queryGroupViews.get("queryGroup1").getResourceUsageData().get(ResourceType.MEMORY)); + assertEquals(2, queryGroupViews.get("queryGroup1").getActiveTasks().size()); + } + + private Map createActiveSearchShardTasks(List queryGroupIds) { + Map activeSearchShardTasks = new HashMap<>(); + long task_id = 0; + for (String queryGroupId : queryGroupIds) { + for (int i = 0; i < 2; i++) { + activeSearchShardTasks.put(++task_id, createMockTask(SearchShardTask.class, 100, 200, queryGroupId)); + } + } + return activeSearchShardTasks; + } + + private T createMockTask(Class type, long cpuUsage, long heapUsage, String queryGroupId) { + T task = mock(type); + if (task instanceof SearchTask || task instanceof SearchShardTask) { + // Stash the current thread context to ensure that any existing context is preserved and restored after setting the query group + // ID. + try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { + threadPool.getThreadContext().putHeader(QUERY_GROUP_ID_HEADER, queryGroupId); + ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); + } + } + when(task.getTotalResourceUtilization(ResourceStats.CPU)).thenReturn(cpuUsage); + when(task.getTotalResourceUtilization(ResourceStats.MEMORY)).thenReturn(heapUsage); + when(task.getStartTimeNanos()).thenReturn((long) 0); + + AtomicBoolean isCancelled = new AtomicBoolean(false); + doAnswer(invocation -> { + isCancelled.set(true); + return null; + }).when(task).cancel(anyString()); + doAnswer(invocation -> isCancelled.get()).when(task).isCancelled(); + + return task; + } +}