From bf2cad068996c4aee9f38352ef3dab4a04c1823b Mon Sep 17 00:00:00 2001 From: zane-neo Date: Thu, 18 Jan 2024 05:13:55 +0800 Subject: [PATCH 1/2] Fix thread hang in flow framework issue (#413) * Fix hang in flow framework Signed-off-by: zane-neo * Fix spotless Signed-off-by: Daniel Widdis * Fix tests to account for async delay Signed-off-by: Daniel Widdis * Update GHA version for security tests Signed-off-by: Daniel Widdis --------- Signed-off-by: zane-neo Signed-off-by: Daniel Widdis Co-authored-by: Daniel Widdis --- .github/workflows/test_security.yml | 12 ++--- .../DeprovisionWorkflowTransportAction.java | 4 +- ...provisionWorkflowTransportActionTests.java | 48 +++++++++---------- 3 files changed, 31 insertions(+), 33 deletions(-) diff --git a/.github/workflows/test_security.yml b/.github/workflows/test_security.yml index c18b2a11a..fafcec0fa 100644 --- a/.github/workflows/test_security.yml +++ b/.github/workflows/test_security.yml @@ -1,11 +1,11 @@ name: Security test workflow for Flow Framework on: push: - branches: - - "*" + branches-ignore: + - 'whitesource-remediate/**' + - 'backport/**' pull_request: - branches: - - "*" + types: [opened, synchronize, reopened] jobs: Get-CI-Image-Tag: @@ -30,9 +30,9 @@ jobs: steps: - name: Checkout Flow Framework - uses: actions/checkout@v1 + uses: actions/checkout@v3 - name: Setup Java ${{ matrix.java }} - uses: actions/setup-java@v1 + uses: actions/setup-java@v3 with: distribution: 'temurin' java-version: ${{ matrix.java }} diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index b2a5e5028..e026053e1 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -46,6 +46,7 @@ import static org.opensearch.flowframework.common.CommonValue.PROVISION_START_TIME_FIELD; import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD; import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.WorkflowResources.getDeprovisionStepByWorkflowStep; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; @@ -101,7 +102,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener executeDeprovisionSequence(workflowId, response.getWorkflowState().resourcesCreated(), listener)); }, exception -> { String message = "Failed to get workflow state for workflow " + workflowId; logger.error(message, exception); diff --git a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java index 6259394f2..7841ed193 100644 --- a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java @@ -8,6 +8,7 @@ */ package org.opensearch.flowframework.transport; +import org.opensearch.action.LatchedActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.client.Client; import org.opensearch.common.settings.Settings; @@ -19,12 +20,9 @@ import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.model.ResourceCreated; import org.opensearch.flowframework.model.WorkflowState; -import org.opensearch.flowframework.workflow.CreateConnectorStep; import org.opensearch.flowframework.workflow.DeleteConnectorStep; -import org.opensearch.flowframework.workflow.ProcessNode; import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.flowframework.workflow.WorkflowStepFactory; -import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.FixedExecutorBuilder; @@ -33,10 +31,9 @@ import org.opensearch.transport.TransportService; import org.junit.AfterClass; -import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.mockito.ArgumentCaptor; @@ -50,6 +47,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -77,7 +75,15 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase public void setUp() throws Exception { super.setUp(); this.client = mock(Client.class); + ThreadPool clientThreadPool = spy(threadPool); + when(client.threadPool()).thenReturn(clientThreadPool); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(clientThreadPool.getThreadContext()).thenReturn(threadContext); + this.workflowStepFactory = mock(WorkflowStepFactory.class); + this.deleteConnectorStep = mock(DeleteConnectorStep.class); + when(this.workflowStepFactory.createStep("delete_connector")).thenReturn(deleteConnectorStep); + this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); flowFrameworkSettings = mock(FlowFrameworkSettings.class); when(flowFrameworkSettings.getRequestTimeout()).thenReturn(TimeValue.timeValueSeconds(10)); @@ -85,28 +91,12 @@ public void setUp() throws Exception { this.deprovisionWorkflowTransportAction = new DeprovisionWorkflowTransportAction( mock(TransportService.class), mock(ActionFilters.class), - threadPool, + clientThreadPool, client, workflowStepFactory, flowFrameworkIndicesHandler, flowFrameworkSettings ); - - MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client); - ProcessNode processNode = mock(ProcessNode.class); - when(processNode.id()).thenReturn("step_1"); - when(processNode.workflowStep()).thenReturn(new CreateConnectorStep(mlClient, flowFrameworkIndicesHandler)); - when(processNode.previousNodeInputs()).thenReturn(Collections.emptyMap()); - when(processNode.input()).thenReturn(WorkflowData.EMPTY); - when(processNode.nodeTimeout()).thenReturn(TimeValue.timeValueSeconds(5)); - this.deleteConnectorStep = mock(DeleteConnectorStep.class); - when(this.workflowStepFactory.createStep("delete_connector")).thenReturn(deleteConnectorStep); - - ThreadPool clientThreadPool = mock(ThreadPool.class); - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - - when(client.threadPool()).thenReturn(clientThreadPool); - when(clientThreadPool.getThreadContext()).thenReturn(threadContext); } @AfterClass @@ -114,10 +104,12 @@ public static void cleanup() { ThreadPool.terminate(threadPool, 500, TimeUnit.MILLISECONDS); } - public void testDeprovisionWorkflow() throws IOException { + public void testDeprovisionWorkflow() throws Exception { String workflowId = "1"; + + CountDownLatch latch = new CountDownLatch(1); @SuppressWarnings("unchecked") - ActionListener listener = mock(ActionListener.class); + ActionListener listener = spy(new LatchedActionListener(mock(ActionListener.class), latch)); WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); doAnswer(invocation -> { @@ -137,14 +129,17 @@ public void testDeprovisionWorkflow() throws IOException { deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); + latch.await(5, TimeUnit.SECONDS); verify(listener, times(1)).onResponse(responseCaptor.capture()); assertEquals(workflowId, responseCaptor.getValue().getWorkflowId()); } - public void testFailToDeprovision() throws IOException { + public void testFailToDeprovision() throws Exception { String workflowId = "1"; + + CountDownLatch latch = new CountDownLatch(1); @SuppressWarnings("unchecked") - ActionListener listener = mock(ActionListener.class); + ActionListener listener = spy(new LatchedActionListener(mock(ActionListener.class), latch)); WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); doAnswer(invocation -> { @@ -164,6 +159,7 @@ public void testFailToDeprovision() throws IOException { deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + latch.await(5, TimeUnit.SECONDS); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); assertEquals("Failed to deprovision some resources: [model_id modelId].", exceptionCaptor.getValue().getMessage()); } From 789fc4d0aa9684c93a84576e7e0bd52325befd3e Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 19 Jan 2024 14:41:23 -0800 Subject: [PATCH 2/2] Change thread pool to a ScalingExecutorBuilder (#421) Signed-off-by: Daniel Widdis --- .../opensearch/flowframework/FlowFrameworkPlugin.java | 9 +++++---- .../DeprovisionWorkflowTransportActionTests.java | 8 ++++---- .../flowframework/workflow/DeployModelStepTests.java | 8 ++++---- .../flowframework/workflow/ProcessNodeTests.java | 8 ++++---- .../workflow/RegisterLocalCustomModelStepTests.java | 8 ++++---- .../workflow/RegisterLocalPretrainedModelStepTests.java | 8 ++++---- .../RegisterLocalSparseEncodingModelStepTests.java | 8 ++++---- .../workflow/WorkflowProcessSorterTests.java | 8 ++++---- 8 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 0fe7b724d..b646cf3bf 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -18,6 +18,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -64,7 +65,7 @@ import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ExecutorBuilder; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -176,11 +177,11 @@ public List> getSettings() { @Override public List> getExecutorBuilders(Settings settings) { return List.of( - new FixedExecutorBuilder( - settings, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(settings), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java index 7841ed193..4d29fa827 100644 --- a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java @@ -25,7 +25,7 @@ import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -56,11 +56,11 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase private static ThreadPool threadPool = new TestThreadPool( DeprovisionWorkflowTransportActionTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java index 2d17da062..92d5be388 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java @@ -26,7 +26,7 @@ import org.opensearch.ml.common.MLTaskType; import org.opensearch.ml.common.transport.deploy.MLDeployModelResponse; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -82,11 +82,11 @@ public void setUp() throws Exception { testThreadPool = new TestThreadPool( DeployModelStepTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java b/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java index 56a8e715e..1f67d2b0b 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java @@ -12,7 +12,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -42,11 +42,11 @@ public class ProcessNodeTests extends OpenSearchTestCase { public static void setup() { testThreadPool = new TestThreadPool( ProcessNodeTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java index 942279e6a..010abcf2d 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java @@ -25,7 +25,7 @@ import org.opensearch.ml.common.transport.register.MLRegisterModelInput; import org.opensearch.ml.common.transport.register.MLRegisterModelResponse; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -78,11 +78,11 @@ public void setUp() throws Exception { testThreadPool = new TestThreadPool( RegisterLocalCustomModelStepTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java index afb76d92a..031967713 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java @@ -25,7 +25,7 @@ import org.opensearch.ml.common.transport.register.MLRegisterModelInput; import org.opensearch.ml.common.transport.register.MLRegisterModelResponse; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -78,11 +78,11 @@ public void setUp() throws Exception { testThreadPool = new TestThreadPool( RegisterLocalCustomModelStepTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java index 6756913ec..6cedf632b 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java @@ -25,7 +25,7 @@ import org.opensearch.ml.common.transport.register.MLRegisterModelInput; import org.opensearch.ml.common.transport.register.MLRegisterModelResponse; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -78,11 +78,11 @@ public void setUp() throws Exception { testThreadPool = new TestThreadPool( RegisterLocalCustomModelStepTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index 531283483..ef9dd5dcf 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -27,7 +27,7 @@ import org.opensearch.flowframework.model.WorkflowValidator; import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -111,11 +111,11 @@ public static void setup() throws IOException { testThreadPool = new TestThreadPool( WorkflowProcessSorterTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) );