Skip to content

Commit

Permalink
refactor reprovision sequence creation
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Aug 4, 2024
1 parent ad8ee5b commit 6517669
Showing 1 changed file with 190 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,115 +198,210 @@ public List<ProcessNode> createReprovisionSequence(
);
}

List<ProcessNode> reprovisionSequence = new ArrayList<>();
Map<String, ProcessNode> idToNodeMap = new HashMap<>();

// Iterate through sorted Updated Nodes
for (WorkflowNode node : sortedUpdatedNodes) {

WorkflowData data = new WorkflowData(node.userInputs(), updatedWorkflow.userParams(), workflowId, node.id());
List<ProcessNode> predecessorNodes = updatedWorkflow.edges()
.stream()
.filter(e -> e.destination().equals(node.id()))
// since we are iterating in topological order we know all predecessors will be in the map
.map(e -> idToNodeMap.get(e.source()))
.collect(Collectors.toList());

TimeValue nodeTimeout = parseTimeout(node);
List<ProcessNode> reprovisionSequence = createReprovisionSequence(
workflowId,
updatedWorkflow,
sortedUpdatedNodes,
originalTemplateMap,
resourcesCreated
);

if (!originalTemplateMap.containsKey(node.id())) {
// If the reprovision sequence consists entirely of WorkflowDataSteps, then no modifications were made to the exisiting template.
if (reprovisionSequence.stream().allMatch(n -> n.workflowStep().getName().equals(WorkflowDataStep.NAME))) {
throw new FlowFrameworkException("Template does not contain any modifications", RestStatus.BAD_REQUEST);
}

// Case 1 : Additive modification, create new node
return reprovisionSequence;
}

WorkflowStep step = workflowStepFactory.createStep(node.type());
/**
* Compares an original and upated template and creates a list of update, create or workflowdatastep nodes
* @param workflowId the workflow ID associated with the template
* @param updatedWorkflow the updated workflow to be processed
* @param sortedUpdatedNodes the topologically sorted updated template nodes
* @param originalTemplateMap a map of node Id to workflow node of the original template
* @param resourcesCreated a list of resources created for this template
* @return a list of process node representing the reprovision sequence
* @throws Exception for issues creating the reprovision sequence
*/
private List<ProcessNode> createReprovisionSequence(
String workflowId,
Workflow updatedWorkflow,
List<WorkflowNode> sortedUpdatedNodes,
Map<String, WorkflowNode> originalTemplateMap,
List<ResourceCreated> resourcesCreated
) throws Exception {
Map<String, ProcessNode> idToNodeMap = new HashMap<>();
List<ProcessNode> reprovisionSequence = new ArrayList<>();

ProcessNode processNode = new ProcessNode(
node.id(),
step,
node.previousNodeInputs(),
Collections.emptyMap(), // TODO Add support to reprovision substitution templates
data,
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
);
for (WorkflowNode node : sortedUpdatedNodes) {
ProcessNode processNode = createProcessNode(
updatedWorkflow,
node,
originalTemplateMap,
resourcesCreated,
workflowId,
idToNodeMap
);
if (processNode != null) {
idToNodeMap.put(processNode.id(), processNode);
reprovisionSequence.add(processNode);
}
}

} else {

// Case 2 : Existing Modification, compare previous node inputs and user inputs
WorkflowNode originalNode = originalTemplateMap.get(node.id());

if (!node.previousNodeInputs().equals(originalNode.previousNodeInputs())
|| !ParseUtils.userInputsEquals(originalNode.userInputs(), node.userInputs())) {

// Create Update Step (if one is available)
String updateStepName = WorkflowResources.getUpdateStepByWorkflowStep(node.type());
if (updateStepName != null) {
WorkflowStep step = workflowStepFactory.createStep(updateStepName);
ProcessNode processNode = new ProcessNode(
node.id(),
step,
node.previousNodeInputs(),
Collections.emptyMap(), // TODO Add support to reprovision substitution templates
data,
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
);
idToNodeMap.put(processNode.id(), processNode);
reprovisionSequence.add(processNode);
} else {

// Case 3 : Cannot update step (not supported)
throw new FlowFrameworkException(
"Workflow Step " + node.id() + " does not support updates when reprovisioning.",
RestStatus.BAD_REQUEST
);

}
} else {

// Case 4 : No modification to existing node, create proxy step to pass down required input to dependent nodes
// Node ID should give us resources created
ResourceCreated nodeResource = resourcesCreated.stream()
.filter(rc -> rc.workflowStepId().equals(node.id()))
.findFirst()
.orElse(null);

if (nodeResource != null) {
// create process node
ProcessNode processNode = new ProcessNode(
node.id(),
new WorkflowDataStep(nodeResource),
node.previousNodeInputs(),
Collections.emptyMap(),
data,
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
);
idToNodeMap.put(processNode.id(), processNode);
reprovisionSequence.add(processNode);

}

}
return reprovisionSequence;
}

/**
* Determines which type of process node to create for a reprovision sequence
* @param updatedWorkflow the updated workflow to be processed
* @param node the current workflow node
* @param originalTemplateMap a map of node Id to workflow node of the original template
* @param resourcesCreated a list of resources created for this template
* @param workflowId the workflow ID associated with the template
* @param idToNodeMap a map of the current reprovision sequence
* @return a ProcessNode
* @throws Exception for issues creating the process node
*/
private ProcessNode createProcessNode(
Workflow updatedWorkflow,
WorkflowNode node,
Map<String, WorkflowNode> originalTemplateMap,
List<ResourceCreated> resourcesCreated,
String workflowId,
Map<String, ProcessNode> idToNodeMap
) throws Exception {
WorkflowData data = new WorkflowData(node.userInputs(), updatedWorkflow.userParams(), workflowId, node.id());
List<ProcessNode> predecessorNodes = updatedWorkflow.edges()
.stream()
.filter(e -> e.destination().equals(node.id()))
// since we are iterating in topological order we know all predecessors will be in the map
.map(e -> idToNodeMap.get(e.source()))
.collect(Collectors.toList());
TimeValue nodeTimeout = parseTimeout(node);

if (!originalTemplateMap.containsKey(node.id())) {
// Case 1: Additive modification, create new node
return createNewProcessNode(node, data, predecessorNodes, nodeTimeout);
} else {
WorkflowNode originalNode = originalTemplateMap.get(node.id());
if (shouldUpdateNode(node, originalNode)) {
// Case 2: Existing modification, create update step
return createUpdateProcessNode(node, data, predecessorNodes, nodeTimeout);
} else {
// Case 4: No modification to existing node, create proxy step
return createWorkflowDataStepNode(node, data, predecessorNodes, nodeTimeout, resourcesCreated);
}
}
}

/**
* Creates a process node to create a new resource
* @param node the current node
* @param data the current node data
* @param predecessorNodes the current node predecessors
* @param nodeTimeout the current node timeout
* @return a Process Node
*/
private ProcessNode createNewProcessNode(
WorkflowNode node,
WorkflowData data,
List<ProcessNode> predecessorNodes,
TimeValue nodeTimeout
) {
WorkflowStep step = workflowStepFactory.createStep(node.type());
return new ProcessNode(
node.id(),
step,
node.previousNodeInputs(),
Collections.emptyMap(), // TODO Add support to reprovision substitution templates
data,
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
);
}

/**
* Creates a process node to update an existing resource
* @param node the current node
* @param data the current node data
* @param predecessorNodes the current node predecessors
* @param nodeTimeout the current node timeout
* @return a ProcessNode
* @throws FlowFrameworkException if the current node does not support updates
*/
private ProcessNode createUpdateProcessNode(
WorkflowNode node,
WorkflowData data,
List<ProcessNode> predecessorNodes,
TimeValue nodeTimeout
) throws FlowFrameworkException {
String updateStepName = WorkflowResources.getUpdateStepByWorkflowStep(node.type());
if (updateStepName != null) {
WorkflowStep step = workflowStepFactory.createStep(updateStepName);
return new ProcessNode(
node.id(),
step,
node.previousNodeInputs(),
Collections.emptyMap(), // TODO Add support to reprovision substitution templates
data,
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
);
} else {
// Case 3 : Cannot update step (not supported)
throw new FlowFrameworkException(
"Workflow Step " + node.id() + " does not support updates when reprovisioning.",
RestStatus.BAD_REQUEST
);
}
}

// If the reprovision sequence consists entirely of WorkflowDataSteps, then no modifications were made to the exisiting template.
if (reprovisionSequence.stream().allMatch(n -> n.workflowStep().getName().equals(WorkflowDataStep.NAME))) {
throw new FlowFrameworkException("Template does not contain any modifications", RestStatus.BAD_REQUEST);
/**
* Creates a process node to pass workflow data to the next step in the reprovision sequence
* @param node the current node
* @param data the current node data
* @param predecessorNodes the current node predecessors
* @param nodeTimeout the current node timeout
* @param resourcesCreated the list of resources created for the template assoicated with this node
* @return a Process node
*/
private ProcessNode createWorkflowDataStepNode(
WorkflowNode node,
WorkflowData data,
List<ProcessNode> predecessorNodes,
TimeValue nodeTimeout,
List<ResourceCreated> resourcesCreated
) {
ResourceCreated nodeResource = resourcesCreated.stream()
.filter(rc -> rc.workflowStepId().equals(node.id()))
.findFirst()
.orElse(null);

if (nodeResource != null) {
return new ProcessNode(
node.id(),
new WorkflowDataStep(nodeResource),
node.previousNodeInputs(),
Collections.emptyMap(),
data,
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
);
} else {
return null;
}
}

return reprovisionSequence;
private boolean shouldUpdateNode(WorkflowNode node, WorkflowNode originalNode) throws Exception {
return !node.previousNodeInputs().equals(originalNode.previousNodeInputs())
|| !ParseUtils.userInputsEquals(originalNode.userInputs(), node.userInputs());
}

/**
Expand Down

0 comments on commit 6517669

Please sign in to comment.