Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] QueryGroup Resource Tracking framework and implementation (#13897) #15154

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Concurrent Segment Search] Support composite aggregations with scripting ([#15072](https://github.com/opensearch-project/OpenSearch/pull/15072))
- Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))
- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1416,7 +1416,7 @@ public Builder put(final QueryGroup queryGroup) {
return queryGroups(existing);
}

private Map<String, QueryGroup> getQueryGroups() {
public Map<String, QueryGroup> getQueryGroups() {
return Optional.ofNullable(this.customs.get(QueryGroupMetadata.TYPE))
.map(o -> (QueryGroupMetadata) o)
.map(QueryGroupMetadata::queryGroups)
Expand Down
21 changes: 18 additions & 3 deletions server/src/main/java/org/opensearch/search/ResourceType.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,26 @@

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
import org.opensearch.tasks.Task;

import java.io.IOException;
import java.util.function.Function;

/**
* Enum to hold the resource type
*/
@PublicApi(since = "2.x")
public enum ResourceType {
CPU("cpu"),
MEMORY("memory");
CPU("cpu", task -> task.getTotalResourceUtilization(ResourceStats.CPU)),
MEMORY("memory", task -> task.getTotalResourceUtilization(ResourceStats.MEMORY));

private final String name;
private final Function<Task, Long> getResourceUsage;

ResourceType(String name) {
ResourceType(String name, Function<Task, Long> getResourceUsage) {
this.name = name;
this.getResourceUsage = getResourceUsage;
}

/**
Expand All @@ -48,4 +53,14 @@ public static void writeTo(StreamOutput out, ResourceType resourceType) throws I
public String getName() {
return name;
}

/**
* Gets the resource usage for a given resource type and task.
*
* @param task the task for which to calculate resource usage
* @return the resource usage
*/
public long getResourceUsage(Task task) {
return getResourceUsage.apply(task);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.search.ResourceType;
import org.opensearch.tasks.Task;

import java.util.List;
import java.util.Map;

/**
* Represents the point in time view of resource usage of a QueryGroup and
* has a 1:1 relation with a QueryGroup.
* This class holds the resource usage data and the list of active tasks.
*/
public class QueryGroupLevelResourceUsageView {
// resourceUsage holds the resource usage data for a QueryGroup at a point in time
private final Map<ResourceType, Long> resourceUsage;
// activeTasks holds the list of active tasks for a QueryGroup at a point in time
private final List<Task> activeTasks;

public QueryGroupLevelResourceUsageView(Map<ResourceType, Long> resourceUsage, List<Task> activeTasks) {
this.resourceUsage = resourceUsage;
this.activeTasks = activeTasks;
}

/**
* Returns the resource usage data.
*
* @return The map of resource usage data
*/
public Map<ResourceType, Long> getResourceUsageData() {
return resourceUsage;
}

/**
* Returns the list of active tasks.
*
* @return The list of active tasks
*/
public List<Task> getActiveTasks() {
return activeTasks;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.tracker;

import org.opensearch.search.ResourceType;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.wlm.QueryGroupLevelResourceUsageView;
import org.opensearch.wlm.QueryGroupTask;

import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* This class tracks resource usage per QueryGroup
*/
public class QueryGroupResourceUsageTrackerService {

public static final EnumSet<ResourceType> TRACKED_RESOURCES = EnumSet.allOf(ResourceType.class);
private final TaskResourceTrackingService taskResourceTrackingService;

/**
* QueryGroupResourceTrackerService constructor
*
* @param taskResourceTrackingService Service that helps track resource usage of tasks running on a node.
*/
public QueryGroupResourceUsageTrackerService(TaskResourceTrackingService taskResourceTrackingService) {
this.taskResourceTrackingService = taskResourceTrackingService;
}

/**
* Constructs a map of QueryGroupLevelResourceUsageView instances for each QueryGroup.
*
* @return Map of QueryGroup views
*/
public Map<String, QueryGroupLevelResourceUsageView> constructQueryGroupLevelUsageViews() {
final Map<String, List<Task>> tasksByQueryGroup = getTasksGroupedByQueryGroup();
final Map<String, QueryGroupLevelResourceUsageView> queryGroupViews = new HashMap<>();

// Iterate over each QueryGroup entry
for (Map.Entry<String, List<Task>> queryGroupEntry : tasksByQueryGroup.entrySet()) {
// Compute the QueryGroup usage
final EnumMap<ResourceType, Long> queryGroupUsage = new EnumMap<>(ResourceType.class);
for (ResourceType resourceType : TRACKED_RESOURCES) {
long queryGroupResourceUsage = 0;
for (Task task : queryGroupEntry.getValue()) {
queryGroupResourceUsage += resourceType.getResourceUsage(task);
}
queryGroupUsage.put(resourceType, queryGroupResourceUsage);
}

// Add to the QueryGroup View
queryGroupViews.put(
queryGroupEntry.getKey(),
new QueryGroupLevelResourceUsageView(queryGroupUsage, queryGroupEntry.getValue())
);
}
return queryGroupViews;
}

/**
* Groups tasks by their associated QueryGroup.
*
* @return Map of tasks grouped by QueryGroup
*/
private Map<String, List<Task>> getTasksGroupedByQueryGroup() {
return taskResourceTrackingService.getResourceAwareTasks()
.values()
.stream()
.filter(QueryGroupTask.class::isInstance)
.map(QueryGroupTask.class::cast)
.collect(Collectors.groupingBy(QueryGroupTask::getQueryGroupId, Collectors.mapping(task -> (Task) task, Collectors.toList())));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* QueryGroup resource tracking artifacts
*/
package org.opensearch.wlm.tracker;
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.action.search.SearchShardTask;
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.test.OpenSearchTestCase;

import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ResourceTypeTests extends OpenSearchTestCase {

public void testFromName() {
assertSame(ResourceType.CPU, ResourceType.fromName("cpu"));
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("CPU"); });
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Cpu"); });

assertSame(ResourceType.MEMORY, ResourceType.fromName("memory"));
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Memory"); });
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("MEMORY"); });
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("JVM"); });
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Heap"); });
assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Disk"); });
}

public void testGetName() {
assertEquals("cpu", ResourceType.CPU.getName());
assertEquals("memory", ResourceType.MEMORY.getName());
}

public void testGetResourceUsage() {
SearchShardTask mockTask = createMockTask(SearchShardTask.class, 100, 200);
assertEquals(100, ResourceType.CPU.getResourceUsage(mockTask));
assertEquals(200, ResourceType.MEMORY.getResourceUsage(mockTask));
}

private <T extends CancellableTask> T createMockTask(Class<T> type, long cpuUsage, long heapUsage) {
T task = mock(type);
when(task.getTotalResourceUtilization(ResourceStats.CPU)).thenReturn(cpuUsage);
when(task.getTotalResourceUtilization(ResourceStats.MEMORY)).thenReturn(heapUsage);
return task;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.action.search.SearchAction;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.search.ResourceType;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class QueryGroupLevelResourceUsageViewTests extends OpenSearchTestCase {
Map<ResourceType, Long> resourceUsage;
List<Task> activeTasks;

public void setUp() throws Exception {
super.setUp();
resourceUsage = Map.of(ResourceType.fromName("memory"), 34L, ResourceType.fromName("cpu"), 12L);
activeTasks = List.of(getRandomTask(4321));
}

public void testGetResourceUsageData() {
QueryGroupLevelResourceUsageView queryGroupLevelResourceUsageView = new QueryGroupLevelResourceUsageView(
resourceUsage,
activeTasks
);
Map<ResourceType, Long> resourceUsageData = queryGroupLevelResourceUsageView.getResourceUsageData();
assertTrue(assertResourceUsageData(resourceUsageData));
}

public void testGetActiveTasks() {
QueryGroupLevelResourceUsageView queryGroupLevelResourceUsageView = new QueryGroupLevelResourceUsageView(
resourceUsage,
activeTasks
);
List<Task> activeTasks = queryGroupLevelResourceUsageView.getActiveTasks();
assertEquals(1, activeTasks.size());
assertEquals(4321, activeTasks.get(0).getId());
}

private boolean assertResourceUsageData(Map<ResourceType, Long> resourceUsageData) {
return resourceUsageData.get(ResourceType.fromName("memory")) == 34L && resourceUsageData.get(ResourceType.fromName("cpu")) == 12L;
}

private Task getRandomTask(long id) {
return new Task(
id,
"transport",
SearchAction.NAME,
"test description",
new TaskId(randomLong() + ":" + randomLong()),
Collections.emptyMap()
);
}
}
Loading
Loading