Skip to content

Commit

Permalink
Util method for timeout parsing
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Oct 3, 2023
1 parent ba23075 commit 869cb43
Showing 1 changed file with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
Expand Down Expand Up @@ -74,11 +73,7 @@ public List<ProcessNode> sortProcessNodes(Workflow workflow) {
.map(e -> idToNodeMap.get(e.source()))
.collect(Collectors.toList());

TimeValue nodeTimeout = Setting.parseTimeValue(
(String) node.inputs().getOrDefault(NODE_TIMEOUT_FIELD, NODE_TIMEOUT_DEFAULT_VALUE),
TimeValue.ZERO,
String.join(".", node.id(), INPUTS_FIELD, NODE_TIMEOUT_FIELD)
);
TimeValue nodeTimeout = parseTimeout(node);
ProcessNode processNode = new ProcessNode(node.id(), step, data, predecessorNodes, threadPool, nodeTimeout);
idToNodeMap.put(processNode.id(), processNode);
nodes.add(processNode);
Expand All @@ -87,6 +82,18 @@ public List<ProcessNode> sortProcessNodes(Workflow workflow) {
return nodes;
}

private TimeValue parseTimeout(WorkflowNode node) {
String timeoutValue = (String) node.inputs().getOrDefault(NODE_TIMEOUT_FIELD, NODE_TIMEOUT_DEFAULT_VALUE);
String fieldName = String.join(".", node.id(), INPUTS_FIELD, NODE_TIMEOUT_FIELD);
TimeValue timeValue = TimeValue.parseTimeValue(timeoutValue, fieldName);
if (timeValue.millis() < 0) {
throw new IllegalArgumentException(
"Failed to parse timeout value [" + timeoutValue + "] for field [" + fieldName + "]. Must be positive"
);
}
return timeValue;
}

private static List<WorkflowNode> topologicalSort(List<WorkflowNode> workflowNodes, List<WorkflowEdge> workflowEdges) {
// Basic validation
Set<String> nodeIds = workflowNodes.stream().map(n -> n.id()).collect(Collectors.toSet());
Expand Down

0 comments on commit 869cb43

Please sign in to comment.