Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add taskCompletionCount in search_backpressure stats #10028

Merged
merged 12 commits into from
Oct 10, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855))
- Add task completion count in search backpressure stats API ([#10028](https://github.com/opensearch-project/OpenSearch/pull/10028/))
- Performance improvement for Datetime field caching ([#4558](https://github.com/opensearch-project/OpenSearch/issues/4558))


### Deprecated

### Removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ public SearchBackpressureStats nodeStats() {
SearchTaskStats searchTaskStats = new SearchTaskStats(
searchBackpressureStates.get(SearchTask.class).getCancellationCount(),
searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(),
searchBackpressureStates.get(SearchTask.class).getCompletionCount(),
taskTrackers.get(SearchTask.class)
.stream()
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchTasks)))
Expand All @@ -407,6 +408,7 @@ public SearchBackpressureStats nodeStats() {
SearchShardTaskStats searchShardTaskStats = new SearchShardTaskStats(
searchBackpressureStates.get(SearchShardTask.class).getCancellationCount(),
searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(),
searchBackpressureStates.get(SearchShardTask.class).getCompletionCount(),
taskTrackers.get(SearchShardTask.class)
.stream()
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.search.backpressure.stats;

import org.opensearch.Version;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -30,21 +31,29 @@
public class SearchShardTaskStats implements ToXContentObject, Writeable {
private final long cancellationCount;
private final long limitReachedCount;
private final long completionCount;
private final Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats;

/**
 * Creates a stats object from already-computed values; every argument is stored as-is in the
 * field of the same name.
 *
 * @param cancellationCount value reported as "cancellation_count" in the XContent output
 * @param limitReachedCount value kept in the {@code limitReachedCount} field
 * @param completionCount value kept in the {@code completionCount} field; -1 is the marker the
 *        stream constructor assigns when the sender is older than V_3_0_0, and "completion_count"
 *        is only written to the XContent builder when the value is not -1
 * @param resourceUsageTrackerStats stats keyed by tracker type; the map reference is retained
 *        without copying
 */
public SearchShardTaskStats(
long cancellationCount,
long limitReachedCount,
long completionCount,
Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats
) {
this.cancellationCount = cancellationCount;
this.limitReachedCount = limitReachedCount;
this.completionCount = completionCount;
this.resourceUsageTrackerStats = resourceUsageTrackerStats;
}

public SearchShardTaskStats(StreamInput in) throws IOException {
this.cancellationCount = in.readVLong();
this.limitReachedCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
completionCount = in.readVLong();
} else {
completionCount = -1;

Check warning on line 55 in server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java#L55

Added line #L55 was not covered by tests
}

MapBuilder<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> builder = new MapBuilder<>();
builder.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, in.readOptionalWriteable(CpuUsageTracker.Stats::new));
Expand All @@ -62,6 +71,9 @@
builder.field(entry.getKey().getName(), entry.getValue());
}
builder.endObject();
if (completionCount != -1) {
builder.field("completion_count", completionCount);

Check warning on line 75 in server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java#L75

Added line #L75 was not covered by tests
}

builder.startObject("cancellation_stats")
.field("cancellation_count", cancellationCount)
Expand All @@ -75,6 +87,9 @@
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(cancellationCount);
out.writeVLong(limitReachedCount);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(completionCount);
}

out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER));
out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER));
Expand All @@ -88,11 +103,12 @@
SearchShardTaskStats that = (SearchShardTaskStats) o;
return cancellationCount == that.cancellationCount
&& limitReachedCount == that.limitReachedCount
&& completionCount == that.completionCount
&& resourceUsageTrackerStats.equals(that.resourceUsageTrackerStats);
}

@Override
public int hashCode() {
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats);
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats, completionCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.search.backpressure.stats;

import org.opensearch.Version;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -31,21 +32,29 @@
public class SearchTaskStats implements ToXContentObject, Writeable {
private final long cancellationCount;
private final long limitReachedCount;
private final long completionCount;
private final Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats;

/**
 * Creates a stats object from already-computed values; every argument is stored as-is in the
 * field of the same name.
 *
 * @param cancellationCount value reported as "cancellation_count" in the XContent output
 * @param limitReachedCount value kept in the {@code limitReachedCount} field
 * @param completionCount value kept in the {@code completionCount} field; -1 is the marker the
 *        stream constructor assigns when the sender is older than V_3_0_0, and "completion_count"
 *        is only written to the XContent builder when the value is not -1
 * @param resourceUsageTrackerStats stats keyed by tracker type; the map reference is retained
 *        without copying
 */
public SearchTaskStats(
long cancellationCount,
long limitReachedCount,
long completionCount,
Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats
) {
this.cancellationCount = cancellationCount;
this.limitReachedCount = limitReachedCount;
this.completionCount = completionCount;
this.resourceUsageTrackerStats = resourceUsageTrackerStats;
}

public SearchTaskStats(StreamInput in) throws IOException {
this.cancellationCount = in.readVLong();
this.limitReachedCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.completionCount = in.readVLong();
} else {
this.completionCount = -1;

Check warning on line 56 in server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java#L56

Added line #L56 was not covered by tests
}

MapBuilder<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> builder = new MapBuilder<>();
builder.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, in.readOptionalWriteable(CpuUsageTracker.Stats::new));
Expand All @@ -63,6 +72,9 @@
builder.field(entry.getKey().getName(), entry.getValue());
}
builder.endObject();
if (completionCount != -1) {
builder.field("completion_count", completionCount);

Check warning on line 76 in server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java#L76

Added line #L76 was not covered by tests
}

builder.startObject("cancellation_stats")
.field("cancellation_count", cancellationCount)
Expand All @@ -76,6 +88,9 @@
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(cancellationCount);
out.writeVLong(limitReachedCount);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(completionCount);
}

out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER));
out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER));
Expand All @@ -89,11 +104,12 @@
SearchTaskStats that = (SearchTaskStats) o;
return cancellationCount == that.cancellationCount
&& limitReachedCount == that.limitReachedCount
&& completionCount == that.completionCount
&& resourceUsageTrackerStats.equals(that.resourceUsageTrackerStats);
}

@Override
public int hashCode() {
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats);
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats, completionCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,11 @@ public void testSearchTaskInFlightCancellation() {
verify(mockTaskManager, times(10)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
assertEquals(3, service.getSearchBackpressureState(SearchTask.class).getLimitReachedCount());

// Verify search backpressure stats.
// Verify search backpressure stats. Since we are not marking any task as completed, the completionCount will be 0
// for SearchTaskStats here.
SearchBackpressureStats expectedStats = new SearchBackpressureStats(
new SearchTaskStats(10, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(10))),
new SearchShardTaskStats(0, 0, Collections.emptyMap()),
new SearchTaskStats(10, 3, 0, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(10))),
new SearchShardTaskStats(0, 0, 0, Collections.emptyMap()),
SearchBackpressureMode.ENFORCED
);
SearchBackpressureStats actualStats = service.nodeStats();
Expand Down Expand Up @@ -323,10 +324,11 @@ public void testSearchShardTaskInFlightCancellation() {
verify(mockTaskManager, times(12)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
assertEquals(3, service.getSearchBackpressureState(SearchShardTask.class).getLimitReachedCount());

// Verify search backpressure stats.
// Verify search backpressure stats. We are marking 20 SearchShardTasks as completed; this should be
// reflected in SearchShardTaskStats.
SearchBackpressureStats expectedStats = new SearchBackpressureStats(
new SearchTaskStats(0, 0, Collections.emptyMap()),
new SearchShardTaskStats(12, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(12))),
new SearchTaskStats(0, 0, 0, Collections.emptyMap()),
new SearchShardTaskStats(12, 3, 20, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(12))),
SearchBackpressureMode.ENFORCED
);
SearchBackpressureStats actualStats = service.nodeStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public static SearchShardTaskStats randomInstance() {
new ElapsedTimeTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())
);

return new SearchShardTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats);
return new SearchShardTaskStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
resourceUsageTrackerStats
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public static SearchTaskStats randomInstance() {
new ElapsedTimeTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())
);

return new SearchTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats);
return new SearchTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats);
}
}
Loading