diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index 83e9b9310..88a5d67e5 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -198,115 +198,210 @@ public List createReprovisionSequence( ); } - List reprovisionSequence = new ArrayList<>(); - Map idToNodeMap = new HashMap<>(); - - // Iterate through sorted Updated Nodes - for (WorkflowNode node : sortedUpdatedNodes) { - - WorkflowData data = new WorkflowData(node.userInputs(), updatedWorkflow.userParams(), workflowId, node.id()); - List predecessorNodes = updatedWorkflow.edges() - .stream() - .filter(e -> e.destination().equals(node.id())) - // since we are iterating in topological order we know all predecessors will be in the map - .map(e -> idToNodeMap.get(e.source())) - .collect(Collectors.toList()); - - TimeValue nodeTimeout = parseTimeout(node); + List reprovisionSequence = createReprovisionSequence( + workflowId, + updatedWorkflow, + sortedUpdatedNodes, + originalTemplateMap, + resourcesCreated + ); - if (!originalTemplateMap.containsKey(node.id())) { + // If the reprovision sequence consists entirely of WorkflowDataSteps, then no modifications were made to the exisiting template. + if (reprovisionSequence.stream().allMatch(n -> n.workflowStep().getName().equals(WorkflowDataStep.NAME))) { + throw new FlowFrameworkException("Template does not contain any modifications", RestStatus.BAD_REQUEST); + } - // Case 1 : Additive modification, create new node + return reprovisionSequence; + } - WorkflowStep step = workflowStepFactory.createStep(node.type()); + /** + * Compares an original and upated template and creates a list of update, create or workflowdatastep nodes + * @param workflowId the workflow ID associated with the template + * @param updatedWorkflow the updated workflow to be processed + * @param sortedUpdatedNodes the topologically sorted updated template nodes + * @param originalTemplateMap a map of node Id to workflow node of the original template + * @param resourcesCreated a list of resources created for this template + * @return a list of process node representing the reprovision sequence + * @throws Exception for issues creating the reprovision sequence + */ + private List createReprovisionSequence( + String workflowId, + Workflow updatedWorkflow, + List sortedUpdatedNodes, + Map originalTemplateMap, + List resourcesCreated + ) throws Exception { + Map idToNodeMap = new HashMap<>(); + List reprovisionSequence = new ArrayList<>(); - ProcessNode processNode = new ProcessNode( - node.id(), - step, - node.previousNodeInputs(), - Collections.emptyMap(), // TODO Add support to reprovision substitution templates - data, - predecessorNodes, - threadPool, - PROVISION_WORKFLOW_THREAD_POOL, - nodeTimeout - ); + for (WorkflowNode node : sortedUpdatedNodes) { + ProcessNode processNode = createProcessNode( + updatedWorkflow, + node, + originalTemplateMap, + resourcesCreated, + workflowId, + idToNodeMap + ); + if (processNode != null) { idToNodeMap.put(processNode.id(), processNode); reprovisionSequence.add(processNode); + } + } - } else { - - // Case 2 : Existing Modification, compare previous node inputs and user inputs - WorkflowNode originalNode = originalTemplateMap.get(node.id()); - - if (!node.previousNodeInputs().equals(originalNode.previousNodeInputs()) - || !ParseUtils.userInputsEquals(originalNode.userInputs(), node.userInputs())) { - - // Create Update Step (if one is available) - String updateStepName = WorkflowResources.getUpdateStepByWorkflowStep(node.type()); - if (updateStepName != null) { - WorkflowStep step = workflowStepFactory.createStep(updateStepName); - ProcessNode processNode = new ProcessNode( - node.id(), - step, - node.previousNodeInputs(), - Collections.emptyMap(), // TODO Add support to reprovision substitution templates - data, - predecessorNodes, - threadPool, - PROVISION_WORKFLOW_THREAD_POOL, - nodeTimeout - ); - idToNodeMap.put(processNode.id(), processNode); - reprovisionSequence.add(processNode); - } else { - - // Case 3 : Cannot update step (not supported) - throw new FlowFrameworkException( - "Workflow Step " + node.id() + " does not support updates when reprovisioning.", - RestStatus.BAD_REQUEST - ); - - } - } else { - - // Case 4 : No modification to existing node, create proxy step to pass down required input to dependent nodes - // Node ID should give us resources created - ResourceCreated nodeResource = resourcesCreated.stream() - .filter(rc -> rc.workflowStepId().equals(node.id())) - .findFirst() - .orElse(null); - - if (nodeResource != null) { - // create process node - ProcessNode processNode = new ProcessNode( - node.id(), - new WorkflowDataStep(nodeResource), - node.previousNodeInputs(), - Collections.emptyMap(), - data, - predecessorNodes, - threadPool, - PROVISION_WORKFLOW_THREAD_POOL, - nodeTimeout - ); - idToNodeMap.put(processNode.id(), processNode); - reprovisionSequence.add(processNode); - - } - - } + return reprovisionSequence; + } + /** + * Determines which type of process node to create for a reprovision sequence + * @param updatedWorkflow the updated workflow to be processed + * @param node the current workflow node + * @param originalTemplateMap a map of node Id to workflow node of the original template + * @param resourcesCreated a list of resources created for this template + * @param workflowId the workflow ID associated with the template + * @param idToNodeMap a map of the current reprovision sequence + * @return a ProcessNode + * @throws Exception for issues creating the process node + */ + private ProcessNode createProcessNode( + Workflow updatedWorkflow, + WorkflowNode node, + Map originalTemplateMap, + List resourcesCreated, + String workflowId, + Map idToNodeMap + ) throws Exception { + WorkflowData data = new WorkflowData(node.userInputs(), updatedWorkflow.userParams(), workflowId, node.id()); + List predecessorNodes = updatedWorkflow.edges() + .stream() + .filter(e -> e.destination().equals(node.id())) + // since we are iterating in topological order we know all predecessors will be in the map + .map(e -> idToNodeMap.get(e.source())) + .collect(Collectors.toList()); + TimeValue nodeTimeout = parseTimeout(node); + + if (!originalTemplateMap.containsKey(node.id())) { + // Case 1: Additive modification, create new node + return createNewProcessNode(node, data, predecessorNodes, nodeTimeout); + } else { + WorkflowNode originalNode = originalTemplateMap.get(node.id()); + if (shouldUpdateNode(node, originalNode)) { + // Case 2: Existing modification, create update step + return createUpdateProcessNode(node, data, predecessorNodes, nodeTimeout); + } else { + // Case 4: No modification to existing node, create proxy step + return createWorkflowDataStepNode(node, data, predecessorNodes, nodeTimeout, resourcesCreated); } + } + } + /** + * Creates a process node to create a new resource + * @param node the current node + * @param data the current node data + * @param predecessorNodes the current node predecessors + * @param nodeTimeout the current node timeout + * @return a Process Node + */ + private ProcessNode createNewProcessNode( + WorkflowNode node, + WorkflowData data, + List predecessorNodes, + TimeValue nodeTimeout + ) { + WorkflowStep step = workflowStepFactory.createStep(node.type()); + return new ProcessNode( + node.id(), + step, + node.previousNodeInputs(), + Collections.emptyMap(), // TODO Add support to reprovision substitution templates + data, + predecessorNodes, + threadPool, + PROVISION_WORKFLOW_THREAD_POOL, + nodeTimeout + ); + } + + /** + * Creates a process node to update an existing resource + * @param node the current node + * @param data the current node data + * @param predecessorNodes the current node predecessors + * @param nodeTimeout the current node timeout + * @return a ProcessNode + * @throws FlowFrameworkException if the current node does not support updates + */ + private ProcessNode createUpdateProcessNode( + WorkflowNode node, + WorkflowData data, + List predecessorNodes, + TimeValue nodeTimeout + ) throws FlowFrameworkException { + String updateStepName = WorkflowResources.getUpdateStepByWorkflowStep(node.type()); + if (updateStepName != null) { + WorkflowStep step = workflowStepFactory.createStep(updateStepName); + return new ProcessNode( + node.id(), + step, + node.previousNodeInputs(), + Collections.emptyMap(), // TODO Add support to reprovision substitution templates + data, + predecessorNodes, + threadPool, + PROVISION_WORKFLOW_THREAD_POOL, + nodeTimeout + ); + } else { + // Case 3 : Cannot update step (not supported) + throw new FlowFrameworkException( + "Workflow Step " + node.id() + " does not support updates when reprovisioning.", + RestStatus.BAD_REQUEST + ); } + } - // If the reprovision sequence consists entirely of WorkflowDataSteps, then no modifications were made to the exisiting template. - if (reprovisionSequence.stream().allMatch(n -> n.workflowStep().getName().equals(WorkflowDataStep.NAME))) { - throw new FlowFrameworkException("Template does not contain any modifications", RestStatus.BAD_REQUEST); + /** + * Creates a process node to pass workflow data to the next step in the reprovision sequence + * @param node the current node + * @param data the current node data + * @param predecessorNodes the current node predecessors + * @param nodeTimeout the current node timeout + * @param resourcesCreated the list of resources created for the template assoicated with this node + * @return a Process node + */ + private ProcessNode createWorkflowDataStepNode( + WorkflowNode node, + WorkflowData data, + List predecessorNodes, + TimeValue nodeTimeout, + List resourcesCreated + ) { + ResourceCreated nodeResource = resourcesCreated.stream() + .filter(rc -> rc.workflowStepId().equals(node.id())) + .findFirst() + .orElse(null); + + if (nodeResource != null) { + return new ProcessNode( + node.id(), + new WorkflowDataStep(nodeResource), + node.previousNodeInputs(), + Collections.emptyMap(), + data, + predecessorNodes, + threadPool, + PROVISION_WORKFLOW_THREAD_POOL, + nodeTimeout + ); + } else { + return null; } + } - return reprovisionSequence; + private boolean shouldUpdateNode(WorkflowNode node, WorkflowNode originalNode) throws Exception { + return !node.previousNodeInputs().equals(originalNode.previousNodeInputs()) + || !ParseUtils.userInputsEquals(originalNode.userInputs(), node.userInputs()); } /**