Skip to content

Commit

Permalink
Concurrent Search Tasks Response Updates (opensearch-project#7673)
Browse files Browse the repository at this point in the history
* Add average,min,max and thread info to tasks response

Signed-off-by: Jay Deng <[email protected]>

* Update server/src/main/java/org/opensearch/tasks/TaskResourceStats.java

Signed-off-by: Jay Deng <[email protected]>

---------

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored Jun 1, 2023
1 parent a9b53e6 commit e3740f7
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- Replace jboss-annotations-api_1.2_spec with jakarta.annotation-api ([#7836](https://github.com/opensearch-project/OpenSearch/pull/7836))
- Add min, max, average and thread info to resource stats in tasks API ([#7673](https://github.com/opensearch-project/OpenSearch/pull/7673))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskThreadUsage;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -138,12 +139,12 @@ private static RawTaskStatus randomRawTaskStatus() {
}

private static TaskResourceStats randomResourceStats() {
return randomBoolean() ? null : new TaskResourceStats(new HashMap<String, TaskResourceUsage>() {
return randomBoolean() ? null : new TaskResourceStats(new HashMap<>() {
{
for (int i = 0; i < randomInt(5); i++) {
put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong()));
}
}
});
}, new TaskThreadUsage(randomInt(10), randomInt(10)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
- do:
tasks.list:
group_by: parents
detailed: true
- set:
tasks._arbitrary_key_: task_id

- is_true: tasks
- is_true: tasks.$task_id.resource_stats
- is_true: tasks.$task_id.resource_stats.total

---
"tasks_list headers":
Expand All @@ -32,3 +37,21 @@

- is_true: tasks
- match: { tasks.0.headers.X-Opaque-Id: "That is me" }

---
"tasks_list detailed":
- skip:
version: " - 2.99.99"
reason: thread_info was introduced in 3.0.0

- do:
tasks.list:
group_by: parents
detailed: true
- set:
tasks._arbitrary_key_: task_id

- is_true: tasks
- is_true: tasks.$task_id.resource_stats
- is_true: tasks.$task_id.resource_stats.thread_info
- is_true: tasks.$task_id.resource_stats.total
104 changes: 103 additions & 1 deletion server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public class Task {

private static final String TOTAL = "total";

private static final String AVERAGE = "average";

private static final String MIN = "min";

private static final String MAX = "max";

public static final String THREAD_INFO = "thread_info";

private final long id;

private final String type;
Expand Down Expand Up @@ -175,8 +183,11 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS
resourceStats = new TaskResourceStats(new HashMap<>() {
{
put(TOTAL, getTotalResourceStats());
put(AVERAGE, getAverageResourceStats());
put(MIN, getMinResourceStats());
put(MAX, getMaxResourceStats());
}
});
}, getThreadUsage());
}
return taskInfo(localNodeId, description, status, resourceStats);
}
Expand Down Expand Up @@ -289,6 +300,27 @@ public TaskResourceUsage getTotalResourceStats() {
return new TaskResourceUsage(getTotalResourceUtilization(ResourceStats.CPU), getTotalResourceUtilization(ResourceStats.MEMORY));
}

/**
* Returns current average per-execution resource usage of the task.
*/
public TaskResourceUsage getAverageResourceStats() {
return new TaskResourceUsage(getAverageResourceUtilization(ResourceStats.CPU), getAverageResourceUtilization(ResourceStats.MEMORY));
}

/**
* Returns current min per-execution resource usage of the task.
*/
public TaskResourceUsage getMinResourceStats() {
return new TaskResourceUsage(getMinResourceUtilization(ResourceStats.CPU), getMinResourceUtilization(ResourceStats.MEMORY));
}

/**
* Returns current max per-execution resource usage of the task.
*/
public TaskResourceUsage getMaxResourceStats() {
return new TaskResourceUsage(getMaxResourceUtilization(ResourceStats.CPU), getMaxResourceUtilization(ResourceStats.MEMORY));
}

/**
* Returns total resource consumption for a specific task stat.
*/
Expand All @@ -305,6 +337,76 @@ public long getTotalResourceUtilization(ResourceStats stats) {
return totalResourceConsumption;
}

/**
* Returns average per-execution resource consumption for a specific task stat.
*/
private long getAverageResourceUtilization(ResourceStats stats) {
long totalResourceConsumption = 0L;
int threadResourceInfoCount = 0;
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
totalResourceConsumption += statsInfo.getTotalValue();
threadResourceInfoCount++;
}
}
}
return (threadResourceInfoCount > 0) ? totalResourceConsumption / threadResourceInfoCount : 0;
}

/**
* Returns minimum per-execution resource consumption for a specific task stat.
*/
private long getMinResourceUtilization(ResourceStats stats) {
if (resourceStats.size() == 0) {
return 0L;
}
long minResourceConsumption = Long.MAX_VALUE;
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
minResourceConsumption = Math.min(minResourceConsumption, statsInfo.getTotalValue());
}
}
}
return minResourceConsumption;
}

/**
* Returns maximum per-execution resource consumption for a specific task stat.
*/
private long getMaxResourceUtilization(ResourceStats stats) {
long maxResourceConsumption = 0L;
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
maxResourceConsumption = Math.max(maxResourceConsumption, statsInfo.getTotalValue());
}
}
}
return maxResourceConsumption;
}

/**
* Returns the total and active number of thread executions for the task.
*/
public TaskThreadUsage getThreadUsage() {
int numThreadExecutions = 0;
int activeThreads = 0;
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
numThreadExecutions += threadResourceInfosList.size();
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
if (threadResourceInfo.isActive()) {
activeThreads++;
}
}
}
return new TaskThreadUsage(numThreadExecutions, activeThreads);
}

/**
* Adds thread's starting resource consumption information
* @param threadId ID of the thread
Expand Down
36 changes: 32 additions & 4 deletions server/src/main/java/org/opensearch/tasks/TaskResourceStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.tasks;

import org.opensearch.Version;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -22,6 +23,8 @@
import java.util.Map;
import java.util.Objects;

import static org.opensearch.tasks.Task.THREAD_INFO;

/**
* Resource information about a currently running task.
* <p>
Expand All @@ -32,27 +35,42 @@
*/
public class TaskResourceStats implements Writeable, ToXContentFragment {
private final Map<String, TaskResourceUsage> resourceUsage;
private final TaskThreadUsage threadUsage;

public TaskResourceStats(Map<String, TaskResourceUsage> resourceUsage) {
public TaskResourceStats(Map<String, TaskResourceUsage> resourceUsage, TaskThreadUsage threadUsage) {
this.resourceUsage = Objects.requireNonNull(resourceUsage, "resource usage is required");
this.threadUsage = Objects.requireNonNull(threadUsage, "thread usage is required");
}

/**
* Read from a stream.
*/
public TaskResourceStats(StreamInput in) throws IOException {
resourceUsage = in.readMap(StreamInput::readString, TaskResourceUsage::readFromStream);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
threadUsage = TaskThreadUsage.readFromStream(in);
} else {
// Initialize TaskThreadUsage in case it is not found in mixed cluster case
threadUsage = new TaskThreadUsage(0, 0);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(resourceUsage, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
threadUsage.writeTo(out);
}
}

public Map<String, TaskResourceUsage> getResourceUsageInfo() {
return resourceUsage;
}

public TaskThreadUsage getThreadUsage() {
return threadUsage;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (Map.Entry<String, TaskResourceUsage> resourceUsageEntry : resourceUsage.entrySet()) {
Expand All @@ -62,6 +80,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
builder.startObject(THREAD_INFO);
threadUsage.toXContent(builder, params);
builder.endObject();
return builder;
}

Expand All @@ -74,17 +95,24 @@ public static TaskResourceStats fromXContent(XContentParser parser) throws IOExc
token = parser.nextToken();
}
final Map<String, TaskResourceUsage> resourceStats = new HashMap<>();
// Initialize TaskThreadUsage in case it is not found in mixed cluster case
TaskThreadUsage threadUsage = new TaskThreadUsage(0, 0);
if (token == XContentParser.Token.FIELD_NAME) {
assert parser.currentToken() == XContentParser.Token.FIELD_NAME : "Expected field name but saw [" + parser.currentToken() + "]";
do {
// Must point to field name
String fieldName = parser.currentName();
// And then the value
TaskResourceUsage value = TaskResourceUsage.fromXContent(parser);
resourceStats.put(fieldName, value);

if (fieldName.equals(THREAD_INFO)) {
threadUsage = TaskThreadUsage.fromXContent(parser);
} else {
TaskResourceUsage value = TaskResourceUsage.fromXContent(parser);
resourceStats.put(fieldName, value);
}
} while (parser.nextToken() == XContentParser.Token.FIELD_NAME);
}
return new TaskResourceStats(resourceStats);
return new TaskResourceStats(resourceStats, threadUsage);
}

@Override
Expand Down
Loading

0 comments on commit e3740f7

Please sign in to comment.