Skip to content

Commit

Permalink
Add threadpool wait time metric
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Sep 12, 2023
1 parent b7dbf46 commit e31c964
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 18 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand All @@ -94,4 +95,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
Original file line number Diff line number Diff line change
@@ -1,3 +1,29 @@
"Test cat thread_pool total_wait_time output":
- skip:
version: " - 3.0.0"
reason: thread_pool total_wait_time stats were introduced in V_3.0.0

- do:
cat.thread_pool: {}

- match:
$body: |
/ #node_name name active queue rejected
^ (\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
- do:
cat.thread_pool:
thread_pool_patterns: search,search_throttled,index_searcher
h: name,total_wait_time,twt
v: true

- match:
$body: |
/^ id \s+ name \s+ total_wait_time \s+ twt \n
(\S+ \s+ search \s+ \d+s \s+ \d+ \n
\S+ \s+ search_throttled \s+ \d+s \s+ \d+ \n
\S+ \s+ index_searcher \s+ \d+s \s+ \d+ \n)+ $/
---
"Test cat thread_pool output":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.search.stats;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
Expand All @@ -26,6 +28,8 @@
import org.opensearch.search.SearchService;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPoolStats;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -35,6 +39,7 @@
import java.util.function.Function;

import static org.opensearch.index.query.QueryBuilders.scriptQuery;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

Expand Down Expand Up @@ -307,6 +312,54 @@ public void testAvgConcurrencyIndexLevel() throws InterruptedException {
assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0);
}

public void testThreadPoolWaitTime() throws Exception {
int NUM_SHARDS = 1;
String INDEX = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
createIndex(
INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);

ensureGreen();

for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).get();
refresh();
}
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 10))
.execute()
.actionGet();

client().prepareSearch(INDEX).execute().actionGet();

NodesStatsRequestBuilder nodesStatsRequestBuilder = new NodesStatsRequestBuilder(
client().admin().cluster(),
NodesStatsAction.INSTANCE
).setNodesIds().all();
NodesStatsResponse nodesStatsResponse = nodesStatsRequestBuilder.execute().actionGet();
ThreadPoolStats threadPoolStats = nodesStatsResponse.getNodes().get(0).getThreadPool();

for (ThreadPoolStats.Stats stats : threadPoolStats) {
if (stats.getName().equals(ThreadPool.Names.INDEX_SEARCHER)) {
assertThat(stats.getWaitTimeNanos().micros(), greaterThan(0L));
}
}

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2))
.execute()
.actionGet();
}

public static class ScriptedDelayedPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_timeout";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,15 @@ protected Runnable wrapRunnable(Runnable command) {
protected Runnable unwrap(Runnable runnable) {
return contextHolder.unwrap(runnable);
}

/**
* Returns the cumulative wait time of the ThreadPool. If the ThreadPool does not support tracking the cumulative pool wait time
* then this should return -1 which will prevent the value from showing up in {@link org.opensearch.threadpool.ThreadPoolStats}.
* ThreadPools that do support this metric should override this method. For example, {@link QueueResizingOpenSearchThreadPoolExecutor}
* does so using the {@link TimedRunnable} to get the difference between Runnable creation and execution.
*
*/
public long getPoolWaitTimeNanos() {
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.common.util.concurrent;

import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.metrics.CounterMetric;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,6 +28,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch
private final ResizableBlockingQueue<Runnable> workQueue;
private final Function<Runnable, WrappedRunnable> runnableWrapper;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final CounterMetric poolWaitTime;

/**
* Create new resizable at runtime thread pool executor
Expand Down Expand Up @@ -101,6 +103,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch
this.workQueue = workQueue;
this.runnableWrapper = runnableWrapper;
this.executionEWMA = new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0);
this.poolWaitTime = new CounterMetric();
}

@Override
Expand Down Expand Up @@ -156,6 +159,7 @@ protected void afterExecute(Runnable r, Throwable t) {
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
poolWaitTime.inc(timedRunnable.getWaitTimeNanos());
}

/**
Expand All @@ -173,4 +177,9 @@ public synchronized int resize(int capacity) {
capacity
);
}

@Override
public long getPoolWaitTimeNanos() {
return poolWaitTime.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.unit.TimeValue;

import java.util.Locale;
Expand Down Expand Up @@ -66,6 +67,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
private final int maxQueueSize;
private final long targetedResponseTimeNanos;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final CounterMetric poolWaitTime;

private final AtomicLong totalTaskNanos = new AtomicLong(0);
private final AtomicInteger taskCount = new AtomicInteger(0);
Expand Down Expand Up @@ -97,6 +99,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
this.poolWaitTime = new CounterMetric();
logger.debug(
"thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
getName(),
Expand Down Expand Up @@ -190,6 +193,7 @@ protected void afterExecute(Runnable r, Throwable t) {
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
poolWaitTime.inc(timedRunnable.getWaitTimeNanos());

if (taskCount.incrementAndGet() == this.tasksPerFrame) {
final long endTimeNs = System.nanoTime();
Expand Down Expand Up @@ -290,4 +294,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
}

@Override
public long getPoolWaitTimeNanos() {
return poolWaitTime.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ long getTotalExecutionNanos() {
return Math.max(finishTimeNanos - startTimeNanos, 1);
}

long getWaitTimeNanos() {
if (startTimeNanos == -1) {
// There must have been an exception thrown, the total time is unknown (-1)
return -1;
}
return Math.max(startTimeNanos - creationTimeNanos, 1);
}

/**
* If the task was failed or rejected, return true.
* Otherwise, false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ protected Table getTableWithHeader(final RestRequest request) {
table.addCell("rejected", "alias:r;default:true;text-align:right;desc:number of rejected tasks");
table.addCell("largest", "alias:l;default:false;text-align:right;desc:highest number of seen active threads");
table.addCell("completed", "alias:c;default:false;text-align:right;desc:number of completed tasks");
table.addCell(
"total_wait_time",
"alias:twt;default:false;text-align:right;desc:total time tasks spent waiting in thread_pool queue"
);
table.addCell("core", "alias:cr;default:false;text-align:right;desc:core number of threads in a scaling thread pool");
table.addCell("max", "alias:mx;default:false;text-align:right;desc:maximum number of threads in a scaling thread pool");
table.addCell("size", "alias:sz;default:false;text-align:right;desc:number of threads in a fixed thread pool");
Expand Down Expand Up @@ -267,6 +271,7 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR
table.addCell(poolStats == null ? null : poolStats.getRejected());
table.addCell(poolStats == null ? null : poolStats.getLargest());
table.addCell(poolStats == null ? null : poolStats.getCompleted());
table.addCell(poolStats == null ? null : poolStats.getWaitTimeNanos());
table.addCell(core);
table.addCell(max);
table.addCell(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,19 +383,21 @@ public ThreadPoolStats stats() {
long rejected = -1;
int largest = -1;
long completed = -1;
if (holder.executor() instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor();
long waitTimeNanos = -1;
if (holder.executor() instanceof OpenSearchThreadPoolExecutor) {
OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor();
threads = threadPoolExecutor.getPoolSize();
queue = threadPoolExecutor.getQueue().size();
active = threadPoolExecutor.getActiveCount();
largest = threadPoolExecutor.getLargestPoolSize();
completed = threadPoolExecutor.getCompletedTaskCount();
waitTimeNanos = threadPoolExecutor.getPoolWaitTimeNanos();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
}
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed));
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTimeNanos));
}
return new ThreadPoolStats(stats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.threadpool;

import org.opensearch.Version;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand Down Expand Up @@ -65,15 +67,17 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable<S
private final long rejected;
private final int largest;
private final long completed;
private long waitTimeNanos;

public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed) {
public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed, long waitTimeNanos) {
this.name = name;
this.threads = threads;
this.queue = queue;
this.active = active;
this.rejected = rejected;
this.largest = largest;
this.completed = completed;
this.waitTimeNanos = waitTimeNanos;
}

public Stats(StreamInput in) throws IOException {
Expand All @@ -84,6 +88,9 @@ public Stats(StreamInput in) throws IOException {
rejected = in.readLong();
largest = in.readInt();
completed = in.readLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
waitTimeNanos = in.readLong();
}
}

@Override
Expand All @@ -95,6 +102,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(rejected);
out.writeInt(largest);
out.writeLong(completed);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(waitTimeNanos);
}
}

public String getName() {
Expand Down Expand Up @@ -125,6 +135,10 @@ public long getCompleted() {
return this.completed;
}

public TimeValue getWaitTimeNanos() {
return TimeValue.timeValueNanos(waitTimeNanos);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name);
Expand All @@ -146,6 +160,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (completed != -1) {
builder.field(Fields.COMPLETED, completed);
}
if (waitTimeNanos != -1) {
// wait time is tracked in nanos but TimeValue raw value is in millis
builder.humanReadableField(Fields.WAIT_TIME_MILLIS, Fields.WAIT_TIME, getWaitTimeNanos());
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -197,6 +215,8 @@ static final class Fields {
static final String REJECTED = "rejected";
static final String LARGEST = "largest";
static final String COMPLETED = "completed";
static final String WAIT_TIME = "total_wait_time";
static final String WAIT_TIME_MILLIS = "total_wait_time_in_millis";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
randomIntBetween(1, 1000),
randomNonNegativeLong(),
randomIntBetween(1, 1000),
randomIntBetween(1, 1000)
randomIntBetween(1, 1000),
randomIntBetween(-1, 10)
)
);
}
Expand Down
Loading

0 comments on commit e31c964

Please sign in to comment.