From 70fd8b22753819c789679a5457ece6c13fde91fb Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Mon, 11 Sep 2023 14:51:01 -0700 Subject: [PATCH] Add threadpool wait time metric Signed-off-by: Jay Deng --- CHANGELOG.md | 3 +- .../search/stats/ConcurrentSearchStatsIT.java | 52 +++++++++++++++++++ .../OpenSearchThreadPoolExecutor.java | 11 ++++ ...ResizableOpenSearchThreadPoolExecutor.java | 9 ++++ ...eResizingOpenSearchThreadPoolExecutor.java | 8 +++ .../common/util/concurrent/TimedRunnable.java | 8 +++ .../rest/action/cat/RestThreadPoolAction.java | 5 ++ .../org/opensearch/threadpool/ThreadPool.java | 8 +-- .../threadpool/ThreadPoolStats.java | 21 +++++++- .../cluster/node/stats/NodeStatsTests.java | 3 +- .../threadpool/ThreadPoolStatsTests.java | 24 ++++----- 11 files changed, 134 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1edd7bb20de2e..a04d43c4f08d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681)) ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) @@ -94,4 +95,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java index 350f494c35366..b07e195b2f7d0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java @@ -8,6 +8,8 @@ package org.opensearch.search.stats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.stats.IndexStats; import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder; @@ -26,6 +28,7 @@ import org.opensearch.search.SearchService; import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.threadpool.ThreadPoolStats; import java.util.Arrays; import java.util.Collection; @@ -35,6 +38,7 @@ import java.util.function.Function; import static org.opensearch.index.query.QueryBuilders.scriptQuery; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; @@ -307,6 +311,54 @@ public void testAvgConcurrencyIndexLevel() throws InterruptedException { assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0); } + public void testThreadPoolWaitTime() throws Exception { + int NUM_SHARDS = 1; + String INDEX = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + createIndex( + INDEX, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + + ensureGreen(); + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).get(); + refresh(); + } + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 10)) + .execute() + .actionGet(); + + client().prepareSearch(INDEX).execute().actionGet(); + + NodesStatsRequestBuilder nodesStatsRequestBuilder = new NodesStatsRequestBuilder( + client().admin().cluster(), + NodesStatsAction.INSTANCE + ).setNodesIds().all(); + NodesStatsResponse nodesStatsResponse = nodesStatsRequestBuilder.execute().actionGet(); + ThreadPoolStats threadPoolStats = nodesStatsResponse.getNodes().get(0).getThreadPool(); + + for (ThreadPoolStats.Stats stats : threadPoolStats) { + if (stats.getName().equals("index_searcher")) { + assertThat(stats.getTotalWaitTime().micros(), greaterThan(0L)); + } + } + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2)) + .execute() + .actionGet(); + } + public static class ScriptedDelayedPlugin extends MockScriptPlugin { static final String SCRIPT_NAME = "search_timeout"; diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java index d967b7423ca80..f2e23e7e33060 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java @@ -205,4 +205,15 @@ protected Runnable wrapRunnable(Runnable command) { protected Runnable unwrap(Runnable runnable) { return contextHolder.unwrap(runnable); } + + /** + * Returns the cumulative wait time of the ThreadPool. If the ThreadPool does not support tracking the cumulative pool wait time + * then this should return -1 which will prevent the value from showing up in {@link org.opensearch.threadpool.ThreadPoolStats}. + * ThreadPools that do support this metric should override this method. For example, {@link QueueResizingOpenSearchThreadPoolExecutor} + * does so using the {@link TimedRunnable} to get the difference between Runnable creation and execution. + * + */ + public long getPoolWaitTime() { + return -1; + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java index 7a0ce8244efe4..54d205298e50e 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java @@ -9,6 +9,7 @@ package org.opensearch.common.util.concurrent; import org.opensearch.common.ExponentiallyWeightedMovingAverage; +import org.opensearch.common.metrics.CounterMetric; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -27,6 +28,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch private final ResizableBlockingQueue workQueue; private final Function runnableWrapper; private final ExponentiallyWeightedMovingAverage executionEWMA; + private final CounterMetric poolWaitTime; /** * Create new resizable at runtime thread pool executor @@ -101,6 +103,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch this.workQueue = workQueue; this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0); + this.poolWaitTime = new CounterMetric(); } @Override @@ -156,6 +159,7 @@ protected void afterExecute(Runnable r, Throwable t) { // taskExecutionNanos may be -1 if the task threw an exception executionEWMA.addValue(taskExecutionNanos); } + poolWaitTime.inc(timedRunnable.getWaitTimeNanos()); } /** @@ -173,4 +177,9 @@ public synchronized int resize(int capacity) { capacity ); } + + @Override + public long getPoolWaitTime() { + return poolWaitTime.count(); + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java index 684dd7c9d8de5..a9497781e9948 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.ExponentiallyWeightedMovingAverage; +import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.unit.TimeValue; import java.util.Locale; @@ -66,6 +67,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT private final int maxQueueSize; private final long targetedResponseTimeNanos; private final ExponentiallyWeightedMovingAverage executionEWMA; + private final CounterMetric poolWaitTime; private final AtomicLong totalTaskNanos = new AtomicLong(0); private final AtomicInteger taskCount = new AtomicInteger(0); @@ -97,6 +99,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT this.maxQueueSize = maxQueueSize; this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); + this.poolWaitTime = new CounterMetric(); logger.debug( "thread pool [{}] will adjust queue by [{}] when determining automatic queue size", getName(), @@ -190,6 +193,7 @@ protected void afterExecute(Runnable r, Throwable t) { // taskExecutionNanos may be -1 if the task threw an exception executionEWMA.addValue(taskExecutionNanos); } + poolWaitTime.inc(timedRunnable.getWaitTimeNanos()); if (taskCount.incrementAndGet() == this.tasksPerFrame) { final long endTimeNs = System.nanoTime(); @@ -290,4 +294,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", "); } + @Override + public long getPoolWaitTime() { + return poolWaitTime.count(); + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java b/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java index f3bc50a33453b..2eb6657898008 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java @@ -107,6 +107,14 @@ long getTotalExecutionNanos() { return Math.max(finishTimeNanos - startTimeNanos, 1); } + long getWaitTimeNanos() { + if (startTimeNanos == -1) { + // There must have been an exception thrown, the total time is unknown (-1) + return -1; + } + return Math.max(startTimeNanos - creationTimeNanos, 1); + } + /** * If the task was failed or rejected, return true. * Otherwise, false. diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java index 652bc448144e2..1f28a9e3bfe86 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java @@ -163,6 +163,10 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("rejected", "alias:r;default:true;text-align:right;desc:number of rejected tasks"); table.addCell("largest", "alias:l;default:false;text-align:right;desc:highest number of seen active threads"); table.addCell("completed", "alias:c;default:false;text-align:right;desc:number of completed tasks"); + table.addCell( + "total_wait_time", + "alias:twt;default:false;text-align:right;desc:total time tasks spent waiting in thread_pool queue" + ); table.addCell("core", "alias:cr;default:false;text-align:right;desc:core number of threads in a scaling thread pool"); table.addCell("max", "alias:mx;default:false;text-align:right;desc:maximum number of threads in a scaling thread pool"); table.addCell("size", "alias:sz;default:false;text-align:right;desc:number of threads in a fixed thread pool"); @@ -267,6 +271,7 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR table.addCell(poolStats == null ? null : poolStats.getRejected()); table.addCell(poolStats == null ? null : poolStats.getLargest()); table.addCell(poolStats == null ? null : poolStats.getCompleted()); + table.addCell(poolStats == null ? null : poolStats.getTotalWaitTime()); table.addCell(core); table.addCell(max); table.addCell(size); diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 6ddf3ff6b2f6a..c5c52cc62c04c 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -383,19 +383,21 @@ public ThreadPoolStats stats() { long rejected = -1; int largest = -1; long completed = -1; - if (holder.executor() instanceof ThreadPoolExecutor) { - ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor(); + long waitTime = -1; + if (holder.executor() instanceof OpenSearchThreadPoolExecutor) { + OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor(); threads = threadPoolExecutor.getPoolSize(); queue = threadPoolExecutor.getQueue().size(); active = threadPoolExecutor.getActiveCount(); largest = threadPoolExecutor.getLargestPoolSize(); completed = threadPoolExecutor.getCompletedTaskCount(); + waitTime = threadPoolExecutor.getPoolWaitTime(); RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler(); if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) { rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected(); } } - stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed)); + stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTime)); } return new ThreadPoolStats(stats); } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java index b4d7e4a3fbf7a..9e8274913363d 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java @@ -32,6 +32,8 @@ package org.opensearch.threadpool; +import org.opensearch.Version; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -43,6 +45,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.TimeUnit; /** * Stats for a threadpool @@ -65,8 +68,9 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L)); + stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, 0L)); List copy = new ArrayList<>(stats); Collections.sort(copy); @@ -79,11 +79,11 @@ public void testThreadPoolStatsToXContent() throws IOException { try (BytesStreamOutput os = new BytesStreamOutput()) { List stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L, -1L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L, -1L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L, -1L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L, -1L)); ThreadPoolStats threadPoolStats = new ThreadPoolStats(stats); try (XContentBuilder builder = new XContentBuilder(MediaTypeRegistry.JSON.xContent(), os)) {