Skip to content

Commit

Permalink
Add support to dynamically resize threadpools size (#16236)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Oct 15, 2024
1 parent c7b865e commit c2d8bcf
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Segmented cache changes ([#16047](https://github.com/opensearch-project/OpenSearch/pull/16047))
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add support to dynamically resize threadpools size. ([#16236](https://github.com/opensearch-project/OpenSearch/pull/16236))
- [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))
- [Workload Management] Add Integration Tests for Workload Management CRUD APIs ([#15955](https://github.com/opensearch-project/OpenSearch/pull/15955))
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,114 @@ public void testMissingUnits() {
}
}

public void testThreadPoolSettings() {
// wrong threadpool
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.wrong.max", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getCause().getMessage().contains("illegal thread_pool name : "));
}

// Scaling threadpool - negative value
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.snapshot.max", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal value for [cluster.thread_pool.snapshot], has to be positive value");
}

// Scaling threadpool - Other than max and core
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.snapshot.wrong", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have [core, max]");
}

// Scaling threadpool - core > max
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put("cluster.thread_pool.snapshot.core", "2").put("cluster.thread_pool.snapshot.max", "1").build()
)
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "core threadpool size cannot be greater than max");
}

// Scaling threadpool - Max value lesser than default value of 4
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.generic.max", "1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "core threadpool size cannot be greater than max");
}

// Scaling threadpool - happy case - transient overrides persistent
ClusterUpdateSettingsResponse clusterUpdateSettingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put("cluster.thread_pool.snapshot.core", "2").put("cluster.thread_pool.snapshot.max", "2").build()
)
.setPersistentSettings(Settings.builder().put("cluster.thread_pool.snapshot.max", "1").build())
.get();
assertTrue(clusterUpdateSettingsResponse.isAcknowledged());

// Fixed threadpool - Other than size
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.get.wrong", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have [size]");
}

// Fixed threadpool - 0 value
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.get.size", "0").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal value for [cluster.thread_pool.get], has to be positive value");
}

// Fixed threadpool - happy case
clusterUpdateSettingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.get.size", "1").build())
.setPersistentSettings(Settings.builder().put("cluster.thread_pool.get.size", "1").build())
.get();
assertTrue(clusterUpdateSettingsResponse.isAcknowledged());
}

public void testLoggerLevelUpdate() {
assertAcked(prepareCreate("test"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,10 @@ public void apply(Settings value, Settings current, Settings previous) {
// Settings to be used for limiting rest requests
ResponseLimitSettings.CAT_INDICES_RESPONSE_LIMIT_SETTING,
ResponseLimitSettings.CAT_SHARDS_RESPONSE_LIMIT_SETTING,
ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING
ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING,

// Thread pool Settings
ThreadPool.CLUSTER_THREAD_POOL_SIZE_SETTING
)
)
);
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ protected Node(
additionalSettingsFilter,
settingsUpgraders
);
threadPool.registerClusterSettingsListeners(settingsModule.getClusterSettings());
scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))
Expand Down
101 changes: 101 additions & 0 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.SizeValue;
Expand All @@ -57,11 +58,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -121,6 +125,9 @@ public static class Names {
public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum";
}

static Set<String> scalingThreadPoolKeys = new HashSet<>(Arrays.asList("max", "core"));
static Set<String> fixedThreadPoolKeys = new HashSet<>(Arrays.asList("size"));

/**
* The threadpool type.
*
Expand Down Expand Up @@ -221,6 +228,12 @@ public Collection<ExecutorBuilder> builders() {
Setting.Property.NodeScope
);

public static final Setting<Settings> CLUSTER_THREAD_POOL_SIZE_SETTING = Setting.groupSetting(
"cluster.thread_pool.",
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
this(settings, null, customBuilders);
}
Expand Down Expand Up @@ -417,6 +430,94 @@ public Info info(String name) {
return holder.info;
}

public void registerClusterSettingsListeners(ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool, this::validateSetting);
}

/*
Scaling threadpool can provide only max and core
Fixed/ResizableQueue can provide only size
For example valid settings would be for scaling and fixed thead pool
cluster.threadpool.snapshot.max : "5",
cluster.threadpool.snapshot.core : "5",
cluster.threadpool.get.size : "2",
*/
private void validateSetting(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();

Check warning on line 449 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L449

Added line #L449 was not covered by tests
if (THREAD_POOL_TYPES.containsKey(tpName) == false) {
throw new IllegalArgumentException("illegal thread_pool name : " + tpName);

Check warning on line 451 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L451

Added line #L451 was not covered by tests
}
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);

Check warning on line 454 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L453-L454

Added lines #L453 - L454 were not covered by tests
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor;

Check warning on line 456 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L456

Added line #L456 was not covered by tests
if (holder.info.type == ThreadPoolType.SCALING) {
if (scalingThreadPoolKeys.containsAll(tpGroup.keySet()) == false) {
throw new IllegalArgumentException(
"illegal thread_pool config : " + tpGroup.keySet() + " should only have " + scalingThreadPoolKeys

Check warning on line 460 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L459-L460

Added lines #L459 - L460 were not covered by tests
);
}
int max = tpGroup.getAsInt("max", threadPoolExecutor.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", threadPoolExecutor.getCorePoolSize());

Check warning on line 464 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L463-L464

Added lines #L463 - L464 were not covered by tests
if (core < 1 || max < 1) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");

Check warning on line 466 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L466

Added line #L466 was not covered by tests
} else if (core > max) {
throw new IllegalArgumentException("core threadpool size cannot be greater than max");

Check warning on line 468 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L468

Added line #L468 was not covered by tests
}
} else {

Check warning on line 470 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L470

Added line #L470 was not covered by tests
if (fixedThreadPoolKeys.containsAll(tpGroup.keySet()) == false) {
throw new IllegalArgumentException(
"illegal thread_pool config : " + tpGroup.keySet() + " should only have " + fixedThreadPoolKeys

Check warning on line 473 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L472-L473

Added lines #L472 - L473 were not covered by tests
);
}
int size = tpGroup.getAsInt("size", threadPoolExecutor.getMaximumPoolSize());

Check warning on line 476 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L476

Added line #L476 was not covered by tests
if (size < 1) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");

Check warning on line 478 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L478

Added line #L478 was not covered by tests
}
}
}

Check warning on line 481 in server/src/main/java/org/opensearch/threadpool/ThreadPool.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L481

Added line #L481 was not covered by tests
}

public void setThreadPool(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) holder.executor;
if (holder.info.type == ThreadPoolType.SCALING) {
int max = tpGroup.getAsInt("max", executor.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", executor.getCorePoolSize());
/*
If we are decreasing, core pool size has to be decreased first.
If we are increasing ,max pool size has to be increased first
This ensures that core pool is always smaller than max pool size .
Other wise IllegalArgumentException will be thrown from ThreadPoolExecutor
*/
if (core < executor.getCorePoolSize()) {
executor.setCorePoolSize(core);
executor.setMaximumPoolSize(max);
} else {
executor.setMaximumPoolSize(max);
executor.setCorePoolSize(core);
}
} else {
int size = tpGroup.getAsInt("size", executor.getMaximumPoolSize());
if (size < executor.getCorePoolSize()) {
executor.setCorePoolSize(size);
executor.setMaximumPoolSize(size);
} else {
executor.setMaximumPoolSize(size);
executor.setCorePoolSize(size);
}
}
}
}

public ThreadPoolStats stats() {
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
for (ExecutorHolder holder : executors.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -152,4 +153,47 @@ public void testInheritContextOnSchedule() throws InterruptedException {
terminate(threadPool);
}
}

public void testThreadPoolResize() {
TestThreadPool threadPool = new TestThreadPool("test");
try {
// increase it
Settings commonSettings = Settings.builder().put("snapshot.max", "10").put("snapshot.core", "2").put("get.size", "100").build();
threadPool.setThreadPool(commonSettings);
ExecutorService executorService = threadPool.executor("snapshot");
OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(10, executor.getMaximumPoolSize());
assertEquals(2, executor.getCorePoolSize());

executorService = threadPool.executor("get");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(100, executor.getMaximumPoolSize());
assertEquals(100, executor.getCorePoolSize());

// decrease it
commonSettings = Settings.builder().put("snapshot.max", "2").put("snapshot.core", "1").put("get.size", "90").build();
threadPool.setThreadPool(commonSettings);
executorService = threadPool.executor("snapshot");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(2, executor.getMaximumPoolSize());
assertEquals(1, executor.getCorePoolSize());

executorService = threadPool.executor("get");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(90, executor.getMaximumPoolSize());
assertEquals(90, executor.getCorePoolSize());
} finally {
terminate(threadPool);
}
}

public void testThreadPoolResizeFail() {
TestThreadPool threadPool = new TestThreadPool("test");
try {
Settings commonSettings = Settings.builder().put("snapshot.max", "50").put("snapshot.core", "100").build();
assertThrows(IllegalArgumentException.class, () -> threadPool.setThreadPool(commonSettings));
} finally {
terminate(threadPool);
}
}
}

0 comments on commit c2d8bcf

Please sign in to comment.