Skip to content

Commit

Permalink
Prevent duplicate inferred edges
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 2, 2024
1 parent 0e9156b commit 7cb96be
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public static Workflow parse(XContentParser parser) throws IOException {
// Iterate the nodes and infer edges from previous node inputs
List<WorkflowEdge> inferredEdges = nodes.stream()
.flatMap(node -> node.previousNodeInputs().keySet().stream().map(previousNode -> new WorkflowEdge(previousNode, node.id())))
.distinct()
.collect(Collectors.toList());
// Remove any that are already in edges list
inferredEdges.removeAll(edges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,14 @@ public void validateGraph(List<ProcessNode> processNodes, WorkflowValidator vali
.collect(Collectors.toList());

// Retrieve all the user input data from this node
List<String> currentNodeUserInputs = new ArrayList<String>(processNode.input().getContent().keySet());
List<String> currentNodeUserInputs = new ArrayList<>(processNode.input().getContent().keySet());

// Combine both predecessor outputs and current node user inputs
List<String> allInputs = Stream.concat(predecessorOutputs.stream(), currentNodeUserInputs.stream())
.collect(Collectors.toList());

// Retrieve list of required inputs from the current process node and compare
List<String> expectedInputs = new ArrayList<String>(
List<String> expectedInputs = new ArrayList<>(
validator.getWorkflowStepValidators().get(processNode.workflowStep().getName()).getInputs()
);

Expand Down Expand Up @@ -321,8 +321,9 @@ private static List<WorkflowNode> topologicalSort(List<WorkflowNode> workflowNod
// L <- Empty list that will contain the sorted elements
List<WorkflowNode> sortedNodes = new ArrayList<>();
// S <- Set of all nodes with no incoming edge
Queue<WorkflowNode> sourceNodes = new ArrayDeque<>();
workflowNodes.stream().filter(n -> !predecessorEdges.containsKey(n)).forEach(n -> sourceNodes.add(n));
Queue<WorkflowNode> sourceNodes = workflowNodes.stream()
.filter(n -> !predecessorEdges.containsKey(n))
.collect(ArrayDeque::new, ArrayDeque::add, ArrayDeque::addAll);
if (sourceNodes.isEmpty()) {
throw new FlowFrameworkException("No start node detected: all nodes have a predecessor.", RestStatus.BAD_REQUEST);
}
Expand All @@ -340,7 +341,7 @@ private static List<WorkflowNode> topologicalSort(List<WorkflowNode> workflowNod
// remove edge e from the graph
graph.remove(e);
// if m has no other incoming edges then
if (!predecessorEdges.get(m).stream().anyMatch(i -> graph.contains(i))) {
if (predecessorEdges.get(m).stream().noneMatch(graph::contains)) {
// insert m into S
sourceNodes.add(m);
}
Expand Down

0 comments on commit 7cb96be

Please sign in to comment.