Skip to content

Commit

Permalink
Improves workflow node comparision
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Jul 25, 2024
1 parent 8530df0 commit 66e8c81
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 29 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ dependencies {
implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
implementation "org.glassfish:jakarta.json:2.0.1"
implementation "org.eclipse:yasson:3.0.3"
implementation group: 'com.google.code.gson', name: 'gson', version: '2.10.1'

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class ReprovisionWorkflowTransportAction extends HandledTransportAction<R
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final FlowFrameworkSettings flowFrameworkSettings;
private final PluginsService pluginsService;
private final EncryptorUtils encryptorUtils;

@Inject
public ReprovisionWorkflowTransportAction(
Expand All @@ -77,6 +79,7 @@ public ReprovisionWorkflowTransportAction(
WorkflowProcessSorter workflowProcessSorter,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings,
EncryptorUtils encryptorUtils,
PluginsService pluginsService
) {
super(ReprovisionWorkflowAction.NAME, transportService, actionFilters, ReprovisionWorkflowRequest::new);
Expand All @@ -86,14 +89,15 @@ public ReprovisionWorkflowTransportAction(
this.workflowProcessSorter = workflowProcessSorter;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
this.encryptorUtils = encryptorUtils;
this.pluginsService = pluginsService;
}

@Override
protected void doExecute(Task task, ReprovisionWorkflowRequest request, ActionListener<WorkflowResponse> listener) {

String workflowId = request.getWorkflowId();
Template originalTemplate = request.getOriginalTemplate();
Template originalTemplate = encryptorUtils.decryptTemplateCredentials(request.getOriginalTemplate());
Template updatedTemplate = request.getUpdatedTemplate();

// Validate updated template prior to execution
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/org/opensearch/flowframework/util/ParseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/
package org.opensearch.flowframework.util;

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -498,4 +500,19 @@ public static <T> T parseIfExists(Map<String, Object> inputs, String key, Class<
throw new IllegalArgumentException("Unsupported type: " + type);
}
}

/**
* Compares workflow node user inputs
* @param originalInputs the original node user inputs
* @param updatedInputs the updated node user inputs
* @throws Exception for issues processing map
* @return boolean if equivalent
*/
public static boolean userInputsEquals(Map<String, Object> originalInputs, Map<String, Object> updatedInputs) throws Exception {
String originalInputsJson = parseArbitraryStringToObjectMapToString(originalInputs);
String updatedInputsJson = parseArbitraryStringToObjectMapToString(updatedInputs);
JsonElement elem1 = JsonParser.parseString(originalInputsJson);
JsonElement elem2 = JsonParser.parseString(updatedInputsJson);
return elem1.equals(elem2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public PlainActionFuture<WorkflowData> execute(

Map<String, Object> sourceAsMap = XContentHelper.convertToMap(configurationsBytes, false, MediaTypeRegistry.JSON).v2();

// TODO : Add support to update index mappings

// extract index settings from configuration
if (!sourceAsMap.containsKey("settings")) {
String errorMessage = "Failed to update index settings for index "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
import org.opensearch.flowframework.model.WorkflowNode;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.plugins.PluginsService;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -32,7 +33,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
Expand Down Expand Up @@ -161,7 +161,7 @@ public List<ProcessNode> createReprovisionSequence(
Template originalTemplate,
Template updatedTemplate,
List<ResourceCreated> resourcesCreated
) {
) throws Exception {

Workflow updatedWorkflow = updatedTemplate.workflows().get(PROVISION_WORKFLOW);
if (updatedWorkflow.nodes().size() > this.maxWorkflowSteps) {
Expand Down Expand Up @@ -215,8 +215,6 @@ public List<ProcessNode> createReprovisionSequence(

if (!originalTemplateMap.containsKey(node.id())) {

logger.info("TESTING : Node : " + node.id() + " is an additive modification!");

// Case 1 : Additive modification, create new node

WorkflowStep step = workflowStepFactory.createStep(node.type());
Expand All @@ -237,27 +235,11 @@ public List<ProcessNode> createReprovisionSequence(

} else {

logger.info("TESTING : Node : " + node.id() + " is an existing modification!");

// Case 2 : Existing Modification, compare previous node inputs and user inputs
WorkflowNode originalNode = originalTemplateMap.get(node.id());

Map<String, Object> updatedNodeUserInputs = node.userInputs();
Map<String, Object> originalNodeUserInputs = originalNode.userInputs();

boolean userInputsIsUpdated = false;
for (String key : updatedNodeUserInputs.keySet()) {
Object updatedValue = updatedNodeUserInputs.get(key);
Object originalValue = originalNodeUserInputs.get(key);
if (!Objects.equals(updatedValue, originalValue)) {
userInputsIsUpdated = true;
break;
}
}

if (!node.previousNodeInputs().equals(originalNode.previousNodeInputs()) || userInputsIsUpdated) {

logger.info("TESTING : Node : " + node.id() + " needs to be updated");
if (!node.previousNodeInputs().equals(originalNode.previousNodeInputs())
|| !ParseUtils.userInputsEquals(originalNode.userInputs(), node.userInputs())) {

// Create Update Step (if one is available)
String updateStepName = WorkflowResources.getUpdateStepByWorkflowStep(node.type());
Expand All @@ -279,7 +261,6 @@ public List<ProcessNode> createReprovisionSequence(
} else {

// Case 3 : Cannot update step (not supported)
logger.info("TESTING : Node : " + node.id() + "has changed inputs and does not support updates");
throw new FlowFrameworkException(
"Workflow Step " + node.id() + " does not support updates when reprovisioning.",
RestStatus.BAD_REQUEST
Expand All @@ -289,15 +270,10 @@ public List<ProcessNode> createReprovisionSequence(
} else {

// Case 4 : No modification to existing node, create proxy step to pass down required input to dependent nodes
logger.info("TESTING : Node : " + node.id() + "needs to get resources");

// Node ID should give us resources created
ResourceCreated nodeResource = null;
for (ResourceCreated resourceCreated : resourcesCreated) {
if (resourceCreated.workflowStepId().equals(node.id())) {
logger.info(
"TESTING : FOUND RESOURCE : Resource Created workflow Step ID : " + resourceCreated.workflowStepId()
);
nodeResource = resourceCreated;
}
}
Expand Down

0 comments on commit 66e8c81

Please sign in to comment.