Skip to content

Commit

Permalink
Merge branch 'main' into fix-build
Browse files Browse the repository at this point in the history
Signed-off-by: owaiskazi19 <[email protected]>
  • Loading branch information
owaiskazi19 committed Jan 20, 2024
2 parents a796196 + 789fc4d commit cb91013
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 67 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/test_security.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
name: Security test workflow for Flow Framework
on:
push:
branches:
- "*"
branches-ignore:
- 'whitesource-remediate/**'
- 'backport/**'
pull_request:
branches:
- "*"
types: [opened, synchronize, reopened]

jobs:
Get-CI-Image-Tag:
Expand All @@ -30,9 +30,9 @@ jobs:

steps:
- name: Checkout Flow Framework
uses: actions/checkout@v1
uses: actions/checkout@v3
- name: Setup Java ${{ matrix.java }}
uses: actions/setup-java@v1
uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: ${{ matrix.java }}
Expand Down
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ testClusters.integTest {

// Installs all registered zipArchive dependencies on integTest cluster nodes except security
configurations.zipArchive.asFileTree.each {
if(!it.name.contains("opensearch-security")) {
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
Expand All @@ -361,7 +360,7 @@ testClusters.integTest {
}
})
)
}

}

// Install Flow Framwork Plugin on integTest cluster nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -64,7 +65,7 @@
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -176,11 +177,11 @@ public List<Setting<?>> getSettings() {
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return List.of(
new FixedExecutorBuilder(
settings,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(settings),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.opensearch.flowframework.common.CommonValue.PROVISION_START_TIME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.WorkflowResources.getDeprovisionStepByWorkflowStep;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

Expand Down Expand Up @@ -101,7 +102,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
context.restore();

// Retrieve resources from workflow state and deprovision
executeDeprovisionSequence(workflowId, response.getWorkflowState().resourcesCreated(), listener);
threadPool.executor(WORKFLOW_THREAD_POOL)
.execute(() -> executeDeprovisionSequence(workflowId, response.getWorkflowState().resourcesCreated(), listener));
}, exception -> {
String message = "Failed to get workflow state for workflow " + workflowId;
logger.error(message, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
Expand All @@ -19,24 +20,20 @@
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.workflow.CreateConnectorStep;
import org.opensearch.flowframework.workflow.DeleteConnectorStep;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.junit.AfterClass;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.mockito.ArgumentCaptor;
Expand All @@ -50,6 +47,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -58,11 +56,11 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase

private static ThreadPool threadPool = new TestThreadPool(
DeprovisionWorkflowTransportActionTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand All @@ -77,47 +75,41 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase
public void setUp() throws Exception {
super.setUp();
this.client = mock(Client.class);
ThreadPool clientThreadPool = spy(threadPool);
when(client.threadPool()).thenReturn(clientThreadPool);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(clientThreadPool.getThreadContext()).thenReturn(threadContext);

this.workflowStepFactory = mock(WorkflowStepFactory.class);
this.deleteConnectorStep = mock(DeleteConnectorStep.class);
when(this.workflowStepFactory.createStep("delete_connector")).thenReturn(deleteConnectorStep);

this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class);
flowFrameworkSettings = mock(FlowFrameworkSettings.class);
when(flowFrameworkSettings.getRequestTimeout()).thenReturn(TimeValue.timeValueSeconds(10));

this.deprovisionWorkflowTransportAction = new DeprovisionWorkflowTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
threadPool,
clientThreadPool,
client,
workflowStepFactory,
flowFrameworkIndicesHandler,
flowFrameworkSettings
);

MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
ProcessNode processNode = mock(ProcessNode.class);
when(processNode.id()).thenReturn("step_1");
when(processNode.workflowStep()).thenReturn(new CreateConnectorStep(mlClient, flowFrameworkIndicesHandler));
when(processNode.previousNodeInputs()).thenReturn(Collections.emptyMap());
when(processNode.input()).thenReturn(WorkflowData.EMPTY);
when(processNode.nodeTimeout()).thenReturn(TimeValue.timeValueSeconds(5));
this.deleteConnectorStep = mock(DeleteConnectorStep.class);
when(this.workflowStepFactory.createStep("delete_connector")).thenReturn(deleteConnectorStep);

ThreadPool clientThreadPool = mock(ThreadPool.class);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);

when(client.threadPool()).thenReturn(clientThreadPool);
when(clientThreadPool.getThreadContext()).thenReturn(threadContext);
}

@AfterClass
public static void cleanup() {
ThreadPool.terminate(threadPool, 500, TimeUnit.MILLISECONDS);
}

public void testDeprovisionWorkflow() throws IOException {
public void testDeprovisionWorkflow() throws Exception {
String workflowId = "1";

CountDownLatch latch = new CountDownLatch(1);
@SuppressWarnings("unchecked")
ActionListener<WorkflowResponse> listener = mock(ActionListener.class);
ActionListener<WorkflowResponse> listener = spy(new LatchedActionListener<WorkflowResponse>(mock(ActionListener.class), latch));
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);

doAnswer(invocation -> {
Expand All @@ -137,14 +129,17 @@ public void testDeprovisionWorkflow() throws IOException {
deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);
ArgumentCaptor<WorkflowResponse> responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class);

latch.await(5, TimeUnit.SECONDS);
verify(listener, times(1)).onResponse(responseCaptor.capture());
assertEquals(workflowId, responseCaptor.getValue().getWorkflowId());
}

public void testFailToDeprovision() throws IOException {
public void testFailToDeprovision() throws Exception {
String workflowId = "1";

CountDownLatch latch = new CountDownLatch(1);
@SuppressWarnings("unchecked")
ActionListener<WorkflowResponse> listener = mock(ActionListener.class);
ActionListener<WorkflowResponse> listener = spy(new LatchedActionListener<WorkflowResponse>(mock(ActionListener.class), latch));
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);

doAnswer(invocation -> {
Expand All @@ -164,6 +159,7 @@ public void testFailToDeprovision() throws IOException {
deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);

latch.await(5, TimeUnit.SECONDS);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Failed to deprovision some resources: [model_id modelId].", exceptionCaptor.getValue().getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.opensearch.ml.common.MLTaskType;
import org.opensearch.ml.common.transport.deploy.MLDeployModelResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -82,11 +82,11 @@ public void setUp() throws Exception {

testThreadPool = new TestThreadPool(
DeployModelStepTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -42,11 +42,11 @@ public class ProcessNodeTests extends OpenSearchTestCase {
public static void setup() {
testThreadPool = new TestThreadPool(
ProcessNodeTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
import org.opensearch.ml.common.transport.register.MLRegisterModelResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -78,11 +78,11 @@ public void setUp() throws Exception {

testThreadPool = new TestThreadPool(
RegisterLocalCustomModelStepTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
import org.opensearch.ml.common.transport.register.MLRegisterModelResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -78,11 +78,11 @@ public void setUp() throws Exception {

testThreadPool = new TestThreadPool(
RegisterLocalCustomModelStepTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
import org.opensearch.ml.common.transport.register.MLRegisterModelResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -78,11 +78,11 @@ public void setUp() throws Exception {

testThreadPool = new TestThreadPool(
RegisterLocalCustomModelStepTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.opensearch.flowframework.model.WorkflowValidator;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.AfterClass;
Expand Down Expand Up @@ -111,11 +111,11 @@ public static void setup() throws IOException {

testThreadPool = new TestThreadPool(
WorkflowProcessSorterTests.class.getName(),
new FixedExecutorBuilder(
Settings.EMPTY,
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
100,
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);
Expand Down

0 comments on commit cb91013

Please sign in to comment.