From ee91ec699edcaf02be9445080078a22d005653df Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 5 Jan 2024 23:26:37 +0000 Subject: [PATCH] Use provisioning thread pool in Process Node (#374) (cherry picked from commit 7419e9fda085ced8b78185a509b2714ea4d3dd11) Signed-off-by: github-actions[bot] --- .../flowframework/FlowFrameworkPlugin.java | 7 +++--- .../flowframework/common/CommonValue.java | 2 +- .../ProvisionWorkflowTransportAction.java | 4 ++-- .../AbstractRetryableWorkflowStep.java | 4 ++-- .../flowframework/workflow/ProcessNode.java | 11 ++++----- ...provisionWorkflowTransportActionTests.java | 15 +++++++++++- .../workflow/DeployModelStepTests.java | 6 ++--- .../workflow/ProcessNodeTests.java | 24 +++++++++++++------ .../workflow/RegisterLocalModelStepTests.java | 6 ++--- .../workflow/WorkflowProcessSorterTests.java | 6 ++--- 10 files changed, 53 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 5cab4e17c..48a3f51bc 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -70,7 +70,7 @@ import java.util.function.Supplier; import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; -import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; @@ -173,14 +173,13 @@ public List> getSettings() { @Override public List> getExecutorBuilders(Settings settings) { - // TODO : Determine final size/queueSize values for the provision thread pool return List.of( new FixedExecutorBuilder( settings, - PROVISION_THREAD_POOL, + WORKFLOW_THREAD_POOL, OpenSearchExecutors.allocatedProcessors(settings), 100, - FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL + FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); } diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index b4955de15..ed3eb0395 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -71,7 +71,7 @@ private CommonValue() {} /** Flow Framework plugin thread pool name prefix */ public static final String FLOW_FRAMEWORK_THREAD_POOL_PREFIX = "thread_pool.flow_framework."; /** The provision workflow thread pool name */ - public static final String PROVISION_THREAD_POOL = "opensearch_workflow_provision"; + public static final String WORKFLOW_THREAD_POOL = "opensearch_workflow"; /* * Field names common to multiple classes diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 45ece281e..f625aafb6 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -48,10 +48,10 @@ import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD; import static org.opensearch.flowframework.common.CommonValue.PROVISION_END_TIME_FIELD; import static org.opensearch.flowframework.common.CommonValue.PROVISION_START_TIME_FIELD; -import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW; import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD; import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; /** * Transport Action to provision a workflow from a stored use case template @@ -169,7 +169,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener workflowSequence, ActionListener listener) { try { - threadPool.executor(PROVISION_THREAD_POOL).execute(() -> { executeWorkflow(workflowSequence, workflowId); }); + threadPool.executor(WORKFLOW_THREAD_POOL).execute(() -> { executeWorkflow(workflowSequence, workflowId); }); } catch (Exception exception) { listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); } diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java index 5f2c118e5..fa2512c70 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java @@ -24,7 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; -import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; /** @@ -136,7 +136,7 @@ protected void retryableGetMlTask( logger.error(errorMessage); mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.REQUEST_TIMEOUT)); } - }, threadPool.executor(PROVISION_THREAD_POOL)); + }, threadPool.executor(WORKFLOW_THREAD_POOL)); } /** diff --git a/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java b/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java index c6bdcc6a5..c4fd4c6fa 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java @@ -21,6 +21,8 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; + /** * Representation of a process node in a workflow graph. * Tracks predecessor nodes which must be completed before it can start execution. @@ -180,13 +182,10 @@ public CompletableFuture execute() { delayExec.cancel(); } logger.info("Finished {}.", this.id); - } catch (Throwable e) { - // TODO: better handling of getCause - this.future.completeExceptionally(e); + } catch (Throwable t) { + this.future.completeExceptionally(t.getCause() == null ? t : t.getCause()); } - // TODO: improve use of thread pool beyond generic - // https://github.com/opensearch-project/flow-framework/issues/61 - }, threadPool.generic()); + }, threadPool.executor(WORKFLOW_THREAD_POOL)); return this.future; } diff --git a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java index 0369807ef..fa83264ca 100644 --- a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java @@ -12,6 +12,7 @@ import org.opensearch.client.Client; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; @@ -25,6 +26,7 @@ import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -38,6 +40,8 @@ import org.mockito.ArgumentCaptor; +import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; import static org.mockito.ArgumentMatchers.any; @@ -51,7 +55,16 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase { - private static ThreadPool threadPool = new TestThreadPool(DeprovisionWorkflowTransportActionTests.class.getName()); + private static ThreadPool threadPool = new TestThreadPool( + DeprovisionWorkflowTransportActionTests.class.getName(), + new FixedExecutorBuilder( + Settings.EMPTY, + WORKFLOW_THREAD_POOL, + OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), + 100, + FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL + ) + ); private Client client; private WorkflowStepFactory workflowStepFactory; private DeleteConnectorStep deleteConnectorStep; diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java index bc174a849..90f2afc8d 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java @@ -49,9 +49,9 @@ import static org.opensearch.action.DocWriteResponse.Result.UPDATED; import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; -import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; import static org.mockito.ArgumentMatchers.any; @@ -101,10 +101,10 @@ public void setUp() throws Exception { DeployModelStepTests.class.getName(), new FixedExecutorBuilder( Settings.EMPTY, - PROVISION_THREAD_POOL, + WORKFLOW_THREAD_POOL, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), 100, - FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL + FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); this.deployModel = new DeployModelStep( diff --git a/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java b/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java index 9a5e027ca..56a8e715e 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java @@ -8,8 +8,11 @@ */ package org.opensearch.flowframework.workflow; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -24,6 +27,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -35,7 +40,16 @@ public class ProcessNodeTests extends OpenSearchTestCase { @BeforeClass public static void setup() { - testThreadPool = new TestThreadPool(ProcessNodeTests.class.getName()); + testThreadPool = new TestThreadPool( + ProcessNodeTests.class.getName(), + new FixedExecutorBuilder( + Settings.EMPTY, + WORKFLOW_THREAD_POOL, + OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), + 100, + FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL + ) + ); CompletableFuture successfulFuture = new CompletableFuture<>(); successfulFuture.complete(WorkflowData.EMPTY); @@ -104,11 +118,7 @@ public CompletableFuture execute( Map previousNodeInputs ) { CompletableFuture future = new CompletableFuture<>(); - testThreadPool.schedule( - () -> future.complete(WorkflowData.EMPTY), - TimeValue.timeValueMillis(100), - ThreadPool.Names.GENERIC - ); + testThreadPool.schedule(() -> future.complete(WorkflowData.EMPTY), TimeValue.timeValueMillis(100), WORKFLOW_THREAD_POOL); return future; } @@ -139,7 +149,7 @@ public CompletableFuture execute( Map previousNodeInputs ) { CompletableFuture future = new CompletableFuture<>(); - testThreadPool.schedule(() -> future.complete(WorkflowData.EMPTY), TimeValue.timeValueMinutes(1), ThreadPool.Names.GENERIC); + testThreadPool.schedule(() -> future.complete(WorkflowData.EMPTY), TimeValue.timeValueMinutes(1), WORKFLOW_THREAD_POOL); return future; } diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java index 4c80d8f3a..1a604df46 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java @@ -46,9 +46,9 @@ import static org.opensearch.action.DocWriteResponse.Result.UPDATED; import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; -import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; @@ -96,10 +96,10 @@ public void setUp() throws Exception { RegisterLocalModelStepTests.class.getName(), new FixedExecutorBuilder( Settings.EMPTY, - PROVISION_THREAD_POOL, + WORKFLOW_THREAD_POOL, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), 100, - FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL + FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); this.registerLocalModelStep = new RegisterLocalModelStep( diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index cb82a1865..ccf47ac70 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -44,7 +44,7 @@ import java.util.stream.Stream; import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; -import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; @@ -113,10 +113,10 @@ public static void setup() throws IOException { WorkflowProcessSorterTests.class.getName(), new FixedExecutorBuilder( Settings.EMPTY, - PROVISION_THREAD_POOL, + WORKFLOW_THREAD_POOL, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), 100, - FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL + FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); WorkflowStepFactory factory = new WorkflowStepFactory(