Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Oct 11, 2024
1 parent 600b360 commit 4bedb9c
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public void testThreadPoolSettings() {
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have max and core");
assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have [core, max]");
}

// Scaling threadpool - core > max
Expand Down Expand Up @@ -449,7 +449,7 @@ public void testThreadPoolSettings() {
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have size");
assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have [size]");
}

// Fixed threadpool - 0 value
Expand Down
175 changes: 92 additions & 83 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ public static class Names {
public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum";
}

static Set<String> scalingThreadPoolKeys = new HashSet<>(Arrays.asList("max", "core"));
static Set<String> fixedThreadPoolKeys = new HashSet<>(Arrays.asList("size"));

/**
* The threadpool type.
*
Expand Down Expand Up @@ -238,89 +241,6 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
this(settings, null, customBuilders);
}

public void setThreadPool(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor o = (OpenSearchThreadPoolExecutor) holder.executor;
if (holder.info.type == ThreadPoolType.SCALING) {
int max = tpGroup.getAsInt("max", o.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", o.getCorePoolSize());
/*
If we are decreasing, core pool size has to be decreased first.
If we are increasing ,max pool size has to be increased first
This ensures that core pool is always smaller than max pool size .
*/
if (core < o.getCorePoolSize()) {
o.setCorePoolSize(core);
o.setMaximumPoolSize(max);
} else {
o.setMaximumPoolSize(max);
o.setCorePoolSize(core);
}
} else {
int size = tpGroup.getAsInt("size", o.getMaximumPoolSize());
if (size < o.getCorePoolSize()) {
o.setCorePoolSize(size);
o.setMaximumPoolSize(size);
} else {
o.setMaximumPoolSize(size);
o.setCorePoolSize(size);
}
}
}
}

public void setClusterSettings(ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool, this::validateSetting);
}

private void validateSetting(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
if (THREAD_POOL_TYPES.get(tpName) == null) {
throw new IllegalArgumentException("illegal thread_pool name : " + tpName);
}
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor;
if (holder.info.type == ThreadPoolType.SCALING) {
Set<String> expectedKeys = new HashSet<>(Arrays.asList("max", "core"));
if (tpGroup.keySet().stream().allMatch(expectedKeys::contains) == false) {
throw new IllegalArgumentException(
"illegal thread_pool config : " + tpGroup.keySet() + " should only have max and core"
);
}
int max = tpGroup.getAsInt("max", threadPoolExecutor.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", threadPoolExecutor.getCorePoolSize());
if (core < 1 || max < 1) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");
} else if (core > max) {
throw new IllegalArgumentException("core threadpool size cannot be greater than max");
}
} else {
if (tpGroup.keySet().size() != 1) {
throw new IllegalArgumentException("illegal thread_pool config : " + tpGroup.keySet());
} else if (tpGroup.keySet().contains("size") == false) {
throw new IllegalArgumentException("illegal thread_pool config : " + tpGroup.keySet() + " should only have size");
} else {
int size = tpGroup.getAsInt("size", threadPoolExecutor.getMaximumPoolSize());
if (size < 1) {
throw new IllegalArgumentException(
"illegal value for [cluster.thread_pool." + tpName + "], has to be positive value"
);
}
}
}
}
}

public ThreadPool(
final Settings settings,
final AtomicReference<RunnableTaskExecutionListener> runnableTaskListener,
Expand Down Expand Up @@ -503,6 +423,95 @@ public Info info(String name) {
return holder.info;
}

public void setClusterSettings(ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool, this::validateSetting);
}

/*
Scaling threadpool can provide only max and core
Fixed/ResizableQueue can provide only size
For example valid settings would be for scaling and fixed thead pool
cluster.threadpool.snapshot.max : "5",
cluster.threadpool.snapshot.core : "5",
cluster.threadpool.get.size : "2",
*/
private void validateSetting(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
if (THREAD_POOL_TYPES.containsKey(tpName) == false) {
throw new IllegalArgumentException("illegal thread_pool name : " + tpName);
}
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor;
if (holder.info.type == ThreadPoolType.SCALING) {
if (scalingThreadPoolKeys.containsAll(tpGroup.keySet()) == false) {
throw new IllegalArgumentException(
"illegal thread_pool config : " + tpGroup.keySet() + " should only have " + scalingThreadPoolKeys
);
}
int max = tpGroup.getAsInt("max", threadPoolExecutor.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", threadPoolExecutor.getCorePoolSize());
if (core < 1 || max < 1) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");
} else if (core > max) {
throw new IllegalArgumentException("core threadpool size cannot be greater than max");
}
} else {
if (fixedThreadPoolKeys.containsAll(tpGroup.keySet()) == false) {
throw new IllegalArgumentException(
"illegal thread_pool config : " + tpGroup.keySet() + " should only have " + fixedThreadPoolKeys
);
}
int size = tpGroup.getAsInt("size", threadPoolExecutor.getMaximumPoolSize());
if (size < 1) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");
}
}
}
}

public void setThreadPool(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) holder.executor;
if (holder.info.type == ThreadPoolType.SCALING) {
int max = tpGroup.getAsInt("max", executor.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", executor.getCorePoolSize());
/*
If we are decreasing, core pool size has to be decreased first.
If we are increasing ,max pool size has to be increased first
This ensures that core pool is always smaller than max pool size .
Other wise IllegalArgumentException will be thrown from ThreadPoolExecutor
*/
if (core < executor.getCorePoolSize()) {
executor.setCorePoolSize(core);
executor.setMaximumPoolSize(max);
} else {
executor.setMaximumPoolSize(max);
executor.setCorePoolSize(core);
}
} else {
int size = tpGroup.getAsInt("size", executor.getMaximumPoolSize());
if (size < executor.getCorePoolSize()) {
executor.setCorePoolSize(size);
executor.setMaximumPoolSize(size);
} else {
executor.setMaximumPoolSize(size);
executor.setCorePoolSize(size);
}
}
}
}

public ThreadPoolStats stats() {
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
for (ExecutorHolder holder : executors.values()) {
Expand Down

0 comments on commit 4bedb9c

Please sign in to comment.