diff --git a/CHANGELOG.md b/CHANGELOG.md index 1edd7bb20de2e..a04d43c4f08d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681)) ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) @@ -94,4 +95,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml index 1ce8468cb51f9..9d08f2e4ba53b 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml @@ -1,3 +1,30 @@ +"Test cat thread_pool total_wait_time output": + - skip: + version: " - 3.0.0" + reason: thread_pool total_wait_time stats were introduced in V_3.0.0 + + - do: + cat.thread_pool: {} + + - match: + $body: | + / #node_name name active queue rejected + ^ (\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/ + + - do: + cat.thread_pool: + thread_pool_patterns: search,search_throttled,index_searcher,generic + h: name,total_wait_time,twt + v: true + + - match: + $body: | + /^ id \s+ name \s+ total_wait_time \s+ twt \n + (\S+ \s+ search \s+ \d+s \s+ \d+ \n + \S+ \s+ search_throttled \s+ \d+s \s+ \d+ \n + \S+ \s+ index_searcher \s+ \d+s \s+ \d+ \n + \S+ \s+ generic \s+ -1 \s+ -1 \n)+ $/ + --- "Test cat thread_pool output": - skip: diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java index 350f494c35366..f682f768017f0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java @@ -8,6 +8,8 @@ package org.opensearch.search.stats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.stats.IndexStats; import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder; @@ -26,6 +28,8 @@ import org.opensearch.search.SearchService; import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.ThreadPoolStats; import java.util.Arrays; import java.util.Collection; @@ -35,6 +39,7 @@ import java.util.function.Function; import static org.opensearch.index.query.QueryBuilders.scriptQuery; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; @@ -307,6 +312,54 @@ public void testAvgConcurrencyIndexLevel() throws InterruptedException { assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0); } + public void testThreadPoolWaitTime() throws Exception { + int NUM_SHARDS = 1; + String INDEX = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + createIndex( + INDEX, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + + ensureGreen(); + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).get(); + refresh(); + } + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 10)) + .execute() + .actionGet(); + + client().prepareSearch(INDEX).execute().actionGet(); + + NodesStatsRequestBuilder nodesStatsRequestBuilder = new NodesStatsRequestBuilder( + client().admin().cluster(), + NodesStatsAction.INSTANCE + ).setNodesIds().all(); + NodesStatsResponse nodesStatsResponse = nodesStatsRequestBuilder.execute().actionGet(); + ThreadPoolStats threadPoolStats = nodesStatsResponse.getNodes().get(0).getThreadPool(); + + for (ThreadPoolStats.Stats stats : threadPoolStats) { + if (stats.getName().equals(ThreadPool.Names.INDEX_SEARCHER)) { + assertThat(stats.getWaitTimeNanos().micros(), greaterThan(0L)); + } + } + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2)) + .execute() + .actionGet(); + } + public static class ScriptedDelayedPlugin extends MockScriptPlugin { static final String SCRIPT_NAME = "search_timeout"; diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java index d967b7423ca80..afffec4790873 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java @@ -205,4 +205,15 @@ protected Runnable wrapRunnable(Runnable command) { protected Runnable unwrap(Runnable runnable) { return contextHolder.unwrap(runnable); } + + /** + * Returns the cumulative wait time of the ThreadPool. If the ThreadPool does not support tracking the cumulative pool wait time + * then this should return -1 which will prevent the value from showing up in {@link org.opensearch.threadpool.ThreadPoolStats}. + * ThreadPools that do support this metric should override this method. For example, {@link QueueResizingOpenSearchThreadPoolExecutor} + * does so using the {@link TimedRunnable} to get the difference between Runnable creation and execution. + * + */ + public long getPoolWaitTimeNanos() { + return -1; + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java index 7a0ce8244efe4..c06184f5a5483 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java @@ -9,6 +9,7 @@ package org.opensearch.common.util.concurrent; import org.opensearch.common.ExponentiallyWeightedMovingAverage; +import org.opensearch.common.metrics.CounterMetric; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -27,6 +28,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch private final ResizableBlockingQueue workQueue; private final Function runnableWrapper; private final ExponentiallyWeightedMovingAverage executionEWMA; + private final CounterMetric poolWaitTime; /** * Create new resizable at runtime thread pool executor @@ -101,6 +103,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch this.workQueue = workQueue; this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0); + this.poolWaitTime = new CounterMetric(); } @Override @@ -156,6 +159,7 @@ protected void afterExecute(Runnable r, Throwable t) { // taskExecutionNanos may be -1 if the task threw an exception executionEWMA.addValue(taskExecutionNanos); } + poolWaitTime.inc(timedRunnable.getWaitTimeNanos()); } /** @@ -173,4 +177,9 @@ public synchronized int resize(int capacity) { capacity ); } + + @Override + public long getPoolWaitTimeNanos() { + return poolWaitTime.count(); + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java index 684dd7c9d8de5..0c0b437e4f390 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.ExponentiallyWeightedMovingAverage; +import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.unit.TimeValue; import java.util.Locale; @@ -66,6 +67,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT private final int maxQueueSize; private final long targetedResponseTimeNanos; private final ExponentiallyWeightedMovingAverage executionEWMA; + private final CounterMetric poolWaitTime; private final AtomicLong totalTaskNanos = new AtomicLong(0); private final AtomicInteger taskCount = new AtomicInteger(0); @@ -97,6 +99,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT this.maxQueueSize = maxQueueSize; this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); + this.poolWaitTime = new CounterMetric(); logger.debug( "thread pool [{}] will adjust queue by [{}] when determining automatic queue size", getName(), @@ -190,6 +193,7 @@ protected void afterExecute(Runnable r, Throwable t) { // taskExecutionNanos may be -1 if the task threw an exception executionEWMA.addValue(taskExecutionNanos); } + poolWaitTime.inc(timedRunnable.getWaitTimeNanos()); if (taskCount.incrementAndGet() == this.tasksPerFrame) { final long endTimeNs = System.nanoTime(); @@ -290,4 +294,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", "); } + @Override + public long getPoolWaitTimeNanos() { + return poolWaitTime.count(); + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java b/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java index f3bc50a33453b..2eb6657898008 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java @@ -107,6 +107,14 @@ long getTotalExecutionNanos() { return Math.max(finishTimeNanos - startTimeNanos, 1); } + long getWaitTimeNanos() { + if (startTimeNanos == -1) { + // There must have been an exception thrown, the total time is unknown (-1) + return -1; + } + return Math.max(startTimeNanos - creationTimeNanos, 1); + } + /** * If the task was failed or rejected, return true. * Otherwise, false. diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java index 652bc448144e2..bcb8024768551 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java @@ -163,6 +163,10 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("rejected", "alias:r;default:true;text-align:right;desc:number of rejected tasks"); table.addCell("largest", "alias:l;default:false;text-align:right;desc:highest number of seen active threads"); table.addCell("completed", "alias:c;default:false;text-align:right;desc:number of completed tasks"); + table.addCell( + "total_wait_time", + "alias:twt;default:false;text-align:right;desc:total time tasks spent waiting in thread_pool queue" + ); table.addCell("core", "alias:cr;default:false;text-align:right;desc:core number of threads in a scaling thread pool"); table.addCell("max", "alias:mx;default:false;text-align:right;desc:maximum number of threads in a scaling thread pool"); table.addCell("size", "alias:sz;default:false;text-align:right;desc:number of threads in a fixed thread pool"); @@ -267,6 +271,7 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR table.addCell(poolStats == null ? null : poolStats.getRejected()); table.addCell(poolStats == null ? null : poolStats.getLargest()); table.addCell(poolStats == null ? null : poolStats.getCompleted()); + table.addCell(poolStats == null ? null : poolStats.getWaitTimeNanos()); table.addCell(core); table.addCell(max); table.addCell(size); diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 6ddf3ff6b2f6a..8375ac34972af 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -383,19 +383,21 @@ public ThreadPoolStats stats() { long rejected = -1; int largest = -1; long completed = -1; - if (holder.executor() instanceof ThreadPoolExecutor) { - ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor(); + long waitTimeNanos = -1; + if (holder.executor() instanceof OpenSearchThreadPoolExecutor) { + OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor(); threads = threadPoolExecutor.getPoolSize(); queue = threadPoolExecutor.getQueue().size(); active = threadPoolExecutor.getActiveCount(); largest = threadPoolExecutor.getLargestPoolSize(); completed = threadPoolExecutor.getCompletedTaskCount(); + waitTimeNanos = threadPoolExecutor.getPoolWaitTimeNanos(); RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler(); if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) { rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected(); } } - stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed)); + stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTimeNanos)); } return new ThreadPoolStats(stats); } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java index b4d7e4a3fbf7a..66106e13a380f 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java @@ -32,6 +32,8 @@ package org.opensearch.threadpool; +import org.opensearch.Version; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -65,8 +67,9 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L)); + stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, 0L)); List copy = new ArrayList<>(stats); Collections.sort(copy); @@ -79,11 +79,11 @@ public void testThreadPoolStatsToXContent() throws IOException { try (BytesStreamOutput os = new BytesStreamOutput()) { List stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L, -1L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L, -1L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L, -1L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L, -1L)); ThreadPoolStats threadPoolStats = new ThreadPoolStats(stats); try (XContentBuilder builder = new XContentBuilder(MediaTypeRegistry.JSON.xContent(), os)) {