From cf26b0367a3551d5386a527f3d5abe086e4ec144 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Fri, 20 Oct 2023 22:24:52 -0700 Subject: [PATCH] Increase remote recovery thread pool size (#10750) (#10796) The remote recovery thread pool does blocking I/O when downloading files, so the "half processor count max 10" was definitely too small. This can be shown by triggering recoveries on a node that is also doing segment replication, and the replication lag will increase due to contention on that thread pool. Some amount of contention is inevitable, but the change here to increase the download thread pool, and also limit the concurrent usage of that thread pool by any single recovery/replication to 25% of the threads does help. Long term, we can improve this even further by moving to fully async I/O to avoid blocking threads in the application on draining InputStreams. (cherry picked from commit 1e28738b8c966011bf1ae1f00431f0377761cb0a) Signed-off-by: Andrew Ross Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../org/opensearch/indices/recovery/RecoverySettings.java | 7 ++++--- .../main/java/org/opensearch/threadpool/ThreadPool.java | 7 ++++++- .../org/opensearch/threadpool/ScalingThreadPoolTests.java | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 44dfb2f4cb00a..0f3025369833d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -41,6 +41,7 @@ import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; @@ -87,10 +88,10 @@ public class RecoverySettings { /** * Controls the maximum number of streams that can be started concurrently per recovery when downloading from the remote store. */ - public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING = Setting.intSetting( + public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING = new Setting<>( "indices.recovery.max_concurrent_remote_store_streams", - 10, - 1, + (s) -> Integer.toString(Math.max(1, OpenSearchExecutors.allocatedProcessors(s) / 2)), + (s) -> Setting.parseInt(s, 1, "indices.recovery.max_concurrent_remote_store_streams"), Property.Dynamic, Property.NodeScope ); diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index f609f9b1d5c93..9741fc9f2f892 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -284,7 +284,12 @@ public ThreadPool( ); builders.put( Names.REMOTE_RECOVERY, - new ScalingExecutorBuilder(Names.REMOTE_RECOVERY, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) + new ScalingExecutorBuilder( + Names.REMOTE_RECOVERY, + 1, + twiceAllocatedProcessors(allocatedProcessors), + TimeValue.timeValueMinutes(5) + ) ); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { builders.put( diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index ba2d4b8c247bb..19271bbf30e80 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -154,7 +154,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n); sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive); sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessorsMaxTen); - sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::halfAllocatedProcessorsMaxTen); + sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors); return sizes.get(threadPoolName).apply(numberOfProcessors); }