From 66e8c816072c4be452471994cb3013e77db91545 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Thu, 25 Jul 2024 18:52:15 +0000 Subject: [PATCH] Improves workflow node comparision Signed-off-by: Joshua Palis --- build.gradle | 1 + .../ReprovisionWorkflowTransportAction.java | 6 +++- .../flowframework/util/ParseUtils.java | 17 ++++++++++ .../workflow/UpdateIndexStep.java | 2 ++ .../workflow/WorkflowProcessSorter.java | 32 +++---------------- 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/build.gradle b/build.gradle index e0f38276c..ad0af25d5 100644 --- a/build.gradle +++ b/build.gradle @@ -175,6 +175,7 @@ dependencies { implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1" implementation "org.glassfish:jakarta.json:2.0.1" implementation "org.eclipse:yasson:3.0.3" + implementation group: 'com.google.code.gson', name: 'gson', version: '2.10.1' // ZipArchive dependencies used for integration tests zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}" diff --git a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java index d2889908f..17b728bbe 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java @@ -27,6 +27,7 @@ import org.opensearch.flowframework.model.State; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.Workflow; +import org.opensearch.flowframework.util.EncryptorUtils; import org.opensearch.flowframework.workflow.ProcessNode; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.flowframework.workflow.WorkflowStepFactory; @@ -66,6 +67,7 @@ public class ReprovisionWorkflowTransportAction extends HandledTransportAction listener) { String workflowId = request.getWorkflowId(); - Template originalTemplate = request.getOriginalTemplate(); + Template originalTemplate = encryptorUtils.decryptTemplateCredentials(request.getOriginalTemplate()); Template updatedTemplate = request.getUpdatedTemplate(); // Validate updated template prior to execution diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index 7cd8645fe..640ce069e 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -8,6 +8,8 @@ */ package org.opensearch.flowframework.util; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; @@ -498,4 +500,19 @@ public static T parseIfExists(Map inputs, String key, Class< throw new IllegalArgumentException("Unsupported type: " + type); } } + + /** + * Compares workflow node user inputs + * @param originalInputs the original node user inputs + * @param updatedInputs the updated node user inputs + * @throws Exception for issues processing map + * @return boolean if equivalent + */ + public static boolean userInputsEquals(Map originalInputs, Map updatedInputs) throws Exception { + String originalInputsJson = parseArbitraryStringToObjectMapToString(originalInputs); + String updatedInputsJson = parseArbitraryStringToObjectMapToString(updatedInputs); + JsonElement elem1 = JsonParser.parseString(originalInputsJson); + JsonElement elem2 = JsonParser.parseString(updatedInputsJson); + return elem1.equals(elem2); + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java index a9cd616ea..24dc19ef0 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java @@ -95,6 +95,8 @@ public PlainActionFuture execute( Map sourceAsMap = XContentHelper.convertToMap(configurationsBytes, false, MediaTypeRegistry.JSON).v2(); + // TODO : Add support to update index mappings + // extract index settings from configuration if (!sourceAsMap.containsKey("settings")) { String errorMessage = "Failed to update index settings for index " diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index 08457885f..574d7084d 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -20,6 +20,7 @@ import org.opensearch.flowframework.model.Workflow; import org.opensearch.flowframework.model.WorkflowEdge; import org.opensearch.flowframework.model.WorkflowNode; +import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.plugins.PluginInfo; import org.opensearch.plugins.PluginsService; import org.opensearch.threadpool.ThreadPool; @@ -32,7 +33,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.Set; @@ -161,7 +161,7 @@ public List createReprovisionSequence( Template originalTemplate, Template updatedTemplate, List resourcesCreated - ) { + ) throws Exception { Workflow updatedWorkflow = updatedTemplate.workflows().get(PROVISION_WORKFLOW); if (updatedWorkflow.nodes().size() > this.maxWorkflowSteps) { @@ -215,8 +215,6 @@ public List createReprovisionSequence( if (!originalTemplateMap.containsKey(node.id())) { - logger.info("TESTING : Node : " + node.id() + " is an additive modification!"); - // Case 1 : Additive modification, create new node WorkflowStep step = workflowStepFactory.createStep(node.type()); @@ -237,27 +235,11 @@ public List createReprovisionSequence( } else { - logger.info("TESTING : Node : " + node.id() + " is an existing modification!"); - // Case 2 : Existing Modification, compare previous node inputs and user inputs WorkflowNode originalNode = originalTemplateMap.get(node.id()); - Map updatedNodeUserInputs = node.userInputs(); - Map originalNodeUserInputs = originalNode.userInputs(); - - boolean userInputsIsUpdated = false; - for (String key : updatedNodeUserInputs.keySet()) { - Object updatedValue = updatedNodeUserInputs.get(key); - Object originalValue = originalNodeUserInputs.get(key); - if (!Objects.equals(updatedValue, originalValue)) { - userInputsIsUpdated = true; - break; - } - } - - if (!node.previousNodeInputs().equals(originalNode.previousNodeInputs()) || userInputsIsUpdated) { - - logger.info("TESTING : Node : " + node.id() + " needs to be updated"); + if (!node.previousNodeInputs().equals(originalNode.previousNodeInputs()) + || !ParseUtils.userInputsEquals(originalNode.userInputs(), node.userInputs())) { // Create Update Step (if one is available) String updateStepName = WorkflowResources.getUpdateStepByWorkflowStep(node.type()); @@ -279,7 +261,6 @@ public List createReprovisionSequence( } else { // Case 3 : Cannot update step (not supported) - logger.info("TESTING : Node : " + node.id() + "has changed inputs and does not support updates"); throw new FlowFrameworkException( "Workflow Step " + node.id() + " does not support updates when reprovisioning.", RestStatus.BAD_REQUEST @@ -289,15 +270,10 @@ public List createReprovisionSequence( } else { // Case 4 : No modification to existing node, create proxy step to pass down required input to dependent nodes - logger.info("TESTING : Node : " + node.id() + "needs to get resources"); - // Node ID should give us resources created ResourceCreated nodeResource = null; for (ResourceCreated resourceCreated : resourcesCreated) { if (resourceCreated.workflowStepId().equals(node.id())) { - logger.info( - "TESTING : FOUND RESOURCE : Resource Created workflow Step ID : " + resourceCreated.workflowStepId() - ); nodeResource = resourceCreated; } }