Skip to content

Commit

Permalink
Change thread pool to a ScalingExecutorBuilder (#421)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Jan 19, 2024
1 parent bf2cad0 commit 789fc4d
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -64,7 +65,7 @@
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -176,11 +177,11 @@ public List<Setting<?>> getSettings() {
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return List.of(
new FixedExecutorBuilder(
settings,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(settings),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -56,11 +56,11 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase

private static ThreadPool threadPool = new TestThreadPool(
DeprovisionWorkflowTransportActionTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.opensearch.ml.common.MLTaskType;
import org.opensearch.ml.common.transport.deploy.MLDeployModelResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -82,11 +82,11 @@ public void setUp() throws Exception {

testThreadPool = new TestThreadPool(
DeployModelStepTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -42,11 +42,11 @@ public class ProcessNodeTests extends OpenSearchTestCase {
public static void setup() {
testThreadPool = new TestThreadPool(
ProcessNodeTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
import org.opensearch.ml.common.transport.register.MLRegisterModelResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -78,11 +78,11 @@ public void setUp() throws Exception {

testThreadPool = new TestThreadPool(
RegisterLocalCustomModelStepTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
import org.opensearch.ml.common.transport.register.MLRegisterModelResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -78,11 +78,11 @@ public void setUp() throws Exception {

testThreadPool = new TestThreadPool(
RegisterLocalCustomModelStepTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
import org.opensearch.ml.common.transport.register.MLRegisterModelResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -78,11 +78,11 @@ public void setUp() throws Exception {

testThreadPool = new TestThreadPool(
RegisterLocalCustomModelStepTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.opensearch.flowframework.model.WorkflowValidator;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -111,11 +111,11 @@ public static void setup() throws IOException {

testThreadPool = new TestThreadPool(
WorkflowProcessSorterTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down

0 comments on commit 789fc4d

Please sign in to comment.