From 01a254f25cb1abaa1d1f93aae2069fb5ed2ead30 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 1 Dec 2023 10:52:10 -0800 Subject: [PATCH] Update Delete Connector Step with parsing util Signed-off-by: Daniel Widdis --- .../workflow/DeleteConnectorStep.java | 39 ++++++++----------- .../workflow/DeleteConnectorStepTests.java | 5 +-- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java index bf0fae33e..517e484a7 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java @@ -13,13 +13,14 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.delete.DeleteResponse; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; import java.io.IOException; +import java.util.Collections; import java.util.Map; -import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static org.opensearch.flowframework.common.CommonValue.CONNECTOR_ID; @@ -72,29 +73,23 @@ public void onFailure(Exception e) { } }; - String connectorId = null; - - // Previous Node inputs defines which step the connector ID came from - Optional previousNode = previousNodeInputs.entrySet() - .stream() - .filter(e -> CONNECTOR_ID.equals(e.getValue())) - .map(Map.Entry::getKey) - .findFirst(); - if (previousNode.isPresent()) { - WorkflowData previousNodeOutput = outputs.get(previousNode.get()); - if (previousNodeOutput != null && previousNodeOutput.getContent().containsKey(CONNECTOR_ID)) { - connectorId = previousNodeOutput.getContent().get(CONNECTOR_ID).toString(); - } - } + Set requiredKeys = Set.of(CONNECTOR_ID); + Set optionalKeys = Collections.emptySet(); - if (connectorId != null) { - mlClient.deleteConnector(connectorId, actionListener); - } else { - deleteConnectorFuture.completeExceptionally( - new FlowFrameworkException("Required field " + CONNECTOR_ID + " is not provided", RestStatus.BAD_REQUEST) + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + requiredKeys, + optionalKeys, + currentNodeInputs, + outputs, + previousNodeInputs ); - } + String connectorId = (String) inputs.get(CONNECTOR_ID); + mlClient.deleteConnector(connectorId, actionListener); + } catch (FlowFrameworkException e) { + deleteConnectorFuture.completeExceptionally(e); + } return deleteConnectorFuture; } diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java index 3c997a02e..a766d51c9 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java @@ -13,7 +13,6 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.rest.RestStatus; -import org.opensearch.flowframework.common.CommonValue; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.ml.common.transport.connector.MLCreateConnectorResponse; @@ -44,7 +43,7 @@ public void setUp() throws Exception { MockitoAnnotations.openMocks(this); - inputData = new WorkflowData(Map.of(CommonValue.CONNECTOR_ID, "test"), "test-id", "test-node-id"); + inputData = new WorkflowData(Collections.emptyMap(), "test-id", "test-node-id"); } public void testDeleteConnector() throws IOException, ExecutionException, InterruptedException { @@ -86,7 +85,7 @@ public void testNoConnectorIdInOutput() throws IOException { assertTrue(future.isCompletedExceptionally()); ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); assertTrue(ex.getCause() instanceof FlowFrameworkException); - assertEquals("Required field connector_id is not provided", ex.getCause().getMessage()); + assertEquals("Missing required inputs [connector_id] in workflow [test-id] node [test-node-id]", ex.getCause().getMessage()); } public void testDeleteConnectorFailure() throws IOException {