Skip to content

Commit

Permalink
QueryGroup Resource Tracking framework and implementation (#13897)
Browse files Browse the repository at this point in the history
* initial code for the sandbox resource tracking and cancellation framework

Signed-off-by: Kiran Prakash <[email protected]>

* Fix Failing Tests

Signed-off-by: Kiran Prakash <[email protected]>

* spotless Apply

Signed-off-by: Kiran Prakash <[email protected]>

* Update SandboxService.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update SandboxService.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update SandboxTask.java

Signed-off-by: Kiran Prakash <[email protected]>

* Add java docs

Signed-off-by: Kiran Prakash <[email protected]>

* spotless

Signed-off-by: Kiran Prakash <[email protected]>

* javadocs

Signed-off-by: Kiran Prakash <[email protected]>

* javadocs

Signed-off-by: Kiran Prakash <[email protected]>

* java docs

Signed-off-by: Kiran Prakash <[email protected]>

* Update AbstractTaskCancellation.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update SandboxModule.java

Signed-off-by: Kiran Prakash <[email protected]>

* Some tests and stubs

Signed-off-by: Kiran Prakash <[email protected]>

* spotless

Signed-off-by: Kiran Prakash <[email protected]>

* :server:testingConventions

Signed-off-by: Kiran Prakash <[email protected]>

* Update AbstractTaskCancellation.java

Signed-off-by: Kiran Prakash <[email protected]>

* more tests

Signed-off-by: Kiran Prakash <[email protected]>

* addressing comments

Signed-off-by: Kiran Prakash <[email protected]>

* revert some accidentally pushed files

Signed-off-by: Kiran Prakash <[email protected]>

* resolve flakiness

Signed-off-by: Kiran Prakash <[email protected]>

* renaming sandbox to querygroup and adjusting code based on merged PRs

Signed-off-by: Kiran Prakash <[email protected]>

* jvm to memory

Signed-off-by: Kiran Prakash <[email protected]>

* missing java docs

Signed-off-by: Kiran Prakash <[email protected]>

* spotless

Signed-off-by: Kiran Prakash <[email protected]>

* Update CHANGELOG.md
Signed-off-by: Kiran Prakash <[email protected]>

* pluck cancellation changes out of this PR

Signed-off-by: Kiran Prakash <[email protected]>

* remove unused

Signed-off-by: Kiran Prakash <[email protected]>

* remove cancellation related code and add more tests coverage

Signed-off-by: Kiran Prakash <[email protected]>

* us only memory and not jvm

Signed-off-by: Kiran Prakash <[email protected]>

* test conventions

Signed-off-by: Kiran Prakash <[email protected]>

* Bring back enum

Signed-off-by: Kiran Prakash <[email protected]>

* Update SearchBackpressureService.java

Signed-off-by: Kiran Prakash <[email protected]>

* revert changes

Signed-off-by: Kiran Prakash <[email protected]>

* revert changes

Signed-off-by: Kiran Prakash <[email protected]>

* all required changes

Signed-off-by: Kiran Prakash <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Kiran Prakash <[email protected]>

* cleanups

Signed-off-by: Kiran Prakash <[email protected]>

* Delete QueryGroupService.java

Signed-off-by: Kiran Prakash <[email protected]>

* cleanups

Signed-off-by: Kiran Prakash <[email protected]>

* Update QueryGroupLevelResourceUsageViewTests.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update QueryGroupLevelResourceUsageViewTests.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update QueryGroupResourceUsageTrackerService.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update QueryGroupResourceUsageTrackerService.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update QueryGroupResourceUsageTrackerService.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update CHANGELOG.md
Signed-off-by: Kiran Prakash <[email protected]>

* rebasing with latest main

Signed-off-by: Kiran Prakash <[email protected]>

* remove experimental

Signed-off-by: Kiran Prakash <[email protected]>

* remove queryGroupId

Signed-off-by: Kiran Prakash <[email protected]>

* Update QueryGroupResourceUsageTrackerService.java

Signed-off-by: Kiran Prakash <[email protected]>

* change code comments

Signed-off-by: Kiran Prakash <[email protected]>

* remmove QueryGroupUsageTracker

Signed-off-by: Kiran Prakash <[email protected]>

* Update QueryGroupResourceUsageTrackerService.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update QueryGroupResourceUsageTrackerService.java

Signed-off-by: Kiran Prakash <[email protected]>

* remove QueryGroupTestHelpers

Signed-off-by: Kiran Prakash <[email protected]>

* cleanups

Signed-off-by: Kiran Prakash <[email protected]>

* remove queryGroupHelper

Signed-off-by: Kiran Prakash <[email protected]>

* Update ResourceTypeTests.java

Signed-off-by: Kiran Prakash <[email protected]>

* extend OpenSearchTestCase

Signed-off-by: Kiran Prakash <[email protected]>

* pr comments

Signed-off-by: Kiran Prakash <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Kiran Prakash <[email protected]>

* Update QueryGroupResourceUsageTrackerServiceTests.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update ResourceTypeTests.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update ResourceTypeTests.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update ResourceType.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update ResourceType.java

Signed-off-by: Kiran Prakash <[email protected]>

---------

Signed-off-by: Kiran Prakash <[email protected]>
(cherry picked from commit 97c1bf0)
  • Loading branch information
kiranprakash154 committed Aug 7, 2024
1 parent d4bdc7a commit 0e81e4e
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Concurrent Segment Search] Support composite aggregations with scripting ([#15072](https://github.com/opensearch-project/OpenSearch/pull/15072))
- Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))
- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1416,7 +1416,7 @@ public Builder put(final QueryGroup queryGroup) {
return queryGroups(existing);
}

private Map<String, QueryGroup> getQueryGroups() {
public Map<String, QueryGroup> getQueryGroups() {
return Optional.ofNullable(this.customs.get(QueryGroupMetadata.TYPE))
.map(o -> (QueryGroupMetadata) o)
.map(QueryGroupMetadata::queryGroups)
Expand Down
21 changes: 18 additions & 3 deletions server/src/main/java/org/opensearch/search/ResourceType.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,26 @@

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
import org.opensearch.tasks.Task;

import java.io.IOException;
import java.util.function.Function;

/**
* Enum to hold the resource type
*/
@PublicApi(since = "2.x")
public enum ResourceType {
CPU("cpu"),
MEMORY("memory");
CPU("cpu", task -> task.getTotalResourceUtilization(ResourceStats.CPU)),
MEMORY("memory", task -> task.getTotalResourceUtilization(ResourceStats.MEMORY));

private final String name;
private final Function<Task, Long> getResourceUsage;

ResourceType(String name) {
ResourceType(String name, Function<Task, Long> getResourceUsage) {
this.name = name;
this.getResourceUsage = getResourceUsage;
}

/**
Expand All @@ -48,4 +53,14 @@ public static void writeTo(StreamOutput out, ResourceType resourceType) throws I
public String getName() {
return name;
}

/**
* Gets the resource usage for a given resource type and task.
*
* @param task the task for which to calculate resource usage
* @return the resource usage
*/
public long getResourceUsage(Task task) {
return getResourceUsage.apply(task);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.search.ResourceType;
import org.opensearch.tasks.Task;

import java.util.List;
import java.util.Map;

/**
* Represents the point in time view of resource usage of a QueryGroup and
* has a 1:1 relation with a QueryGroup.
* This class holds the resource usage data and the list of active tasks.
*/
public class QueryGroupLevelResourceUsageView {
// resourceUsage holds the resource usage data for a QueryGroup at a point in time
private final Map<ResourceType, Long> resourceUsage;
// activeTasks holds the list of active tasks for a QueryGroup at a point in time
private final List<Task> activeTasks;

public QueryGroupLevelResourceUsageView(Map<ResourceType, Long> resourceUsage, List<Task> activeTasks) {
this.resourceUsage = resourceUsage;
this.activeTasks = activeTasks;
}

/**
* Returns the resource usage data.
*
* @return The map of resource usage data
*/
public Map<ResourceType, Long> getResourceUsageData() {
return resourceUsage;
}

/**
* Returns the list of active tasks.
*
* @return The list of active tasks
*/
public List<Task> getActiveTasks() {
return activeTasks;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.tracker;

import org.opensearch.search.ResourceType;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.wlm.QueryGroupLevelResourceUsageView;
import org.opensearch.wlm.QueryGroupTask;

import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* This class tracks resource usage per QueryGroup
*/
public class QueryGroupResourceUsageTrackerService {

public static final EnumSet<ResourceType> TRACKED_RESOURCES = EnumSet.allOf(ResourceType.class);
private final TaskResourceTrackingService taskResourceTrackingService;

/**
* QueryGroupResourceTrackerService constructor
*
* @param taskResourceTrackingService Service that helps track resource usage of tasks running on a node.
*/
public QueryGroupResourceUsageTrackerService(TaskResourceTrackingService taskResourceTrackingService) {
this.taskResourceTrackingService = taskResourceTrackingService;
}

/**
* Constructs a map of QueryGroupLevelResourceUsageView instances for each QueryGroup.
*
* @return Map of QueryGroup views
*/
public Map<String, QueryGroupLevelResourceUsageView> constructQueryGroupLevelUsageViews() {
final Map<String, List<Task>> tasksByQueryGroup = getTasksGroupedByQueryGroup();
final Map<String, QueryGroupLevelResourceUsageView> queryGroupViews = new HashMap<>();

// Iterate over each QueryGroup entry
for (Map.Entry<String, List<Task>> queryGroupEntry : tasksByQueryGroup.entrySet()) {
// Compute the QueryGroup usage
final EnumMap<ResourceType, Long> queryGroupUsage = new EnumMap<>(ResourceType.class);
for (ResourceType resourceType : TRACKED_RESOURCES) {
long queryGroupResourceUsage = 0;
for (Task task : queryGroupEntry.getValue()) {
queryGroupResourceUsage += resourceType.getResourceUsage(task);
}
queryGroupUsage.put(resourceType, queryGroupResourceUsage);
}

// Add to the QueryGroup View
queryGroupViews.put(
queryGroupEntry.getKey(),
new QueryGroupLevelResourceUsageView(queryGroupUsage, queryGroupEntry.getValue())
);
}
return queryGroupViews;
}

/**
* Groups tasks by their associated QueryGroup.
*
* @return Map of tasks grouped by QueryGroup
*/
private Map<String, List<Task>> getTasksGroupedByQueryGroup() {
return taskResourceTrackingService.getResourceAwareTasks()
.values()
.stream()
.filter(QueryGroupTask.class::isInstance)
.map(QueryGroupTask.class::cast)
.collect(Collectors.groupingBy(QueryGroupTask::getQueryGroupId, Collectors.mapping(task -> (Task) task, Collectors.toList())));
}
}
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/wlm/tracker/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* QueryGroup resource tracking artifacts
*/
package org.opensearch.wlm.tracker;
52 changes: 52 additions & 0 deletions server/src/test/java/org/opensearch/search/ResourceTypeTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.action.search.SearchShardTask;
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.test.OpenSearchTestCase;

import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ResourceTypeTests extends OpenSearchTestCase {

public void testFromName() {
assertSame(ResourceType.CPU, ResourceType.fromName("cpu"));
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("CPU"); });
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Cpu"); });

assertSame(ResourceType.MEMORY, ResourceType.fromName("memory"));
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Memory"); });
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("MEMORY"); });
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("JVM"); });
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Heap"); });
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Disk"); });
}

public void testGetName() {
assertEquals("cpu", ResourceType.CPU.getName());
assertEquals("memory", ResourceType.MEMORY.getName());
}

public void testGetResourceUsage() {
SearchShardTask mockTask = createMockTask(SearchShardTask.class, 100, 200);
assertEquals(100, ResourceType.CPU.getResourceUsage(mockTask));
assertEquals(200, ResourceType.MEMORY.getResourceUsage(mockTask));
}

private <T extends CancellableTask> T createMockTask(Class<T> type, long cpuUsage, long heapUsage) {
T task = mock(type);
when(task.getTotalResourceUtilization(ResourceStats.CPU)).thenReturn(cpuUsage);
when(task.getTotalResourceUtilization(ResourceStats.MEMORY)).thenReturn(heapUsage);
return task;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.action.search.SearchAction;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.search.ResourceType;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class QueryGroupLevelResourceUsageViewTests extends OpenSearchTestCase {
Map<ResourceType, Long> resourceUsage;
List<Task> activeTasks;

public void setUp() throws Exception {
super.setUp();
resourceUsage = Map.of(ResourceType.fromName("memory"), 34L, ResourceType.fromName("cpu"), 12L);
activeTasks = List.of(getRandomTask(4321));
}

public void testGetResourceUsageData() {
QueryGroupLevelResourceUsageView queryGroupLevelResourceUsageView = new QueryGroupLevelResourceUsageView(
resourceUsage,
activeTasks
);
Map<ResourceType, Long> resourceUsageData = queryGroupLevelResourceUsageView.getResourceUsageData();
assertTrue(assertResourceUsageData(resourceUsageData));
}

public void testGetActiveTasks() {
QueryGroupLevelResourceUsageView queryGroupLevelResourceUsageView = new QueryGroupLevelResourceUsageView(
resourceUsage,
activeTasks
);
List<Task> activeTasks = queryGroupLevelResourceUsageView.getActiveTasks();
assertEquals(1, activeTasks.size());
assertEquals(4321, activeTasks.get(0).getId());
}

private boolean assertResourceUsageData(Map<ResourceType, Long> resourceUsageData) {
return resourceUsageData.get(ResourceType.fromName("memory")) == 34L && resourceUsageData.get(ResourceType.fromName("cpu")) == 12L;
}

private Task getRandomTask(long id) {
return new Task(
id,
"transport",
SearchAction.NAME,
"test description",
new TaskId(randomLong() + ":" + randomLong()),
Collections.emptyMap()
);
}
}
Loading

0 comments on commit 0e81e4e

Please sign in to comment.