diff --git a/src/main/java/org/opensearch/flowframework/model/Workflow.java b/src/main/java/org/opensearch/flowframework/model/Workflow.java index 7902f4f40..a1fd4f4b5 100644 --- a/src/main/java/org/opensearch/flowframework/model/Workflow.java +++ b/src/main/java/org/opensearch/flowframework/model/Workflow.java @@ -123,6 +123,7 @@ public static Workflow parse(XContentParser parser) throws IOException { // Iterate the nodes and infer edges from previous node inputs List inferredEdges = nodes.stream() .flatMap(node -> node.previousNodeInputs().keySet().stream().map(previousNode -> new WorkflowEdge(previousNode, node.id()))) + .distinct() .collect(Collectors.toList()); // Remove any that are already in edges list inferredEdges.removeAll(edges); diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index bb1334470..b865786e2 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -222,14 +222,14 @@ public void validateGraph(List processNodes, WorkflowValidator vali .collect(Collectors.toList()); // Retrieve all the user input data from this node - List currentNodeUserInputs = new ArrayList(processNode.input().getContent().keySet()); + List currentNodeUserInputs = new ArrayList<>(processNode.input().getContent().keySet()); // Combine both predecessor outputs and current node user inputs List allInputs = Stream.concat(predecessorOutputs.stream(), currentNodeUserInputs.stream()) .collect(Collectors.toList()); // Retrieve list of required inputs from the current process node and compare - List expectedInputs = new ArrayList( + List expectedInputs = new ArrayList<>( validator.getWorkflowStepValidators().get(processNode.workflowStep().getName()).getInputs() ); @@ -321,8 +321,9 @@ private static List topologicalSort(List workflowNod // L <- Empty list that will contain the sorted elements List sortedNodes = new ArrayList<>(); // S <- Set of all nodes with no incoming edge - Queue sourceNodes = new ArrayDeque<>(); - workflowNodes.stream().filter(n -> !predecessorEdges.containsKey(n)).forEach(n -> sourceNodes.add(n)); + Queue sourceNodes = workflowNodes.stream() + .filter(n -> !predecessorEdges.containsKey(n)) + .collect(ArrayDeque::new, ArrayDeque::add, ArrayDeque::addAll); if (sourceNodes.isEmpty()) { throw new FlowFrameworkException("No start node detected: all nodes have a predecessor.", RestStatus.BAD_REQUEST); } @@ -340,7 +341,7 @@ private static List topologicalSort(List workflowNod // remove edge e from the graph graph.remove(e); // if m has no other incoming edges then - if (!predecessorEdges.get(m).stream().anyMatch(i -> graph.contains(i))) { + if (predecessorEdges.get(m).stream().noneMatch(graph::contains)) { // insert m into S sourceNodes.add(m); }