From 414cb8187dda547351bce8ada7046abcbbf1d915 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 29 Nov 2023 09:53:16 -0800 Subject: [PATCH 01/11] Add Delete Connector Step Signed-off-by: Daniel Widdis --- .../workflow/DeleteConnectorStep.java | 88 +++++++++++++++++ .../workflow/WorkflowStepFactory.java | 15 +-- .../resources/mappings/workflow-steps.json | 8 ++ .../workflow/DeleteConnectorStepTests.java | 97 +++++++++++++++++++ 4 files changed, 195 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java create mode 100644 src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java new file mode 100644 index 000000000..c2cb77dfa --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.ml.client.MachineLearningNodeClient; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.opensearch.flowframework.common.CommonValue.CONNECTOR_ID; + +/** + * Step to delete a connector for a remote model + */ +public class DeleteConnectorStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(DeleteConnectorStep.class); + + private MachineLearningNodeClient mlClient; + + static final String NAME = "delete_connector"; + + /** + * Instantiate this class + * @param mlClient Machine Learning client to perform the deletion + */ + public DeleteConnectorStep(MachineLearningNodeClient mlClient) { + this.mlClient = mlClient; + } + + @Override + public CompletableFuture execute(List data) throws IOException { + CompletableFuture deleteConnectorFuture = new CompletableFuture<>(); + + ActionListener actionListener = new ActionListener<>() { + + @Override + public void onResponse(DeleteResponse deleteResponse) { + deleteConnectorFuture.complete( + new WorkflowData(Map.ofEntries(Map.entry("connector_id", deleteResponse.getId())), data.get(0).getWorkflowId()) + ); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to delete connector"); + deleteConnectorFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); + } + }; + + Optional connectorId = data.stream() + .map(WorkflowData::getContent) + .filter(m -> m.containsKey(CONNECTOR_ID)) + .map(m -> m.get(CONNECTOR_ID).toString()) + .findFirst(); + + if (connectorId.isPresent()) { + mlClient.deleteConnector(connectorId.get(), actionListener); + } else { + deleteConnectorFuture.completeExceptionally( + new FlowFrameworkException("Required field " + CONNECTOR_ID + " is not provided", RestStatus.BAD_REQUEST) + ); + } + + return deleteConnectorFuture; + } + + @Override + public String getName() { + return NAME; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index 2e450d5b0..babb468b7 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -25,7 +25,6 @@ public class WorkflowStepFactory { private final Map stepMap = new HashMap<>(); - private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; /** * Instantiate this class. @@ -42,17 +41,6 @@ public WorkflowStepFactory( Client client, MachineLearningNodeClient mlClient, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler - ) { - this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; - populateMap(settings, clusterService, client, mlClient, flowFrameworkIndicesHandler); - } - - private void populateMap( - Settings settings, - ClusterService clusterService, - Client client, - MachineLearningNodeClient mlClient, - FlowFrameworkIndicesHandler flowFrameworkIndicesHandler ) { stepMap.put(NoOpStep.NAME, new NoOpStep()); stepMap.put(CreateIndexStep.NAME, new CreateIndexStep(clusterService, client)); @@ -61,6 +49,7 @@ private void populateMap( stepMap.put(RegisterRemoteModelStep.NAME, new RegisterRemoteModelStep(mlClient)); stepMap.put(DeployModelStep.NAME, new DeployModelStep(mlClient)); stepMap.put(CreateConnectorStep.NAME, new CreateConnectorStep(mlClient, flowFrameworkIndicesHandler)); + stepMap.put(DeleteConnectorStep.NAME, new DeleteConnectorStep(mlClient)); stepMap.put(ModelGroupStep.NAME, new ModelGroupStep(mlClient)); stepMap.put(GetMLTaskStep.NAME, new GetMLTaskStep(settings, clusterService, mlClient)); } @@ -79,7 +68,7 @@ public WorkflowStep createStep(String type) { /** * Gets the step map - * @return the step map + * @return a read-only copy of the step map */ public Map getStepMap() { return Map.copyOf(this.stepMap); diff --git a/src/main/resources/mappings/workflow-steps.json b/src/main/resources/mappings/workflow-steps.json index 5bd88147b..5769daa90 100644 --- a/src/main/resources/mappings/workflow-steps.json +++ b/src/main/resources/mappings/workflow-steps.json @@ -39,6 +39,14 @@ "connector_id" ] }, + "delete_connector": { + "inputs": [ + "connector_id" + ], + "outputs":[ + "connector_id" + ] + }, "register_local_model": { "inputs":[ "name", diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java new file mode 100644 index 000000000..5cdde128c --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java @@ -0,0 +1,97 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.common.CommonValue; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.ml.client.MachineLearningNodeClient; +import org.opensearch.ml.common.transport.connector.MLCreateConnectorResponse; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; + +public class DeleteConnectorStepTests extends OpenSearchTestCase { + private WorkflowData inputData; + + @Mock + MachineLearningNodeClient machineLearningNodeClient; + + @Override + public void setUp() throws Exception { + super.setUp(); + + MockitoAnnotations.openMocks(this); + + inputData = new WorkflowData(Map.of(CommonValue.CONNECTOR_ID, "test"), "test-id"); + } + + public void testDeleteConnector() throws IOException, ExecutionException, InterruptedException { + + String connectorId = "connectorId"; + DeleteConnectorStep deleteConnectorStep = new DeleteConnectorStep(machineLearningNodeClient); + + @SuppressWarnings("unchecked") + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(1); + ShardId shardId = new ShardId(new Index("indexName", "uuid"), 1); + DeleteResponse output = new DeleteResponse(shardId, connectorId, 1, 1, 1, true); + actionListener.onResponse(output); + return null; + }).when(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + + CompletableFuture future = deleteConnectorStep.execute(List.of(inputData)); + + verify(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + + assertTrue(future.isDone()); + assertEquals(connectorId, future.get().getContent().get("connector_id")); + + } + + public void testDeleteConnectorFailure() throws IOException { + DeleteConnectorStep deleteConnectorStep = new DeleteConnectorStep(machineLearningNodeClient); + + @SuppressWarnings("unchecked") + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(1); + actionListener.onFailure(new FlowFrameworkException("Failed to create connector", RestStatus.INTERNAL_SERVER_ERROR)); + return null; + }).when(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + + CompletableFuture future = deleteConnectorStep.execute(List.of(inputData)); + + verify(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + + assertTrue(future.isCompletedExceptionally()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof FlowFrameworkException); + assertEquals("Failed to create connector", ex.getCause().getMessage()); + } +} From 5c439ecb62da8d8362503d848442dbd9b13f9b31 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 29 Nov 2023 10:38:22 -0800 Subject: [PATCH 02/11] Add eclipse core runtime version resolution Signed-off-by: Daniel Widdis --- build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/build.gradle b/build.gradle index a7e6046d4..fc3b629bf 100644 --- a/build.gradle +++ b/build.gradle @@ -146,6 +146,7 @@ dependencies { configurations.all { resolutionStrategy { force("com.google.guava:guava:32.1.3-jre") // CVE for 31.1 + force("org.eclipse.platform:org.eclipse.core.runtime:3.29.0") // CVE for < 3.29.0 force("com.fasterxml.jackson.core:jackson-core:2.16.0") // Dependency Jar Hell } } From 6d59df28b09eeb30443ea55474b1817863dea3af Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 29 Nov 2023 10:51:12 -0800 Subject: [PATCH 03/11] Use JDK17 for spotless Signed-off-by: Daniel Widdis --- .github/workflows/CI.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index d85c3c361..bbfc48aeb 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -14,6 +14,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 + # Spotless requires JDK 17+ + - uses: actions/setup-java@v3 + with: + java-version: 17 + distribution: temurin - name: Spotless Check run: ./gradlew spotlessCheck build: @@ -41,7 +46,7 @@ jobs: - uses: actions/checkout@v4 - name: Build and Run Tests run: | - ./gradlew check + ./gradlew check -x spotlessJava - name: Upload Coverage Report if: matrix.os == 'ubuntu-latest' uses: codecov/codecov-action@v3 From ae6db3ceb457d4b9c6e19a9e25e7615d318e41d7 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 29 Nov 2023 09:53:16 -0800 Subject: [PATCH 04/11] Add Delete Connector Step Signed-off-by: Daniel Widdis --- .../workflow/DeleteConnectorStep.java | 88 +++++++++++++++++ .../workflow/WorkflowStepFactory.java | 15 +-- .../resources/mappings/workflow-steps.json | 8 ++ .../workflow/DeleteConnectorStepTests.java | 97 +++++++++++++++++++ 4 files changed, 195 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java create mode 100644 src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java new file mode 100644 index 000000000..c2cb77dfa --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.ml.client.MachineLearningNodeClient; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.opensearch.flowframework.common.CommonValue.CONNECTOR_ID; + +/** + * Step to delete a connector for a remote model + */ +public class DeleteConnectorStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(DeleteConnectorStep.class); + + private MachineLearningNodeClient mlClient; + + static final String NAME = "delete_connector"; + + /** + * Instantiate this class + * @param mlClient Machine Learning client to perform the deletion + */ + public DeleteConnectorStep(MachineLearningNodeClient mlClient) { + this.mlClient = mlClient; + } + + @Override + public CompletableFuture execute(List data) throws IOException { + CompletableFuture deleteConnectorFuture = new CompletableFuture<>(); + + ActionListener actionListener = new ActionListener<>() { + + @Override + public void onResponse(DeleteResponse deleteResponse) { + deleteConnectorFuture.complete( + new WorkflowData(Map.ofEntries(Map.entry("connector_id", deleteResponse.getId())), data.get(0).getWorkflowId()) + ); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to delete connector"); + deleteConnectorFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); + } + }; + + Optional connectorId = data.stream() + .map(WorkflowData::getContent) + .filter(m -> m.containsKey(CONNECTOR_ID)) + .map(m -> m.get(CONNECTOR_ID).toString()) + .findFirst(); + + if (connectorId.isPresent()) { + mlClient.deleteConnector(connectorId.get(), actionListener); + } else { + deleteConnectorFuture.completeExceptionally( + new FlowFrameworkException("Required field " + CONNECTOR_ID + " is not provided", RestStatus.BAD_REQUEST) + ); + } + + return deleteConnectorFuture; + } + + @Override + public String getName() { + return NAME; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index 2e450d5b0..babb468b7 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -25,7 +25,6 @@ public class WorkflowStepFactory { private final Map stepMap = new HashMap<>(); - private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; /** * Instantiate this class. @@ -42,17 +41,6 @@ public WorkflowStepFactory( Client client, MachineLearningNodeClient mlClient, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler - ) { - this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; - populateMap(settings, clusterService, client, mlClient, flowFrameworkIndicesHandler); - } - - private void populateMap( - Settings settings, - ClusterService clusterService, - Client client, - MachineLearningNodeClient mlClient, - FlowFrameworkIndicesHandler flowFrameworkIndicesHandler ) { stepMap.put(NoOpStep.NAME, new NoOpStep()); stepMap.put(CreateIndexStep.NAME, new CreateIndexStep(clusterService, client)); @@ -61,6 +49,7 @@ private void populateMap( stepMap.put(RegisterRemoteModelStep.NAME, new RegisterRemoteModelStep(mlClient)); stepMap.put(DeployModelStep.NAME, new DeployModelStep(mlClient)); stepMap.put(CreateConnectorStep.NAME, new CreateConnectorStep(mlClient, flowFrameworkIndicesHandler)); + stepMap.put(DeleteConnectorStep.NAME, new DeleteConnectorStep(mlClient)); stepMap.put(ModelGroupStep.NAME, new ModelGroupStep(mlClient)); stepMap.put(GetMLTaskStep.NAME, new GetMLTaskStep(settings, clusterService, mlClient)); } @@ -79,7 +68,7 @@ public WorkflowStep createStep(String type) { /** * Gets the step map - * @return the step map + * @return a read-only copy of the step map */ public Map getStepMap() { return Map.copyOf(this.stepMap); diff --git a/src/main/resources/mappings/workflow-steps.json b/src/main/resources/mappings/workflow-steps.json index 5bd88147b..5769daa90 100644 --- a/src/main/resources/mappings/workflow-steps.json +++ b/src/main/resources/mappings/workflow-steps.json @@ -39,6 +39,14 @@ "connector_id" ] }, + "delete_connector": { + "inputs": [ + "connector_id" + ], + "outputs":[ + "connector_id" + ] + }, "register_local_model": { "inputs":[ "name", diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java new file mode 100644 index 000000000..5cdde128c --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java @@ -0,0 +1,97 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.common.CommonValue; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.ml.client.MachineLearningNodeClient; +import org.opensearch.ml.common.transport.connector.MLCreateConnectorResponse; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; + +public class DeleteConnectorStepTests extends OpenSearchTestCase { + private WorkflowData inputData; + + @Mock + MachineLearningNodeClient machineLearningNodeClient; + + @Override + public void setUp() throws Exception { + super.setUp(); + + MockitoAnnotations.openMocks(this); + + inputData = new WorkflowData(Map.of(CommonValue.CONNECTOR_ID, "test"), "test-id"); + } + + public void testDeleteConnector() throws IOException, ExecutionException, InterruptedException { + + String connectorId = "connectorId"; + DeleteConnectorStep deleteConnectorStep = new DeleteConnectorStep(machineLearningNodeClient); + + @SuppressWarnings("unchecked") + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(1); + ShardId shardId = new ShardId(new Index("indexName", "uuid"), 1); + DeleteResponse output = new DeleteResponse(shardId, connectorId, 1, 1, 1, true); + actionListener.onResponse(output); + return null; + }).when(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + + CompletableFuture future = deleteConnectorStep.execute(List.of(inputData)); + + verify(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + + assertTrue(future.isDone()); + assertEquals(connectorId, future.get().getContent().get("connector_id")); + + } + + public void testDeleteConnectorFailure() throws IOException { + DeleteConnectorStep deleteConnectorStep = new DeleteConnectorStep(machineLearningNodeClient); + + @SuppressWarnings("unchecked") + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(1); + actionListener.onFailure(new FlowFrameworkException("Failed to create connector", RestStatus.INTERNAL_SERVER_ERROR)); + return null; + }).when(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + + CompletableFuture future = deleteConnectorStep.execute(List.of(inputData)); + + verify(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + + assertTrue(future.isCompletedExceptionally()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof FlowFrameworkException); + assertEquals("Failed to create connector", ex.getCause().getMessage()); + } +} From f84ca841dee7ee668f556c64ca5901f3573a5ca8 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 29 Nov 2023 10:38:22 -0800 Subject: [PATCH 05/11] Add eclipse core runtime version resolution Signed-off-by: Daniel Widdis --- build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/build.gradle b/build.gradle index 6b66247a6..0109ec935 100644 --- a/build.gradle +++ b/build.gradle @@ -148,6 +148,7 @@ dependencies { configurations.all { resolutionStrategy { force("com.google.guava:guava:32.1.3-jre") // CVE for 31.1 + force("org.eclipse.platform:org.eclipse.core.runtime:3.29.0") // CVE for < 3.29.0 force("com.fasterxml.jackson.core:jackson-core:2.16.0") // Dependency Jar Hell } } From a9b4a83502b2db6c2345df8e40c3980ad328d785 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 29 Nov 2023 10:51:12 -0800 Subject: [PATCH 06/11] Use JDK17 for spotless Signed-off-by: Daniel Widdis --- .github/workflows/CI.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index d85c3c361..bbfc48aeb 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -14,6 +14,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 + # Spotless requires JDK 17+ + - uses: actions/setup-java@v3 + with: + java-version: 17 + distribution: temurin - name: Spotless Check run: ./gradlew spotlessCheck build: @@ -41,7 +46,7 @@ jobs: - uses: actions/checkout@v4 - name: Build and Run Tests run: | - ./gradlew check + ./gradlew check -x spotlessJava - name: Upload Coverage Report if: matrix.os == 'ubuntu-latest' uses: codecov/codecov-action@v3 From 8023f9f1e4025298ebe9e03ce7150c82c8fa5261 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 29 Nov 2023 19:16:35 -0800 Subject: [PATCH 07/11] Fetch connector ID from appropriate previous node output Signed-off-by: Daniel Widdis --- .../workflow/DeleteConnectorStep.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java index 1386931c2..bf0fae33e 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java @@ -18,8 +18,6 @@ import org.opensearch.ml.client.MachineLearningNodeClient; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -74,20 +72,23 @@ public void onFailure(Exception e) { } }; - // TODO: Recreating the list to get this compiling - // Need to refactor the below iteration to pull directly from the maps - List data = new ArrayList<>(); - data.add(currentNodeInputs); - data.addAll(outputs.values()); + String connectorId = null; - Optional connectorId = data.stream() - .map(WorkflowData::getContent) - .filter(m -> m.containsKey(CONNECTOR_ID)) - .map(m -> m.get(CONNECTOR_ID).toString()) + // Previous Node inputs defines which step the connector ID came from + Optional previousNode = previousNodeInputs.entrySet() + .stream() + .filter(e -> CONNECTOR_ID.equals(e.getValue())) + .map(Map.Entry::getKey) .findFirst(); + if (previousNode.isPresent()) { + WorkflowData previousNodeOutput = outputs.get(previousNode.get()); + if (previousNodeOutput != null && previousNodeOutput.getContent().containsKey(CONNECTOR_ID)) { + connectorId = previousNodeOutput.getContent().get(CONNECTOR_ID).toString(); + } + } - if (connectorId.isPresent()) { - mlClient.deleteConnector(connectorId.get(), actionListener); + if (connectorId != null) { + mlClient.deleteConnector(connectorId, actionListener); } else { deleteConnectorFuture.completeExceptionally( new FlowFrameworkException("Required field " + CONNECTOR_ID + " is not provided", RestStatus.BAD_REQUEST) From 9de81ecd05042f5083735486f4c881f126f14d06 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 29 Nov 2023 19:45:56 -0800 Subject: [PATCH 08/11] Fix tests Signed-off-by: Daniel Widdis --- .../workflow/DeleteConnectorStepTests.java | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java index 478d46c91..41e6c1f53 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java @@ -67,10 +67,9 @@ public void testDeleteConnector() throws IOException, ExecutionException, Interr CompletableFuture future = deleteConnectorStep.execute( inputData.getNodeId(), inputData, - Collections.emptyMap(), - Collections.emptyMap() + Map.of("step_1", new WorkflowData(Map.of("connector_id", "test"), "workflowId", "nodeId")), + Map.of("step_1", "connector_id") ); - verify(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); assertTrue(future.isDone()); @@ -78,6 +77,22 @@ public void testDeleteConnector() throws IOException, ExecutionException, Interr } + public void testNoConnectorIdInOutput() throws IOException { + DeleteConnectorStep deleteConnectorStep = new DeleteConnectorStep(machineLearningNodeClient); + + CompletableFuture future = deleteConnectorStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap() + ); + + assertTrue(future.isCompletedExceptionally()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof FlowFrameworkException); + assertEquals("Required field connector_id is not provided", ex.getCause().getMessage()); + } + public void testDeleteConnectorFailure() throws IOException { DeleteConnectorStep deleteConnectorStep = new DeleteConnectorStep(machineLearningNodeClient); @@ -86,15 +101,15 @@ public void testDeleteConnectorFailure() throws IOException { doAnswer(invocation -> { ActionListener actionListener = invocation.getArgument(1); - actionListener.onFailure(new FlowFrameworkException("Failed to create connector", RestStatus.INTERNAL_SERVER_ERROR)); + actionListener.onFailure(new FlowFrameworkException("Failed to delete connector", RestStatus.INTERNAL_SERVER_ERROR)); return null; }).when(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); CompletableFuture future = deleteConnectorStep.execute( inputData.getNodeId(), inputData, - Collections.emptyMap(), - Collections.emptyMap() + Map.of("step_1", new WorkflowData(Map.of("connector_id", "test"), "workflowId", "nodeId")), + Map.of("step_1", "connector_id") ); verify(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); @@ -102,6 +117,6 @@ public void testDeleteConnectorFailure() throws IOException { assertTrue(future.isCompletedExceptionally()); ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); assertTrue(ex.getCause() instanceof FlowFrameworkException); - assertEquals("Failed to create connector", ex.getCause().getMessage()); + assertEquals("Failed to delete connector", ex.getCause().getMessage()); } } From 346d2883ce5950b9c4ed1b56669351b106690aa9 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 29 Nov 2023 19:55:04 -0800 Subject: [PATCH 09/11] Test that actual ID is properly passed Signed-off-by: Daniel Widdis --- .../flowframework/workflow/DeleteConnectorStepTests.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java index 41e6c1f53..cfd9c8831 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java @@ -50,16 +50,17 @@ public void setUp() throws Exception { public void testDeleteConnector() throws IOException, ExecutionException, InterruptedException { - String connectorId = "connectorId"; + String connectorId = randomAlphaOfLength(5); DeleteConnectorStep deleteConnectorStep = new DeleteConnectorStep(machineLearningNodeClient); @SuppressWarnings("unchecked") ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); doAnswer(invocation -> { + String connectorIdArg = invocation.getArgument(0); ActionListener actionListener = invocation.getArgument(1); ShardId shardId = new ShardId(new Index("indexName", "uuid"), 1); - DeleteResponse output = new DeleteResponse(shardId, connectorId, 1, 1, 1, true); + DeleteResponse output = new DeleteResponse(shardId, connectorIdArg, 1, 1, 1, true); actionListener.onResponse(output); return null; }).when(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); @@ -67,14 +68,13 @@ public void testDeleteConnector() throws IOException, ExecutionException, Interr CompletableFuture future = deleteConnectorStep.execute( inputData.getNodeId(), inputData, - Map.of("step_1", new WorkflowData(Map.of("connector_id", "test"), "workflowId", "nodeId")), + Map.of("step_1", new WorkflowData(Map.of("connector_id", connectorId), "workflowId", "nodeId")), Map.of("step_1", "connector_id") ); verify(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); assertTrue(future.isDone()); assertEquals(connectorId, future.get().getContent().get("connector_id")); - } public void testNoConnectorIdInOutput() throws IOException { From 8a81c60d087b8d24333c64691eacf250c97ce5f1 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 30 Nov 2023 16:17:53 -0800 Subject: [PATCH 10/11] Update to current setup-java version Signed-off-by: Daniel Widdis --- .github/workflows/CI.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index bbfc48aeb..09c8c25c2 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -15,7 +15,7 @@ jobs: steps: - uses: actions/checkout@v4 # Spotless requires JDK 17+ - - uses: actions/setup-java@v3 + - uses: actions/setup-java@v4 with: java-version: 17 distribution: temurin From ba6657b5ae84c6e86a93682536b753d37872c160 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 1 Dec 2023 10:25:28 -0800 Subject: [PATCH 11/11] Remove unneeded argument captors Signed-off-by: Daniel Widdis --- .../workflow/CreateConnectorStepTests.java | 16 ++++------------ .../workflow/DeleteConnectorStepTests.java | 15 ++++----------- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateConnectorStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateConnectorStepTests.java index de3add996..1135a0ca6 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateConnectorStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateConnectorStepTests.java @@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -81,15 +80,12 @@ public void testCreateConnector() throws IOException, ExecutionException, Interr String connectorId = "connect"; CreateConnectorStep createConnectorStep = new CreateConnectorStep(machineLearningNodeClient, flowFrameworkIndicesHandler); - @SuppressWarnings("unchecked") - ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); - doAnswer(invocation -> { ActionListener actionListener = invocation.getArgument(1); MLCreateConnectorResponse output = new MLCreateConnectorResponse(connectorId); actionListener.onResponse(output); return null; - }).when(machineLearningNodeClient).createConnector(any(MLCreateConnectorInput.class), actionListenerCaptor.capture()); + }).when(machineLearningNodeClient).createConnector(any(MLCreateConnectorInput.class), any()); CompletableFuture future = createConnectorStep.execute( inputData.getNodeId(), @@ -98,8 +94,7 @@ public void testCreateConnector() throws IOException, ExecutionException, Interr Collections.emptyMap() ); - verify(machineLearningNodeClient).createConnector(any(MLCreateConnectorInput.class), actionListenerCaptor.capture()); - + verify(machineLearningNodeClient).createConnector(any(MLCreateConnectorInput.class), any()); assertTrue(future.isDone()); assertEquals(connectorId, future.get().getContent().get("connector_id")); @@ -108,14 +103,11 @@ public void testCreateConnector() throws IOException, ExecutionException, Interr public void testCreateConnectorFailure() throws IOException { CreateConnectorStep createConnectorStep = new CreateConnectorStep(machineLearningNodeClient, flowFrameworkIndicesHandler); - @SuppressWarnings("unchecked") - ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); - doAnswer(invocation -> { ActionListener actionListener = invocation.getArgument(1); actionListener.onFailure(new FlowFrameworkException("Failed to create connector", RestStatus.INTERNAL_SERVER_ERROR)); return null; - }).when(machineLearningNodeClient).createConnector(any(MLCreateConnectorInput.class), actionListenerCaptor.capture()); + }).when(machineLearningNodeClient).createConnector(any(MLCreateConnectorInput.class), any()); CompletableFuture future = createConnectorStep.execute( inputData.getNodeId(), @@ -124,7 +116,7 @@ public void testCreateConnectorFailure() throws IOException { Collections.emptyMap() ); - verify(machineLearningNodeClient).createConnector(any(MLCreateConnectorInput.class), actionListenerCaptor.capture()); + verify(machineLearningNodeClient).createConnector(any(MLCreateConnectorInput.class), any()); assertTrue(future.isCompletedExceptionally()); ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java index cfd9c8831..3c997a02e 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java @@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -53,9 +52,6 @@ public void testDeleteConnector() throws IOException, ExecutionException, Interr String connectorId = randomAlphaOfLength(5); DeleteConnectorStep deleteConnectorStep = new DeleteConnectorStep(machineLearningNodeClient); - @SuppressWarnings("unchecked") - ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); - doAnswer(invocation -> { String connectorIdArg = invocation.getArgument(0); ActionListener actionListener = invocation.getArgument(1); @@ -63,7 +59,7 @@ public void testDeleteConnector() throws IOException, ExecutionException, Interr DeleteResponse output = new DeleteResponse(shardId, connectorIdArg, 1, 1, 1, true); actionListener.onResponse(output); return null; - }).when(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + }).when(machineLearningNodeClient).deleteConnector(any(String.class), any()); CompletableFuture future = deleteConnectorStep.execute( inputData.getNodeId(), @@ -71,7 +67,7 @@ public void testDeleteConnector() throws IOException, ExecutionException, Interr Map.of("step_1", new WorkflowData(Map.of("connector_id", connectorId), "workflowId", "nodeId")), Map.of("step_1", "connector_id") ); - verify(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + verify(machineLearningNodeClient).deleteConnector(any(String.class), any()); assertTrue(future.isDone()); assertEquals(connectorId, future.get().getContent().get("connector_id")); @@ -96,14 +92,11 @@ public void testNoConnectorIdInOutput() throws IOException { public void testDeleteConnectorFailure() throws IOException { DeleteConnectorStep deleteConnectorStep = new DeleteConnectorStep(machineLearningNodeClient); - @SuppressWarnings("unchecked") - ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); - doAnswer(invocation -> { ActionListener actionListener = invocation.getArgument(1); actionListener.onFailure(new FlowFrameworkException("Failed to delete connector", RestStatus.INTERNAL_SERVER_ERROR)); return null; - }).when(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + }).when(machineLearningNodeClient).deleteConnector(any(String.class), any()); CompletableFuture future = deleteConnectorStep.execute( inputData.getNodeId(), @@ -112,7 +105,7 @@ public void testDeleteConnectorFailure() throws IOException { Map.of("step_1", "connector_id") ); - verify(machineLearningNodeClient).deleteConnector(any(String.class), actionListenerCaptor.capture()); + verify(machineLearningNodeClient).deleteConnector(any(String.class), any()); assertTrue(future.isCompletedExceptionally()); ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent());