Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Prevent duplicate inferred edges #349

Merged
merged 1 commit into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public static Workflow parse(XContentParser parser) throws IOException {
// Iterate the nodes and infer edges from previous node inputs
List<WorkflowEdge> inferredEdges = nodes.stream()
.flatMap(node -> node.previousNodeInputs().keySet().stream().map(previousNode -> new WorkflowEdge(previousNode, node.id())))
.distinct()
.collect(Collectors.toList());
// Remove any that are already in edges list
inferredEdges.removeAll(edges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,14 @@ public void validateGraph(List<ProcessNode> processNodes, WorkflowValidator vali
.collect(Collectors.toList());

// Retrieve all the user input data from this node
List<String> currentNodeUserInputs = new ArrayList<String>(processNode.input().getContent().keySet());
List<String> currentNodeUserInputs = new ArrayList<>(processNode.input().getContent().keySet());

// Combine both predecessor outputs and current node user inputs
List<String> allInputs = Stream.concat(predecessorOutputs.stream(), currentNodeUserInputs.stream())
.collect(Collectors.toList());

// Retrieve list of required inputs from the current process node and compare
List<String> expectedInputs = new ArrayList<String>(
List<String> expectedInputs = new ArrayList<>(
validator.getWorkflowStepValidators().get(processNode.workflowStep().getName()).getInputs()
);

Expand Down Expand Up @@ -321,8 +321,9 @@ private static List<WorkflowNode> topologicalSort(List<WorkflowNode> workflowNod
// L <- Empty list that will contain the sorted elements
List<WorkflowNode> sortedNodes = new ArrayList<>();
// S <- Set of all nodes with no incoming edge
Queue<WorkflowNode> sourceNodes = new ArrayDeque<>();
workflowNodes.stream().filter(n -> !predecessorEdges.containsKey(n)).forEach(n -> sourceNodes.add(n));
Queue<WorkflowNode> sourceNodes = workflowNodes.stream()
.filter(n -> !predecessorEdges.containsKey(n))
.collect(ArrayDeque::new, ArrayDeque::add, ArrayDeque::addAll);
if (sourceNodes.isEmpty()) {
throw new FlowFrameworkException("No start node detected: all nodes have a predecessor.", RestStatus.BAD_REQUEST);
}
Expand All @@ -340,7 +341,7 @@ private static List<WorkflowNode> topologicalSort(List<WorkflowNode> workflowNod
// remove edge e from the graph
graph.remove(e);
// if m has no other incoming edges then
if (!predecessorEdges.get(m).stream().anyMatch(i -> graph.contains(i))) {
if (predecessorEdges.get(m).stream().noneMatch(graph::contains)) {
// insert m into S
sourceNodes.add(m);
}
Expand Down
Loading