From 6353cf258375d9009766338e34020824a603443a Mon Sep 17 00:00:00 2001 From: Jackie Han Date: Fri, 22 Dec 2023 16:05:17 -0800 Subject: [PATCH 1/5] read timeout from workflow-steps Signed-off-by: Jackie Han --- .../flowframework/FlowFrameworkPlugin.java | 5 +++ .../model/WorkflowStepValidator.java | 24 ++++++++++-- .../workflow/WorkflowProcessSorter.java | 14 +++++-- .../resources/mappings/workflow-steps.json | 39 ++++++++++++------- 4 files changed, 62 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 4ffd69342..38220fffc 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -9,6 +9,8 @@ package org.opensearch.flowframework; import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionRequest; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; @@ -27,6 +29,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.flowframework.model.WorkflowValidator; import org.opensearch.flowframework.rest.RestCreateWorkflowAction; import org.opensearch.flowframework.rest.RestDeleteWorkflowAction; import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction; @@ -52,6 +55,7 @@ import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction; import org.opensearch.flowframework.transport.SearchWorkflowTransportAction; import org.opensearch.flowframework.util.EncryptorUtils; +import org.opensearch.flowframework.workflow.CreateIndexStep; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -66,6 +70,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.function.Supplier; diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java b/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java index c9689b975..4a5d17417 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java @@ -27,21 +27,25 @@ public class WorkflowStepValidator { private static final String OUTPUTS_FIELD = "outputs"; /** Required Plugins field name */ private static final String REQUIRED_PLUGINS = "required_plugins"; + /** Timeout field name */ + private static final String TIMEOUT = "timeout"; private List inputs; private List outputs; private List requiredPlugins; + private String timeout; /** - * Intantiate the object representing a Workflow Step validator + * Instantiate the object representing a Workflow Step validator * @param inputs the workflow step inputs * @param outputs the workflow step outputs * @param requiredPlugins the required plugins for this workflow step */ - public WorkflowStepValidator(List inputs, List outputs, List requiredPlugins) { + public WorkflowStepValidator(List inputs, List outputs, List requiredPlugins, String timeout) { this.inputs = inputs; this.outputs = outputs; this.requiredPlugins = requiredPlugins; + this.timeout = timeout; } /** @@ -54,6 +58,7 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept List parsedInputs = new ArrayList<>(); List parsedOutputs = new ArrayList<>(); List requiredPlugins = new ArrayList<>(); + String timeout = null; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { @@ -78,11 +83,14 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept requiredPlugins.add(parser.text()); } break; + case TIMEOUT: + timeout = parser.text(); + break; default: throw new IOException("Unable to parse field [" + fieldName + "] in a WorkflowStepValidator object."); } } - return new WorkflowStepValidator(parsedInputs, parsedOutputs, requiredPlugins); + return new WorkflowStepValidator(parsedInputs, parsedOutputs, requiredPlugins, timeout); } /** @@ -103,9 +111,17 @@ public List getOutputs() { /** * Get the required plugins - * @return the outputs + * @return the required plugins */ public List getRequiredPlugins() { return List.copyOf(requiredPlugins); } + + /** + * Get the timeout + * @return the timeout + */ + public String getTimeout() { + return timeout; + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index 04f6349a4..f091e3d1a 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -27,6 +27,7 @@ import org.opensearch.plugins.PluginInfo; import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -35,6 +36,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -141,7 +143,7 @@ public List sortProcessNodes(Workflow workflow, String workflowId) * @throws Exception if validation fails */ public void validate(List processNodes) throws Exception { - WorkflowValidator validator = WorkflowValidator.parse("mappings/workflow-steps.json"); + WorkflowValidator validator = readWorkflowValidator(); validatePluginsInstalled(processNodes, validator); validateGraph(processNodes, validator); } @@ -244,11 +246,17 @@ public void validateGraph(List processNodes, WorkflowValidator vali ); } } + } + private WorkflowValidator readWorkflowValidator() throws IOException { + return WorkflowValidator.parse("mappings/workflow-steps.json"); } - private TimeValue parseTimeout(WorkflowNode node) { - String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, NODE_TIMEOUT_DEFAULT_VALUE); + private TimeValue parseTimeout(WorkflowNode node) throws IOException { + WorkflowValidator validator = readWorkflowValidator(); + String nodeTimeoutValue = Optional.ofNullable(validator.getWorkflowStepValidators().get(node.type()).getTimeout()) + .orElse(NODE_TIMEOUT_DEFAULT_VALUE); + String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, nodeTimeoutValue); String fieldName = String.join(".", node.id(), USER_INPUTS_FIELD, NODE_TIMEOUT_FIELD); TimeValue timeValue = TimeValue.parseTimeValue(timeoutValue, fieldName); if (timeValue.millis() < 0) { diff --git a/src/main/resources/mappings/workflow-steps.json b/src/main/resources/mappings/workflow-steps.json index 989d3c749..0d2c5f260 100644 --- a/src/main/resources/mappings/workflow-steps.json +++ b/src/main/resources/mappings/workflow-steps.json @@ -12,7 +12,8 @@ "outputs":[ "index_name" ], - "required_plugins":[] + "required_plugins":[], + "timeout": "10s" }, "create_ingest_pipeline": { "inputs":[ @@ -26,7 +27,8 @@ "outputs":[ "pipeline_id" ], - "required_plugins":[] + "required_plugins":[], + "timeout": "10s" }, "create_connector": { "inputs":[ @@ -43,7 +45,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "delete_connector": { "inputs": [ @@ -54,7 +57,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "register_local_model": { "inputs":[ @@ -73,7 +77,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "register_remote_model": { "inputs": [ @@ -87,7 +92,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "delete_model": { "inputs": [ @@ -98,7 +104,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "deploy_model": { "inputs":[ @@ -109,7 +116,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "15s" }, "undeploy_model": { "inputs":[ @@ -120,7 +128,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "register_model_group": { "inputs":[ @@ -132,7 +141,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "register_agent": { "inputs":[ @@ -150,7 +160,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "delete_agent": { "inputs": [ @@ -161,7 +172,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "create_tool": { "inputs": [ @@ -172,6 +184,7 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" } } From 47c475eebcbf2b5b8274f06d02a7515e3ed191bb Mon Sep 17 00:00:00 2001 From: Jackie Han Date: Fri, 22 Dec 2023 16:34:30 -0800 Subject: [PATCH 2/5] change indices name Signed-off-by: Jackie Han --- .../flowframework/FlowFrameworkPlugin.java | 5 --- .../flowframework/common/CommonValue.java | 4 +-- .../model/WorkflowStepValidator.java | 1 + .../workflow/WorkflowProcessSorter.java | 36 +++++++++++-------- .../workflow/WorkflowProcessSorterTests.java | 4 +-- 5 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 38220fffc..4ffd69342 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -9,8 +9,6 @@ package org.opensearch.flowframework; import com.google.common.collect.ImmutableList; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionRequest; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; @@ -29,7 +27,6 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; -import org.opensearch.flowframework.model.WorkflowValidator; import org.opensearch.flowframework.rest.RestCreateWorkflowAction; import org.opensearch.flowframework.rest.RestDeleteWorkflowAction; import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction; @@ -55,7 +52,6 @@ import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction; import org.opensearch.flowframework.transport.SearchWorkflowTransportAction; import org.opensearch.flowframework.util.EncryptorUtils; -import org.opensearch.flowframework.workflow.CreateIndexStep; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -70,7 +66,6 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; -import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.function.Supplier; diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index bb9eaf108..1e33bda0e 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -22,13 +22,13 @@ private CommonValue() {} /** Schema Version field name */ public static final String SCHEMA_VERSION_FIELD = "schema_version"; /** Global Context Index Name */ - public static final String GLOBAL_CONTEXT_INDEX = ".plugins-ai-global-context"; + public static final String GLOBAL_CONTEXT_INDEX = ".plugins-flow-framework-templates"; /** Global Context index mapping file path */ public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "mappings/global-context.json"; /** Global Context index mapping version */ public static final Integer GLOBAL_CONTEXT_INDEX_VERSION = 1; /** Workflow State Index Name */ - public static final String WORKFLOW_STATE_INDEX = ".plugins-workflow-state"; + public static final String WORKFLOW_STATE_INDEX = ".plugins-flow-framework-state"; /** Workflow State index mapping file path */ public static final String WORKFLOW_STATE_INDEX_MAPPING = "mappings/workflow-state.json"; /** Workflow State index mapping version */ diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java b/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java index 4a5d17417..fb086000c 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java @@ -40,6 +40,7 @@ public class WorkflowStepValidator { * @param inputs the workflow step inputs * @param outputs the workflow step outputs * @param requiredPlugins the required plugins for this workflow step + * @param timeout the timeout for this workflow step */ public WorkflowStepValidator(List inputs, List outputs, List requiredPlugins, String timeout) { this.inputs = inputs; diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index f091e3d1a..e52a88822 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -120,20 +120,28 @@ public List sortProcessNodes(Workflow workflow, String workflowId) .map(e -> idToNodeMap.get(e.source())) .collect(Collectors.toList()); - TimeValue nodeTimeout = parseTimeout(node); - ProcessNode processNode = new ProcessNode( - node.id(), - step, - node.previousNodeInputs(), - data, - predecessorNodes, - threadPool, - nodeTimeout - ); - idToNodeMap.put(processNode.id(), processNode); - nodes.add(processNode); - } + try { + TimeValue nodeTimeout = parseTimeout(node); + ProcessNode processNode = new ProcessNode( + node.id(), + step, + node.previousNodeInputs(), + data, + predecessorNodes, + threadPool, + nodeTimeout + ); + idToNodeMap.put(processNode.id(), processNode); + nodes.add(processNode); + } catch (IOException e) { + logger.error("Failed to read workflow-steps mapping file", e); + throw new FlowFrameworkException( + "Workflow " + workflowId + " failed at reading workflow-steps mapping file", + RestStatus.INTERNAL_SERVER_ERROR + ); + } + } return nodes; } @@ -255,7 +263,7 @@ private WorkflowValidator readWorkflowValidator() throws IOException { private TimeValue parseTimeout(WorkflowNode node) throws IOException { WorkflowValidator validator = readWorkflowValidator(); String nodeTimeoutValue = Optional.ofNullable(validator.getWorkflowStepValidators().get(node.type()).getTimeout()) - .orElse(NODE_TIMEOUT_DEFAULT_VALUE); + .orElse(NODE_TIMEOUT_DEFAULT_VALUE); String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, nodeTimeoutValue); String fieldName = String.join(".", node.id(), USER_INPUTS_FIELD, NODE_TIMEOUT_FIELD); TimeValue timeValue = TimeValue.parseTimeValue(timeoutValue, fieldName); diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index 2da63ef3a..36ffeafaf 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -136,7 +136,7 @@ public void testNodeDetails() throws IOException { ProcessNode node = workflow.get(0); assertEquals("default_timeout", node.id()); assertEquals(CreateIngestPipelineStep.class, node.workflowStep().getClass()); - assertEquals(15, node.nodeTimeout().seconds()); + assertEquals(10, node.nodeTimeout().seconds()); node = workflow.get(1); assertEquals("custom_timeout", node.id()); assertEquals(CreateIndexStep.class, node.workflowStep().getClass()); @@ -317,7 +317,7 @@ public void testSuccessfulGraphValidation() throws Exception { workflowProcessSorter.validateGraph(sortedProcessNodes, validator); } - public void testFailedGraphValidation() { + public void testFailedGraphValidation() throws IOException { // Create Register Model workflow node with missing connector_id field WorkflowNode registerModel = new WorkflowNode( From 56a0bd84c658648a62c8f644a3e26778fcb3f2b7 Mon Sep 17 00:00:00 2001 From: Jackie Han Date: Tue, 26 Dec 2023 15:22:27 -0800 Subject: [PATCH 3/5] address comments Signed-off-by: Jackie Han --- .../flowframework/model/WorkflowNode.java | 2 +- .../workflow/WorkflowProcessSorter.java | 60 ++++++++++--------- .../resources/mappings/workflow-steps.json | 35 ++++------- .../workflow/WorkflowProcessSorterTests.java | 43 +++++++++++++ 4 files changed, 89 insertions(+), 51 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java b/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java index 6d166c68b..e8e33ce81 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java @@ -50,7 +50,7 @@ public class WorkflowNode implements ToXContentObject { /** The field defining the timeout value for this node */ public static final String NODE_TIMEOUT_FIELD = "node_timeout"; /** The default timeout value if the template doesn't override it */ - public static final String NODE_TIMEOUT_DEFAULT_VALUE = "15s"; + public static final String NODE_TIMEOUT_DEFAULT_VALUE = "10s"; private final String id; // unique id private final String type; // maps to a WorkflowStep diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index e52a88822..d105c4cb9 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -91,7 +91,7 @@ public WorkflowProcessSorter( * @param workflowId The workflowId associated with the step * @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list. */ - public List sortProcessNodes(Workflow workflow, String workflowId) { + public List sortProcessNodes(Workflow workflow, String workflowId) throws IOException { if (workflow.nodes().size() > this.maxWorkflowSteps) { throw new FlowFrameworkException( "Workflow " @@ -120,27 +120,18 @@ public List sortProcessNodes(Workflow workflow, String workflowId) .map(e -> idToNodeMap.get(e.source())) .collect(Collectors.toList()); - try { - TimeValue nodeTimeout = parseTimeout(node); - ProcessNode processNode = new ProcessNode( - node.id(), - step, - node.previousNodeInputs(), - data, - predecessorNodes, - threadPool, - nodeTimeout - ); - idToNodeMap.put(processNode.id(), processNode); - nodes.add(processNode); - - } catch (IOException e) { - logger.error("Failed to read workflow-steps mapping file", e); - throw new FlowFrameworkException( - "Workflow " + workflowId + " failed at reading workflow-steps mapping file", - RestStatus.INTERNAL_SERVER_ERROR - ); - } + TimeValue nodeTimeout = parseTimeout(node); + ProcessNode processNode = new ProcessNode( + node.id(), + step, + node.previousNodeInputs(), + data, + predecessorNodes, + threadPool, + nodeTimeout + ); + idToNodeMap.put(processNode.id(), processNode); + nodes.add(processNode); } return nodes; } @@ -151,7 +142,7 @@ public List sortProcessNodes(Workflow workflow, String workflowId) * @throws Exception if validation fails */ public void validate(List processNodes) throws Exception { - WorkflowValidator validator = readWorkflowValidator(); + WorkflowValidator validator = readWorkflowValidator(processNodes.get(0).id()); validatePluginsInstalled(processNodes, validator); validateGraph(processNodes, validator); } @@ -256,12 +247,27 @@ public void validateGraph(List processNodes, WorkflowValidator vali } } - private WorkflowValidator readWorkflowValidator() throws IOException { - return WorkflowValidator.parse("mappings/workflow-steps.json"); + private WorkflowValidator readWorkflowValidator(String workflowId) { + try { + return WorkflowValidator.parse("mappings/workflow-steps.json"); + } catch (Exception e) { + logger.error("Failed to read workflow-steps mapping file", e); + throw new FlowFrameworkException( + "Workflow " + workflowId + " failed at reading workflow-steps mapping file", + RestStatus.INTERNAL_SERVER_ERROR + ); + } } - private TimeValue parseTimeout(WorkflowNode node) throws IOException { - WorkflowValidator validator = readWorkflowValidator(); + /** + * A method for parsing workflow timeout value. + * The value could be parsed from node NODE_TIMEOUT_FIELD, the timeout field in workflow-step.json, + * or the default NODE_TIMEOUT_DEFAULT_VALUE + * @param node the workflow nde + * @return the timeout value + */ + protected TimeValue parseTimeout(WorkflowNode node) { + WorkflowValidator validator = readWorkflowValidator(node.id()); String nodeTimeoutValue = Optional.ofNullable(validator.getWorkflowStepValidators().get(node.type()).getTimeout()) .orElse(NODE_TIMEOUT_DEFAULT_VALUE); String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, nodeTimeoutValue); diff --git a/src/main/resources/mappings/workflow-steps.json b/src/main/resources/mappings/workflow-steps.json index 0d2c5f260..ad384b6a8 100644 --- a/src/main/resources/mappings/workflow-steps.json +++ b/src/main/resources/mappings/workflow-steps.json @@ -12,8 +12,7 @@ "outputs":[ "index_name" ], - "required_plugins":[], - "timeout": "10s" + "required_plugins":[] }, "create_ingest_pipeline": { "inputs":[ @@ -27,8 +26,7 @@ "outputs":[ "pipeline_id" ], - "required_plugins":[], - "timeout": "10s" + "required_plugins":[] }, "create_connector": { "inputs":[ @@ -45,8 +43,7 @@ ], "required_plugins":[ "opensearch-ml" - ], - "timeout": "10s" + ] }, "delete_connector": { "inputs": [ @@ -57,8 +54,7 @@ ], "required_plugins":[ "opensearch-ml" - ], - "timeout": "10s" + ] }, "register_local_model": { "inputs":[ @@ -78,7 +74,7 @@ "required_plugins":[ "opensearch-ml" ], - "timeout": "10s" + "timeout": "60s" }, "register_remote_model": { "inputs": [ @@ -92,8 +88,7 @@ ], "required_plugins":[ "opensearch-ml" - ], - "timeout": "10s" + ] }, "delete_model": { "inputs": [ @@ -104,8 +99,7 @@ ], "required_plugins":[ "opensearch-ml" - ], - "timeout": "10s" + ] }, "deploy_model": { "inputs":[ @@ -128,8 +122,7 @@ ], "required_plugins":[ "opensearch-ml" - ], - "timeout": "10s" + ] }, "register_model_group": { "inputs":[ @@ -141,8 +134,7 @@ ], "required_plugins":[ "opensearch-ml" - ], - "timeout": "10s" + ] }, "register_agent": { "inputs":[ @@ -160,8 +152,7 @@ ], "required_plugins":[ "opensearch-ml" - ], - "timeout": "10s" + ] }, "delete_agent": { "inputs": [ @@ -172,8 +163,7 @@ ], "required_plugins":[ "opensearch-ml" - ], - "timeout": "10s" + ] }, "create_tool": { "inputs": [ @@ -184,7 +174,6 @@ ], "required_plugins":[ "opensearch-ml" - ], - "timeout": "10s" + ] } } diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index 36ffeafaf..3eeaf695d 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -21,6 +21,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentParser; @@ -509,4 +510,46 @@ public void testFailedInstalledPluginValidation() throws Exception { exception.getMessage() ); } + + public void testReadWorkflowStepFile_withDefaultTimeout() throws IOException { + // read timeout from node NODE_TIMEOUT_FIELD + WorkflowNode createConnector = new WorkflowNode( + "workflow_step_1", + CreateConnectorStep.NAME, + Map.of(), + Map.ofEntries( + Map.entry("name", ""), + Map.entry("description", ""), + Map.entry("version", ""), + Map.entry("protocol", ""), + Map.entry("parameters", ""), + Map.entry("credential", ""), + Map.entry("actions", ""), + Map.entry("node_timeout", "50s") + ) + ); + TimeValue createConnectorTimeout = workflowProcessSorter.parseTimeout(createConnector); + assertEquals(createConnectorTimeout.getSeconds(), 50); + + // read timeout from workflow-step.json overwrite value + WorkflowNode deployModel = new WorkflowNode( + "workflow_step_3", + DeployModelStep.NAME, + Map.ofEntries(Map.entry("workflow_step_2", "model_id")), + Map.of() + ); + TimeValue deployModelTimeout = workflowProcessSorter.parseTimeout(deployModel); + assertEquals(deployModelTimeout.getSeconds(), 15); + + // read timeout from NODE_TIMEOUT_DEFAULT_VALUE when there's no node NODE_TIMEOUT_FIELD + // and no overwrite timeout value in workflow-step.json + WorkflowNode registerModel = new WorkflowNode( + "workflow_step_2", + RegisterRemoteModelStep.NAME, + Map.ofEntries(Map.entry("workflow_step_1", "connector_id")), + Map.ofEntries(Map.entry("name", "name"), Map.entry("function_name", "remote"), Map.entry("description", "description")) + ); + TimeValue registerRemoteModelTimeout = workflowProcessSorter.parseTimeout(registerModel); + assertEquals(registerRemoteModelTimeout.getSeconds(), 10); + } } From 3ab24386d414e24d94b3e49d0ec7cdd251fc7b83 Mon Sep 17 00:00:00 2001 From: Jackie Han Date: Wed, 27 Dec 2023 14:41:04 -0800 Subject: [PATCH 4/5] address comment - parse timeout into TimeValue Signed-off-by: Jackie Han --- .../flowframework/model/WorkflowNode.java | 4 ++- .../model/WorkflowStepValidator.java | 25 +++++++++++++++---- .../workflow/WorkflowProcessSorter.java | 15 +++++------ 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java b/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java index e8e33ce81..de9b4ba47 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java @@ -8,6 +8,7 @@ */ package org.opensearch.flowframework.model; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; @@ -23,6 +24,7 @@ import java.util.Map.Entry; import java.util.Objects; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.flowframework.common.CommonValue.TOOLS_ORDER_FIELD; import static org.opensearch.flowframework.util.ParseUtils.buildStringToObjectMap; @@ -50,7 +52,7 @@ public class WorkflowNode implements ToXContentObject { /** The field defining the timeout value for this node */ public static final String NODE_TIMEOUT_FIELD = "node_timeout"; /** The default timeout value if the template doesn't override it */ - public static final String NODE_TIMEOUT_DEFAULT_VALUE = "10s"; + public static final TimeValue NODE_TIMEOUT_DEFAULT_VALUE = new TimeValue(10, SECONDS); private final String id; // unique id private final String type; // maps to a WorkflowStep diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java b/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java index fb086000c..001ccd816 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java @@ -8,7 +8,12 @@ */ package org.opensearch.flowframework.model; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.flowframework.exception.FlowFrameworkException; import java.io.IOException; import java.util.ArrayList; @@ -21,6 +26,8 @@ */ public class WorkflowStepValidator { + private static final Logger logger = LogManager.getLogger(WorkflowStepValidator.class); + /** Inputs field name */ private static final String INPUTS_FIELD = "inputs"; /** Outputs field name */ @@ -33,7 +40,7 @@ public class WorkflowStepValidator { private List inputs; private List outputs; private List requiredPlugins; - private String timeout; + private TimeValue timeout; /** * Instantiate the object representing a Workflow Step validator @@ -42,7 +49,7 @@ public class WorkflowStepValidator { * @param requiredPlugins the required plugins for this workflow step * @param timeout the timeout for this workflow step */ - public WorkflowStepValidator(List inputs, List outputs, List requiredPlugins, String timeout) { + public WorkflowStepValidator(List inputs, List outputs, List requiredPlugins, TimeValue timeout) { this.inputs = inputs; this.outputs = outputs; this.requiredPlugins = requiredPlugins; @@ -59,7 +66,7 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept List parsedInputs = new ArrayList<>(); List parsedOutputs = new ArrayList<>(); List requiredPlugins = new ArrayList<>(); - String timeout = null; + TimeValue timeout = null; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { @@ -85,7 +92,15 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept } break; case TIMEOUT: - timeout = parser.text(); + try { + timeout = TimeValue.parseTimeValue(parser.text(), TIMEOUT); + } catch (IllegalArgumentException e) { + logger.error("Failed to parse TIMEOUT value for field [" + fieldName + "]", e); + throw new FlowFrameworkException( + "Failed to parse workflow-step.json file for field [" + fieldName + "]", + RestStatus.INTERNAL_SERVER_ERROR + ); + } break; default: throw new IOException("Unable to parse field [" + fieldName + "] in a WorkflowStepValidator object."); @@ -122,7 +137,7 @@ public List getRequiredPlugins() { * Get the timeout * @return the timeout */ - public String getTimeout() { + public TimeValue getTimeout() { return timeout; } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index d105c4cb9..64b6d2e68 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -27,7 +27,6 @@ import org.opensearch.plugins.PluginInfo; import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -91,7 +90,7 @@ public WorkflowProcessSorter( * @param workflowId The workflowId associated with the step * @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list. */ - public List sortProcessNodes(Workflow workflow, String workflowId) throws IOException { + public List sortProcessNodes(Workflow workflow, String workflowId) { if (workflow.nodes().size() > this.maxWorkflowSteps) { throw new FlowFrameworkException( "Workflow " @@ -268,18 +267,20 @@ private WorkflowValidator readWorkflowValidator(String workflowId) { */ protected TimeValue parseTimeout(WorkflowNode node) { WorkflowValidator validator = readWorkflowValidator(node.id()); - String nodeTimeoutValue = Optional.ofNullable(validator.getWorkflowStepValidators().get(node.type()).getTimeout()) + TimeValue nodeTimeoutValue = Optional.ofNullable(validator.getWorkflowStepValidators().get(node.type()).getTimeout()) .orElse(NODE_TIMEOUT_DEFAULT_VALUE); - String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, nodeTimeoutValue); + String nodeTimeoutAsString = nodeTimeoutValue.getSeconds() + "s"; + String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, nodeTimeoutAsString); String fieldName = String.join(".", node.id(), USER_INPUTS_FIELD, NODE_TIMEOUT_FIELD); - TimeValue timeValue = TimeValue.parseTimeValue(timeoutValue, fieldName); - if (timeValue.millis() < 0) { + TimeValue userInputTimeValue = TimeValue.parseTimeValue(timeoutValue, fieldName); + + if (userInputTimeValue.millis() < 0) { throw new FlowFrameworkException( "Failed to parse timeout value [" + timeoutValue + "] for field [" + fieldName + "]. Must be positive", RestStatus.BAD_REQUEST ); } - return timeValue; + return userInputTimeValue; } private static List topologicalSort(List workflowNodes, List workflowEdges) { From 5f0b865a87707e2c5fc6759a104f0cfadbe0501d Mon Sep 17 00:00:00 2001 From: Jackie Han Date: Wed, 27 Dec 2023 15:40:30 -0800 Subject: [PATCH 5/5] address minor comments Signed-off-by: Jackie Han --- .../flowframework/model/WorkflowStepValidator.java | 2 +- .../workflow/WorkflowProcessSorter.java | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java b/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java index 001ccd816..cc0da5b6c 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java @@ -95,7 +95,7 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept try { timeout = TimeValue.parseTimeValue(parser.text(), TIMEOUT); } catch (IllegalArgumentException e) { - logger.error("Failed to parse TIMEOUT value for field [" + fieldName + "]", e); + logger.error("Failed to parse TIMEOUT value for field [{}]", fieldName, e); throw new FlowFrameworkException( "Failed to parse workflow-step.json file for field [" + fieldName + "]", RestStatus.INTERNAL_SERVER_ERROR diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index 64b6d2e68..bb1334470 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -141,7 +141,7 @@ public List sortProcessNodes(Workflow workflow, String workflowId) * @throws Exception if validation fails */ public void validate(List processNodes) throws Exception { - WorkflowValidator validator = readWorkflowValidator(processNodes.get(0).id()); + WorkflowValidator validator = readWorkflowValidator(); validatePluginsInstalled(processNodes, validator); validateGraph(processNodes, validator); } @@ -246,13 +246,13 @@ public void validateGraph(List processNodes, WorkflowValidator vali } } - private WorkflowValidator readWorkflowValidator(String workflowId) { + private WorkflowValidator readWorkflowValidator() { try { return WorkflowValidator.parse("mappings/workflow-steps.json"); } catch (Exception e) { - logger.error("Failed to read workflow-steps mapping file", e); + logger.error("Failed at reading workflow-steps mapping file", e); throw new FlowFrameworkException( - "Workflow " + workflowId + " failed at reading workflow-steps mapping file", + "Failed at reading workflow-steps.json mapping file for a new workflow.", RestStatus.INTERNAL_SERVER_ERROR ); } @@ -262,11 +262,11 @@ private WorkflowValidator readWorkflowValidator(String workflowId) { * A method for parsing workflow timeout value. * The value could be parsed from node NODE_TIMEOUT_FIELD, the timeout field in workflow-step.json, * or the default NODE_TIMEOUT_DEFAULT_VALUE - * @param node the workflow nde + * @param node the workflow node * @return the timeout value */ protected TimeValue parseTimeout(WorkflowNode node) { - WorkflowValidator validator = readWorkflowValidator(node.id()); + WorkflowValidator validator = readWorkflowValidator(); TimeValue nodeTimeoutValue = Optional.ofNullable(validator.getWorkflowStepValidators().get(node.type()).getTimeout()) .orElse(NODE_TIMEOUT_DEFAULT_VALUE); String nodeTimeoutAsString = nodeTimeoutValue.getSeconds() + "s";