From 4bedb9c27a3dcab9266ae2cd59b48d61d228eb73 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Fri, 11 Oct 2024 12:06:49 +0530 Subject: [PATCH] Address PR comments Signed-off-by: Gaurav Bafna --- .../cluster/settings/ClusterSettingsIT.java | 4 +- .../org/opensearch/threadpool/ThreadPool.java | 175 +++++++++--------- 2 files changed, 94 insertions(+), 85 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java index d7edb40afe5d0..4c5316553baf4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java @@ -414,7 +414,7 @@ public void testThreadPoolSettings() { .get(); fail("bogus value"); } catch (IllegalArgumentException ex) { - assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have max and core"); + assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have [core, max]"); } // Scaling threadpool - core > max @@ -449,7 +449,7 @@ public void testThreadPoolSettings() { .get(); fail("bogus value"); } catch (IllegalArgumentException ex) { - assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have size"); + assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have [size]"); } // Fixed threadpool - 0 value diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index a962e20aa67c8..3a80a6844fcae 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -126,6 +126,9 @@ public static class Names { public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum"; } + static Set scalingThreadPoolKeys = new HashSet<>(Arrays.asList("max", "core")); + static Set fixedThreadPoolKeys = new HashSet<>(Arrays.asList("size")); + /** * The threadpool type. * @@ -238,89 +241,6 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui this(settings, null, customBuilders); } - public void setThreadPool(Settings tpSettings) { - Map tpGroups = tpSettings.getAsGroups(); - for (Map.Entry entry : tpGroups.entrySet()) { - String tpName = entry.getKey(); - Settings tpGroup = entry.getValue(); - ExecutorHolder holder = executors.get(tpName); - assert holder.executor instanceof OpenSearchThreadPoolExecutor; - OpenSearchThreadPoolExecutor o = (OpenSearchThreadPoolExecutor) holder.executor; - if (holder.info.type == ThreadPoolType.SCALING) { - int max = tpGroup.getAsInt("max", o.getMaximumPoolSize()); - int core = tpGroup.getAsInt("core", o.getCorePoolSize()); - /* - If we are decreasing, core pool size has to be decreased first. - If we are increasing ,max pool size has to be increased first - This ensures that core pool is always smaller than max pool size . - */ - if (core < o.getCorePoolSize()) { - o.setCorePoolSize(core); - o.setMaximumPoolSize(max); - } else { - o.setMaximumPoolSize(max); - o.setCorePoolSize(core); - } - } else { - int size = tpGroup.getAsInt("size", o.getMaximumPoolSize()); - if (size < o.getCorePoolSize()) { - o.setCorePoolSize(size); - o.setMaximumPoolSize(size); - } else { - o.setMaximumPoolSize(size); - o.setCorePoolSize(size); - } - } - } - } - - public void setClusterSettings(ClusterSettings clusterSettings) { - this.clusterSettings = clusterSettings; - this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool, this::validateSetting); - } - - private void validateSetting(Settings tpSettings) { - Map tpGroups = tpSettings.getAsGroups(); - for (Map.Entry entry : tpGroups.entrySet()) { - String tpName = entry.getKey(); - if (THREAD_POOL_TYPES.get(tpName) == null) { - throw new IllegalArgumentException("illegal thread_pool name : " + tpName); - } - Settings tpGroup = entry.getValue(); - ExecutorHolder holder = executors.get(tpName); - assert holder.executor instanceof OpenSearchThreadPoolExecutor; - OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor; - if (holder.info.type == ThreadPoolType.SCALING) { - Set expectedKeys = new HashSet<>(Arrays.asList("max", "core")); - if (tpGroup.keySet().stream().allMatch(expectedKeys::contains) == false) { - throw new IllegalArgumentException( - "illegal thread_pool config : " + tpGroup.keySet() + " should only have max and core" - ); - } - int max = tpGroup.getAsInt("max", threadPoolExecutor.getMaximumPoolSize()); - int core = tpGroup.getAsInt("core", threadPoolExecutor.getCorePoolSize()); - if (core < 1 || max < 1) { - throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value"); - } else if (core > max) { - throw new IllegalArgumentException("core threadpool size cannot be greater than max"); - } - } else { - if (tpGroup.keySet().size() != 1) { - throw new IllegalArgumentException("illegal thread_pool config : " + tpGroup.keySet()); - } else if (tpGroup.keySet().contains("size") == false) { - throw new IllegalArgumentException("illegal thread_pool config : " + tpGroup.keySet() + " should only have size"); - } else { - int size = tpGroup.getAsInt("size", threadPoolExecutor.getMaximumPoolSize()); - if (size < 1) { - throw new IllegalArgumentException( - "illegal value for [cluster.thread_pool." + tpName + "], has to be positive value" - ); - } - } - } - } - } - public ThreadPool( final Settings settings, final AtomicReference runnableTaskListener, @@ -503,6 +423,95 @@ public Info info(String name) { return holder.info; } + public void setClusterSettings(ClusterSettings clusterSettings) { + this.clusterSettings = clusterSettings; + this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool, this::validateSetting); + } + + /* + Scaling threadpool can provide only max and core + Fixed/ResizableQueue can provide only size + + For example valid settings would be for scaling and fixed thead pool + cluster.threadpool.snapshot.max : "5", + cluster.threadpool.snapshot.core : "5", + cluster.threadpool.get.size : "2", + */ + private void validateSetting(Settings tpSettings) { + Map tpGroups = tpSettings.getAsGroups(); + for (Map.Entry entry : tpGroups.entrySet()) { + String tpName = entry.getKey(); + if (THREAD_POOL_TYPES.containsKey(tpName) == false) { + throw new IllegalArgumentException("illegal thread_pool name : " + tpName); + } + Settings tpGroup = entry.getValue(); + ExecutorHolder holder = executors.get(tpName); + assert holder.executor instanceof OpenSearchThreadPoolExecutor; + OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor; + if (holder.info.type == ThreadPoolType.SCALING) { + if (scalingThreadPoolKeys.containsAll(tpGroup.keySet()) == false) { + throw new IllegalArgumentException( + "illegal thread_pool config : " + tpGroup.keySet() + " should only have " + scalingThreadPoolKeys + ); + } + int max = tpGroup.getAsInt("max", threadPoolExecutor.getMaximumPoolSize()); + int core = tpGroup.getAsInt("core", threadPoolExecutor.getCorePoolSize()); + if (core < 1 || max < 1) { + throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value"); + } else if (core > max) { + throw new IllegalArgumentException("core threadpool size cannot be greater than max"); + } + } else { + if (fixedThreadPoolKeys.containsAll(tpGroup.keySet()) == false) { + throw new IllegalArgumentException( + "illegal thread_pool config : " + tpGroup.keySet() + " should only have " + fixedThreadPoolKeys + ); + } + int size = tpGroup.getAsInt("size", threadPoolExecutor.getMaximumPoolSize()); + if (size < 1) { + throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value"); + } + } + } + } + + public void setThreadPool(Settings tpSettings) { + Map tpGroups = tpSettings.getAsGroups(); + for (Map.Entry entry : tpGroups.entrySet()) { + String tpName = entry.getKey(); + Settings tpGroup = entry.getValue(); + ExecutorHolder holder = executors.get(tpName); + assert holder.executor instanceof OpenSearchThreadPoolExecutor; + OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) holder.executor; + if (holder.info.type == ThreadPoolType.SCALING) { + int max = tpGroup.getAsInt("max", executor.getMaximumPoolSize()); + int core = tpGroup.getAsInt("core", executor.getCorePoolSize()); + /* + If we are decreasing, core pool size has to be decreased first. + If we are increasing ,max pool size has to be increased first + This ensures that core pool is always smaller than max pool size . + Other wise IllegalArgumentException will be thrown from ThreadPoolExecutor + */ + if (core < executor.getCorePoolSize()) { + executor.setCorePoolSize(core); + executor.setMaximumPoolSize(max); + } else { + executor.setMaximumPoolSize(max); + executor.setCorePoolSize(core); + } + } else { + int size = tpGroup.getAsInt("size", executor.getMaximumPoolSize()); + if (size < executor.getCorePoolSize()) { + executor.setCorePoolSize(size); + executor.setMaximumPoolSize(size); + } else { + executor.setMaximumPoolSize(size); + executor.setCorePoolSize(size); + } + } + } + } + public ThreadPoolStats stats() { List stats = new ArrayList<>(); for (ExecutorHolder holder : executors.values()) {