Skip to content

Commit

Permalink
Add support to dynamically resize threadpools size
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Oct 8, 2024
1 parent 8df3a51 commit 682ae0d
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))
- Add support to dynamically resize threadpools size.
- [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,35 @@ public void testMissingUnits() {
}
}

public void testThreadPoolSettings() {
String key1 = "cluster.thread_pool.snapshot.max";
Settings transientSettings = Settings.builder().put(key1, "-1").build();

String key2 = "cluster.thread_pool.snapshot.max";
Settings persistentSettings = Settings.builder().put(key2, "5").build();

try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(transientSettings)
.setPersistentSettings(persistentSettings)
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getMessage(), "illegal value for [cluster.thread_pool.snapshot], has to be positive value");
}

transientSettings = Settings.builder().put(key1, "1").build();
persistentSettings = Settings.builder().put(key2, "5").build();
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(transientSettings)
.setPersistentSettings(persistentSettings)
.get();
}

public void testLoggerLevelUpdate() {
assertAcked(prepareCreate("test"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ResponseLimitSettings.CAT_SHARDS_RESPONSE_LIMIT_SETTING,
ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING,

//Thread pool
ThreadPool.THREADPOOL_SNAPSHOT_SETTING
// Thread pool Settings
ThreadPool.CLUSTER_THREAD_POOL_SIZE_SETTING
)
)
);
Expand Down
71 changes: 53 additions & 18 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ public static ThreadPoolType fromType(String type) {

private ClusterSettings clusterSettings = null;


public Collection<ExecutorBuilder> builders() {
return Collections.unmodifiableCollection(builders.values());
}
Expand All @@ -226,32 +225,68 @@ public Collection<ExecutorBuilder> builders() {
Setting.Property.NodeScope
);

public static final Setting<Settings> CLUSTER_THREAD_POOL_SIZE_SETTING = Setting.groupSetting("cluster.thread_pool.", (tpSettings) -> {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
Settings tpGroup = entry.getValue();
int max = tpGroup.getAsInt("max", 1);
int core = tpGroup.getAsInt("core", 1);
int size = tpGroup.getAsInt("size", 1);
if (max <= 0 || core <= 0 || size <= 0) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");
}
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope

);

public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
this(settings, null, customBuilders);
}

public static final Setting<Integer> THREADPOOL_SNAPSHOT_SETTING = Setting.intSetting(
"cluster.thread_pool.snapshot",
-1,
-1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public void setSnapshotThread(int snapshotThread) {
OpenSearchThreadPoolExecutor o = (OpenSearchThreadPoolExecutor) this.executors.get(Names.SNAPSHOT).executor;
if (snapshotThread != -1) {
o.setCorePoolSize(snapshotThread);
o.setMaximumPoolSize(snapshotThread);
public void setThreadPool(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor o = (OpenSearchThreadPoolExecutor) holder.executor;
if (holder.info.type == ThreadPoolType.SCALING) {
int max = tpGroup.getAsInt("max", o.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", o.getCorePoolSize());
if (core > max) {
// Can we do better than silently ignoring this as this can't be caught in static validation ?
logger.error("Thread pool {} core {} is higher than maximum value {}. Ignoring it", tpName, core, max);
continue;
}
// Below check makes sure we adhere to the constraint that cores <= max at all the time.
if (core < o.getCorePoolSize()) {
o.setCorePoolSize(core);
o.setMaximumPoolSize(max);
} else {
o.setMaximumPoolSize(max);
o.setCorePoolSize(core);
}
} else {
int size = tpGroup.getAsInt("size", o.getMaximumPoolSize());
if (size < o.getCorePoolSize()) {
o.setCorePoolSize(size);
o.setMaximumPoolSize(size);
} else {
o.setMaximumPoolSize(size);
o.setCorePoolSize(size);
}
}
}
}

public void setClusterSettings(ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
this.clusterSettings.addSettingsUpdateConsumer(
THREADPOOL_SNAPSHOT_SETTING,
this::setSnapshotThread
);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool);
}

public ThreadPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -152,4 +153,51 @@ public void testInheritContextOnSchedule() throws InterruptedException {
terminate(threadPool);
}
}

public void testThreadPoolResize() {
TestThreadPool threadPool = new TestThreadPool("test");
try {
// increase it
Settings commonSettings = Settings.builder().put("snapshot.max", "10").put("snapshot.core", "2").put("get.size", "100").build();
threadPool.setThreadPool(commonSettings);
ExecutorService executorService = threadPool.executor("snapshot");
OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(10, executor.getMaximumPoolSize());
assertEquals(2, executor.getCorePoolSize());

executorService = threadPool.executor("get");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(100, executor.getMaximumPoolSize());
assertEquals(100, executor.getCorePoolSize());

// decrease it
commonSettings = Settings.builder().put("snapshot.max", "2").put("snapshot.core", "1").put("get.size", "90").build();
threadPool.setThreadPool(commonSettings);
executorService = threadPool.executor("snapshot");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(2, executor.getMaximumPoolSize());
assertEquals(1, executor.getCorePoolSize());

executorService = threadPool.executor("get");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(90, executor.getMaximumPoolSize());
assertEquals(90, executor.getCorePoolSize());
} finally {
terminate(threadPool);
}
}

public void testThreadPoolResizeFail() {
TestThreadPool threadPool = new TestThreadPool("test");
try {
Settings commonSettings = Settings.builder().put("snapshot.max", "50").put("snapshot.core", "100").build();
threadPool.setThreadPool(commonSettings);
ExecutorService executorService = threadPool.executor("snapshot");
OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) executorService;
assertNotEquals(50, executor.getMaximumPoolSize());
assertNotEquals(100, executor.getCorePoolSize());
} finally {
terminate(threadPool);
}
}
}

0 comments on commit 682ae0d

Please sign in to comment.