Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout for each workflow steps #317

Merged
merged 5 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ private CommonValue() {}
/** Schema Version field name */
public static final String SCHEMA_VERSION_FIELD = "schema_version";
/** Global Context Index Name */
public static final String GLOBAL_CONTEXT_INDEX = ".plugins-ai-global-context";
public static final String GLOBAL_CONTEXT_INDEX = ".plugins-flow-framework-templates";
/** Global Context index mapping file path */
public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "mappings/global-context.json";
/** Global Context index mapping version */
public static final Integer GLOBAL_CONTEXT_INDEX_VERSION = 1;
/** Workflow State Index Name */
public static final String WORKFLOW_STATE_INDEX = ".plugins-workflow-state";
public static final String WORKFLOW_STATE_INDEX = ".plugins-flow-framework-state";
/** Workflow State index mapping file path */
public static final String WORKFLOW_STATE_INDEX_MAPPING = "mappings/workflow-state.json";
/** Workflow State index mapping version */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,26 @@ public class WorkflowStepValidator {
private static final String OUTPUTS_FIELD = "outputs";
/** Required Plugins field name */
private static final String REQUIRED_PLUGINS = "required_plugins";
/** Timeout field name */
private static final String TIMEOUT = "timeout";

private List<String> inputs;
private List<String> outputs;
private List<String> requiredPlugins;
private String timeout;
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

/**
* Intantiate the object representing a Workflow Step validator
* Instantiate the object representing a Workflow Step validator
* @param inputs the workflow step inputs
* @param outputs the workflow step outputs
* @param requiredPlugins the required plugins for this workflow step
* @param timeout the timeout for this workflow step
*/
public WorkflowStepValidator(List<String> inputs, List<String> outputs, List<String> requiredPlugins) {
public WorkflowStepValidator(List<String> inputs, List<String> outputs, List<String> requiredPlugins, String timeout) {
this.inputs = inputs;
this.outputs = outputs;
this.requiredPlugins = requiredPlugins;
this.timeout = timeout;
}

/**
Expand All @@ -54,6 +59,7 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept
List<String> parsedInputs = new ArrayList<>();
List<String> parsedOutputs = new ArrayList<>();
List<String> requiredPlugins = new ArrayList<>();
String timeout = null;
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -78,11 +84,14 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept
requiredPlugins.add(parser.text());
}
break;
case TIMEOUT:
timeout = parser.text();
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a WorkflowStepValidator object.");
}
}
return new WorkflowStepValidator(parsedInputs, parsedOutputs, requiredPlugins);
return new WorkflowStepValidator(parsedInputs, parsedOutputs, requiredPlugins, timeout);
}

/**
Expand All @@ -103,9 +112,17 @@ public List<String> getOutputs() {

/**
* Get the required plugins
* @return the outputs
* @return the required plugins
*/
public List<String> getRequiredPlugins() {
return List.copyOf(requiredPlugins);
}

/**
* Get the timeout
* @return the timeout
*/
public String getTimeout() {
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
return timeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.plugins.PluginInfo;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -35,6 +36,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -118,20 +120,28 @@ public List<ProcessNode> sortProcessNodes(Workflow workflow, String workflowId)
.map(e -> idToNodeMap.get(e.source()))
.collect(Collectors.toList());

TimeValue nodeTimeout = parseTimeout(node);
ProcessNode processNode = new ProcessNode(
node.id(),
step,
node.previousNodeInputs(),
data,
predecessorNodes,
threadPool,
nodeTimeout
);
idToNodeMap.put(processNode.id(), processNode);
nodes.add(processNode);
}
try {
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
TimeValue nodeTimeout = parseTimeout(node);
ProcessNode processNode = new ProcessNode(
node.id(),
step,
node.previousNodeInputs(),
data,
predecessorNodes,
threadPool,
nodeTimeout
);
idToNodeMap.put(processNode.id(), processNode);
nodes.add(processNode);

} catch (IOException e) {
logger.error("Failed to read workflow-steps mapping file", e);
throw new FlowFrameworkException(
"Workflow " + workflowId + " failed at reading workflow-steps mapping file",
RestStatus.INTERNAL_SERVER_ERROR
);
}
}
return nodes;
}

Expand All @@ -141,7 +151,7 @@ public List<ProcessNode> sortProcessNodes(Workflow workflow, String workflowId)
* @throws Exception if validation fails
*/
public void validate(List<ProcessNode> processNodes) throws Exception {
WorkflowValidator validator = WorkflowValidator.parse("mappings/workflow-steps.json");
WorkflowValidator validator = readWorkflowValidator();
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
validatePluginsInstalled(processNodes, validator);
validateGraph(processNodes, validator);
}
Expand Down Expand Up @@ -244,11 +254,17 @@ public void validateGraph(List<ProcessNode> processNodes, WorkflowValidator vali
);
}
}
}

private WorkflowValidator readWorkflowValidator() throws IOException {
return WorkflowValidator.parse("mappings/workflow-steps.json");
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
}

private TimeValue parseTimeout(WorkflowNode node) {
String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, NODE_TIMEOUT_DEFAULT_VALUE);
private TimeValue parseTimeout(WorkflowNode node) throws IOException {
WorkflowValidator validator = readWorkflowValidator();
String nodeTimeoutValue = Optional.ofNullable(validator.getWorkflowStepValidators().get(node.type()).getTimeout())
.orElse(NODE_TIMEOUT_DEFAULT_VALUE);
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, nodeTimeoutValue);
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
String fieldName = String.join(".", node.id(), USER_INPUTS_FIELD, NODE_TIMEOUT_FIELD);
TimeValue timeValue = TimeValue.parseTimeValue(timeoutValue, fieldName);
if (timeValue.millis() < 0) {
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
39 changes: 26 additions & 13 deletions src/main/resources/mappings/workflow-steps.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"outputs":[
"index_name"
],
"required_plugins":[]
"required_plugins":[],
"timeout": "10s"
},
"create_ingest_pipeline": {
"inputs":[
Expand All @@ -26,7 +27,8 @@
"outputs":[
"pipeline_id"
],
"required_plugins":[]
"required_plugins":[],
"timeout": "10s"
},
"create_connector": {
"inputs":[
Expand All @@ -43,7 +45,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"delete_connector": {
"inputs": [
Expand All @@ -54,7 +57,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"register_local_model": {
"inputs":[
Expand All @@ -73,7 +77,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
},
"register_remote_model": {
"inputs": [
Expand All @@ -87,7 +92,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"delete_model": {
"inputs": [
Expand All @@ -98,7 +104,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"deploy_model": {
"inputs":[
Expand All @@ -109,7 +116,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "15s"
},
"undeploy_model": {
"inputs":[
Expand All @@ -120,7 +128,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"register_model_group": {
"inputs":[
Expand All @@ -132,7 +141,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"register_agent": {
"inputs":[
Expand All @@ -150,7 +160,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"delete_agent": {
"inputs": [
Expand All @@ -161,7 +172,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"create_tool": {
"inputs": [
Expand All @@ -172,6 +184,7 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testNodeDetails() throws IOException {
ProcessNode node = workflow.get(0);
assertEquals("default_timeout", node.id());
assertEquals(CreateIngestPipelineStep.class, node.workflowStep().getClass());
assertEquals(15, node.nodeTimeout().seconds());
assertEquals(10, node.nodeTimeout().seconds());
node = workflow.get(1);
assertEquals("custom_timeout", node.id());
assertEquals(CreateIndexStep.class, node.workflowStep().getClass());
Expand Down Expand Up @@ -317,7 +317,7 @@ public void testSuccessfulGraphValidation() throws Exception {
workflowProcessSorter.validateGraph(sortedProcessNodes, validator);
}

public void testFailedGraphValidation() {
public void testFailedGraphValidation() throws IOException {

// Create Register Model workflow node with missing connector_id field
WorkflowNode registerModel = new WorkflowNode(
Expand Down
Loading