Skip to content

Commit

Permalink
Add threadpool wait time metric
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Sep 8, 2023
1 parent caf4c80 commit 68a9d19
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Adding segment replication statistics rolled up at index, node and cluster level ([#9709](https://github.com/opensearch-project/OpenSearch/pull/9709))
- [Remote Store] Changes to introduce repository registration during bootstrap via node attributes. ([#9105](https://github.com/opensearch-project/OpenSearch/pull/9105))
- [Remote state] Auto restore index metadata from last known cluster state ([#9831](https://github.com/opensearch-project/OpenSearch/pull/9831))
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.search.stats;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
Expand All @@ -26,6 +28,7 @@
import org.opensearch.search.SearchService;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPoolStats;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -35,6 +38,7 @@
import java.util.function.Function;

import static org.opensearch.index.query.QueryBuilders.scriptQuery;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

Expand Down Expand Up @@ -307,6 +311,54 @@ public void testAvgConcurrencyIndexLevel() throws InterruptedException {
assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0);
}

public void testThreadPoolWaitTime() throws Exception {
int NUM_SHARDS = 1;
String INDEX = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
createIndex(
INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);

ensureGreen();

for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).get();
refresh();
}
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 10))
.execute()
.actionGet();

client().prepareSearch(INDEX).execute().actionGet();

NodesStatsRequestBuilder nodesStatsRequestBuilder = new NodesStatsRequestBuilder(
client().admin().cluster(),
NodesStatsAction.INSTANCE
).setNodesIds().all();
NodesStatsResponse nodesStatsResponse = nodesStatsRequestBuilder.execute().actionGet();
ThreadPoolStats threadPoolStats = nodesStatsResponse.getNodes().get(0).getThreadPool();

for (ThreadPoolStats.Stats stats : threadPoolStats) {
if (stats.getName().equals("index_searcher")) {
assertThat(stats.getPoolWaitTime().micros(), greaterThan(0L));
}
}

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2))
.execute()
.actionGet();
}

public static class ScriptedDelayedPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_timeout";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,15 @@ protected Runnable wrapRunnable(Runnable command) {
protected Runnable unwrap(Runnable runnable) {
return contextHolder.unwrap(runnable);
}

/**
* Returns the cumulative wait time of the ThreadPool. If the ThreadPool does not support tracking the cumulative pool wait time
* then this should return -1 which will prevent the value from showing up in {@link org.opensearch.threadpool.ThreadPoolStats}.
* ThreadPools that do support this metric should override this method. For example, {@link QueueResizingOpenSearchThreadPoolExecutor}
* does so using the {@link TimedRunnable} to get the difference between Runnable creation and execution.
*
*/
public long getPoolWaitTime() {
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.common.util.concurrent;

import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.metrics.CounterMetric;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,6 +28,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch
private final ResizableBlockingQueue<Runnable> workQueue;
private final Function<Runnable, WrappedRunnable> runnableWrapper;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final CounterMetric poolWaitTime;

/**
* Create new resizable at runtime thread pool executor
Expand Down Expand Up @@ -101,6 +103,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch
this.workQueue = workQueue;
this.runnableWrapper = runnableWrapper;
this.executionEWMA = new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0);
this.poolWaitTime = new CounterMetric();
}

@Override
Expand Down Expand Up @@ -156,6 +159,7 @@ protected void afterExecute(Runnable r, Throwable t) {
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
poolWaitTime.inc(timedRunnable.getWaitTimeNanos());
}

/**
Expand All @@ -173,4 +177,9 @@ public synchronized int resize(int capacity) {
capacity
);
}

@Override
public long getPoolWaitTime() {
return poolWaitTime.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.unit.TimeValue;

import java.util.Locale;
Expand Down Expand Up @@ -66,6 +67,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
private final int maxQueueSize;
private final long targetedResponseTimeNanos;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final CounterMetric poolWaitTime;

private final AtomicLong totalTaskNanos = new AtomicLong(0);
private final AtomicInteger taskCount = new AtomicInteger(0);
Expand Down Expand Up @@ -97,6 +99,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
this.poolWaitTime = new CounterMetric();
logger.debug(
"thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
getName(),
Expand Down Expand Up @@ -190,6 +193,7 @@ protected void afterExecute(Runnable r, Throwable t) {
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
poolWaitTime.inc(timedRunnable.getWaitTimeNanos());

if (taskCount.incrementAndGet() == this.tasksPerFrame) {
final long endTimeNs = System.nanoTime();
Expand Down Expand Up @@ -290,4 +294,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
}

@Override
public long getPoolWaitTime() {
return poolWaitTime.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ long getTotalExecutionNanos() {
return Math.max(finishTimeNanos - startTimeNanos, 1);
}

long getWaitTimeNanos() {
if (startTimeNanos == -1) {
// There must have been an exception thrown, the total time is unknown (-1)
return -1;
}
return Math.max(startTimeNanos - creationTimeNanos, 1);
}

/**
* If the task was failed or rejected, return true.
* Otherwise, false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,19 +383,21 @@ public ThreadPoolStats stats() {
long rejected = -1;
int largest = -1;
long completed = -1;
if (holder.executor() instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor();
long waitTime = -1;
if (holder.executor() instanceof OpenSearchThreadPoolExecutor) {
OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor();
threads = threadPoolExecutor.getPoolSize();
queue = threadPoolExecutor.getQueue().size();
active = threadPoolExecutor.getActiveCount();
largest = threadPoolExecutor.getLargestPoolSize();
completed = threadPoolExecutor.getCompletedTaskCount();
waitTime = threadPoolExecutor.getPoolWaitTime();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
}
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed));
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTime));
}
return new ThreadPoolStats(stats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.threadpool;

import org.opensearch.Version;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -43,6 +45,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Stats for a threadpool
Expand All @@ -65,15 +68,17 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable<S
private final long rejected;
private final int largest;
private final long completed;
private long poolWaitTime;

public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed) {
public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed, long poolWaitTime) {
this.name = name;
this.threads = threads;
this.queue = queue;
this.active = active;
this.rejected = rejected;
this.largest = largest;
this.completed = completed;
this.poolWaitTime = poolWaitTime;
}

public Stats(StreamInput in) throws IOException {
Expand All @@ -84,6 +89,9 @@ public Stats(StreamInput in) throws IOException {
rejected = in.readLong();
largest = in.readInt();
completed = in.readLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
poolWaitTime = in.readLong();
}
}

@Override
Expand All @@ -95,6 +103,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(rejected);
out.writeInt(largest);
out.writeLong(completed);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(poolWaitTime);
}
}

public String getName() {
Expand Down Expand Up @@ -125,6 +136,10 @@ public long getCompleted() {
return this.completed;
}

public TimeValue getPoolWaitTime() {
return new TimeValue(poolWaitTime, TimeUnit.NANOSECONDS);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name);
Expand All @@ -146,6 +161,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (completed != -1) {
builder.field(Fields.COMPLETED, completed);
}
if (poolWaitTime != -1) {
builder.field(Fields.WAIT_TIME, getPoolWaitTime());
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -197,6 +215,7 @@ static final class Fields {
static final String REJECTED = "rejected";
static final String LARGEST = "largest";
static final String COMPLETED = "completed";
static final String WAIT_TIME = "thread_pool_wait_time";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
randomIntBetween(1, 1000),
randomNonNegativeLong(),
randomIntBetween(1, 1000),
randomIntBetween(1, 1000)
randomIntBetween(1, 1000),
randomIntBetween(-1, 10)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@
public class ThreadPoolStatsTests extends OpenSearchTestCase {
public void testThreadPoolStatsSort() throws IOException {
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, 0L));

List<ThreadPoolStats.Stats> copy = new ArrayList<>(stats);
Collections.sort(copy);
Expand All @@ -79,11 +79,11 @@ public void testThreadPoolStatsToXContent() throws IOException {
try (BytesStreamOutput os = new BytesStreamOutput()) {

List<ThreadPoolStats.Stats> stats = new ArrayList<>();
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L, -1L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L, -1L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L, -1L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L, -1L));

ThreadPoolStats threadPoolStats = new ThreadPoolStats(stats);
try (XContentBuilder builder = new XContentBuilder(MediaTypeRegistry.JSON.xContent(), os)) {
Expand Down

0 comments on commit 68a9d19

Please sign in to comment.