From 62797d267f54142eb62ddc33a6a8bb97edd17095 Mon Sep 17 00:00:00 2001 From: Jared O'Connell <46976761+jaredoconnell@users.noreply.github.com> Date: Fri, 20 Sep 2024 17:08:03 -0400 Subject: [PATCH] Added optional expression tags (#212) * Added optional expression tags * Fix linter error * Improved comment for a test * Addressed review comments --- go.mod | 6 +- go.sum | 6 +- internal/infer/infer.go | 9 +- internal/infer/optional_expression.go | 13 + workflow/any.go | 2 +- workflow/executor.go | 71 ++++- workflow/model.go | 7 +- workflow/workflow.go | 32 ++- workflow/workflow_test.go | 399 ++++++++++++++++++++++++++ workflow/yaml.go | 48 +++- workflow/yaml_test.go | 50 +++- 11 files changed, 600 insertions(+), 43 deletions(-) create mode 100644 internal/infer/optional_expression.go diff --git a/go.mod b/go.mod index f5577f2..4f012c7 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,12 @@ module go.flow.arcalot.io/engine -go 1.21 +go 1.22.0 + +toolchain go1.22.2 require ( go.arcalot.io/assert v1.8.0 - go.arcalot.io/dgraph v1.6.0 + go.arcalot.io/dgraph v1.7.0 go.arcalot.io/lang v1.1.0 go.arcalot.io/log/v2 v2.2.0 go.flow.arcalot.io/deployer v0.6.1 diff --git a/go.sum b/go.sum index 6d17e09..2ef708c 100644 --- a/go.sum +++ b/go.sum @@ -123,8 +123,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.arcalot.io/assert v1.8.0 h1:hGcHMPncQXwQvjj7MbyOu2gg8VIBB00crUJZpeQOjxs= go.arcalot.io/assert v1.8.0/go.mod h1:nNmWPoNUHFyrPkNrD2aASm5yPuAfiWdB/4X7Lw3ykHk= -go.arcalot.io/dgraph v1.6.0 h1:mJFZ1vdPEg3KtqyhNqYtWVAkxxWBWoJVUFZQ2Z4mbvE= -go.arcalot.io/dgraph v1.6.0/go.mod h1:+Kxc81utiihMSmC1/ttSPGLDlWPpvgOpNxSFmIDPxFM= +go.arcalot.io/dgraph v1.7.0 h1:KmVoPoV7jKbc7DgBS7Bmh/gliEXw4S4kyeVpPvyIygs= +go.arcalot.io/dgraph v1.7.0/go.mod h1:P8mMBGCZbIMVe08iw0afFBMl1QM3aJk+MU//Q2Z5rJc= go.arcalot.io/exex v0.2.0 h1:u44pjwPwcH57TF8knhaqVZP/1V/KbnRe//pKzMwDpLw= go.arcalot.io/exex v0.2.0/go.mod h1:5zlFr+7vOQNZKYCNOEDdsad+z/dlvXKs2v4kG+v+bQo= go.arcalot.io/lang v1.1.0 h1:ugglRKpd3qIMkdghAjKJxsziIgHm8QpxrzZPSXoa08I= @@ -135,8 +135,6 @@ go.flow.arcalot.io/deployer v0.6.1 h1:Q65VHeRZzdrMJZqTRb26EQZQbK+C3pORETVlpw02xW go.flow.arcalot.io/deployer v0.6.1/go.mod h1:Oh+71KYQEof6IS3UGhpMyjQQPRcuomUccn7fwAqrPxE= go.flow.arcalot.io/dockerdeployer v0.7.3 h1:CLvSdqfoE8oZADI0wfry46SXR4CQjB6Qh+6Ym70zheQ= go.flow.arcalot.io/dockerdeployer v0.7.3/go.mod h1:YWw9+GbYJxEnlahlYCx4UOJe+QNkecf8+EBtSIQD0aE= -go.flow.arcalot.io/expressions v0.4.3 h1:0BRRghutHp0sctsITHe/A1le0yYiJtKNTxm27T+P6Og= -go.flow.arcalot.io/expressions v0.4.3/go.mod h1:UORX78N4ep71wOzNXdIo/UY+6SdDD0id0mvuRNEQMeM= go.flow.arcalot.io/expressions v0.4.4 h1:bYTC7YDmgDWcsdyY41+IvTJbvsM1rdE3ZBJhB+jNPHQ= go.flow.arcalot.io/expressions v0.4.4/go.mod h1:0Y2LgynO1SWA4bqsnKlCxqLME9zOR8tWKg3g+RG+FFQ= go.flow.arcalot.io/kubernetesdeployer v0.9.3 h1:XKiqmCqXb6ZLwP5IQTAKS/gJHpq0Ub/yEjCfgAwQF2A= diff --git a/internal/infer/infer.go b/internal/infer/infer.go index 6c4b54b..39bd221 100644 --- a/internal/infer/infer.go +++ b/internal/infer/infer.go @@ -82,6 +82,8 @@ func Type( return nil, fmt.Errorf("failed to evaluate type of expression %s (%w)", expr.String(), err) } return oneOfType, nil + case *OptionalExpression: + return Type(expr.Expr, internalDataModel, functions, workflowContext) } v := reflect.ValueOf(data) @@ -198,14 +200,17 @@ func objectType( ) (schema.Type, error) { properties := make(map[string]*schema.PropertySchema, value.Len()) for _, keyValue := range value.MapKeys() { - propertyType, err := Type(value.MapIndex(keyValue).Interface(), internalDataModel, functions, workflowContext) + inferredValue := value.MapIndex(keyValue).Interface() + propertyType, err := Type(inferredValue, internalDataModel, functions, workflowContext) if err != nil { return nil, fmt.Errorf("failed to infer property %s type (%w)", keyValue.Interface(), err) } + _, isOptionalExpr := inferredValue.(*OptionalExpression) + properties[keyValue.Interface().(string)] = schema.NewPropertySchema( propertyType, nil, - true, + !isOptionalExpr, nil, nil, nil, diff --git a/internal/infer/optional_expression.go b/internal/infer/optional_expression.go new file mode 100644 index 0000000..845ebd3 --- /dev/null +++ b/internal/infer/optional_expression.go @@ -0,0 +1,13 @@ +package infer + +import ( + "go.flow.arcalot.io/expressions" +) + +// OptionalExpression is an expression that can be used in an object as an optional field. +type OptionalExpression struct { + Expr expressions.Expression + WaitForCompletion bool + GroupNodePath string + ParentNodePath string +} diff --git a/workflow/any.go b/workflow/any.go index 742df22..e744fe6 100644 --- a/workflow/any.go +++ b/workflow/any.go @@ -45,7 +45,7 @@ func (a *anySchemaWithExpressions) Serialize(data any) (any, error) { func (a *anySchemaWithExpressions) checkAndConvert(data any) (any, error) { switch data.(type) { - case expressions.Expression, infer.OneOfExpression, *infer.OneOfExpression: + case expressions.Expression, infer.OneOfExpression, *infer.OneOfExpression, *infer.OptionalExpression: return data, nil } t := reflect.ValueOf(data) diff --git a/workflow/executor.go b/workflow/executor.go index bf22e74..e0c4d6b 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -577,13 +577,13 @@ func (e *executor) createTypeStructure(rootSchema schema.Scope, inputField any, return inputField.Type(rootSchema, e.callableFunctionSchemas, workflowContext) case *infer.OneOfExpression: return inputField.Type(rootSchema, e.callableFunctionSchemas, workflowContext) + case *infer.OptionalExpression: + return e.createTypeStructure(rootSchema, inputField.Expr, workflowContext) } v := reflect.ValueOf(inputField) switch v.Kind() { case reflect.Slice: - // Okay. Construct the list of schemas, and pass it into the - result := make([]any, v.Len()) for i := 0; i < v.Len(); i++ { value := v.Index(i).Interface() @@ -917,6 +917,8 @@ func (e *executor) prepareDependencies( return e.prepareExprDependencies(s, workflowContext, currentNode, outputSchema, dag) case *infer.OneOfExpression: return e.prepareOneOfExprDependencies(s, workflowContext, currentNode, pathInCurrentNode, outputSchema, dag) + case *infer.OptionalExpression: + return e.prepareOptionalExprDependencies(s, workflowContext, currentNode, pathInCurrentNode, outputSchema, dag) default: return &ErrInvalidWorkflow{fmt.Errorf("unsupported struct/pointer type in workflow input: %T", stepData)} } @@ -1006,6 +1008,55 @@ func (e *executor) prepareExprDependencies( return nil } +func (e *executor) createGroupNode( + currentNode dgraph.Node[*DAGItem], + pathInCurrentNode []string, + dag dgraph.DirectedGraph[*DAGItem], + dependencyType dgraph.DependencyType, +) (dgraph.Node[*DAGItem], error) { + groupNodeType := &DAGItem{ + Kind: DagItemKindDependencyGroup, + } + groupedDagNode, err := dag.AddNode( + currentNode.ID()+"."+strings.Join(pathInCurrentNode, "."), groupNodeType) + if err != nil { + return nil, err + } + err = currentNode.ConnectDependency(groupedDagNode.ID(), dependencyType) + if err != nil { + return nil, err + } + return groupedDagNode, nil +} + +func (e *executor) prepareOptionalExprDependencies( + expr *infer.OptionalExpression, + workflowContext map[string][]byte, + currentNode dgraph.Node[*DAGItem], + pathInCurrentNode []string, + outputSchema *schema.ScopeSchema, + dag dgraph.DirectedGraph[*DAGItem], +) error { + var dependencyType dgraph.DependencyType + if expr.WaitForCompletion { + dependencyType = dgraph.CompletionAndDependency + } else { + dependencyType = dgraph.OptionalDependency + } + + // Creates a new group node to isolate the optional dependencies. + // The current node will depend on the group node with the dependency type set in `dependencyType`. + optionalDagNode, err := e.createGroupNode(currentNode, pathInCurrentNode, dag, dependencyType) + if err != nil { + return err + } + + expr.GroupNodePath = optionalDagNode.ID() + expr.ParentNodePath = currentNode.ID() + + return e.prepareExprDependencies(expr.Expr, workflowContext, optionalDagNode, outputSchema, dag) +} + func (e *executor) prepareOneOfExprDependencies( expr *infer.OneOfExpression, workflowContext map[string][]byte, @@ -1020,25 +1071,21 @@ func (e *executor) prepareOneOfExprDependencies( if len(expr.Options) == 0 { return fmt.Errorf("oneof %s has no options", expr.String()) } - // In case there are multiple OneOfs, each oneof needs its own node. - orNodeType := &DAGItem{ - Kind: DagItemKindOrGroup, - } - oneofDagNode, err := dag.AddNode( - currentNode.ID()+"."+strings.Join(pathInCurrentNode, "."), orNodeType) - if err != nil { - return err + groupNodeType := &DAGItem{ + Kind: DagItemKindDependencyGroup, } - err = currentNode.ConnectDependency(oneofDagNode.ID(), dgraph.AndDependency) + // In case there are multiple OneOfs, each oneof needs its own node. + oneofDagNode, err := e.createGroupNode(currentNode, pathInCurrentNode, dag, dgraph.AndDependency) if err != nil { return err } + // Mark the node ID on the OneOfExpression. This mutates the expression, so make sure // this is not operating on a copy of the schema for the data to be retained. expr.NodePath = oneofDagNode.ID() for optionID, optionData := range expr.Options { optionDagNode, err := dag.AddNode( - oneofDagNode.ID()+"."+optionID, orNodeType) + oneofDagNode.ID()+"."+optionID, groupNodeType) if err != nil { return err } diff --git a/workflow/model.go b/workflow/model.go index c09c500..b4a9973 100644 --- a/workflow/model.go +++ b/workflow/model.go @@ -170,9 +170,10 @@ const ( DAGItemKindStepStageOutput DAGItemKind = "stepStageOutput" // DAGItemKindOutput indicates a DAG node for the workflow output. DAGItemKindOutput DAGItemKind = "output" - // DagItemKindOrGroup indicates a DAG node used to complete a part of - // an input or output that needs dependencies grouped, typically for OR dependencies. - DagItemKindOrGroup DAGItemKind = "orGroup" + // DagItemKindDependencyGroup indicates a DAG node used to complete a part of + // an input or output that needs dependencies grouped, typically for OR dependencies + // or optional dependencies. + DagItemKindDependencyGroup DAGItemKind = "dependencyGroup" ) // DAGItem is the internal structure of the DAG. diff --git a/workflow/workflow.go b/workflow/workflow.go index e1eaf1e..57f4922 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -521,7 +521,7 @@ func (l *loopState) notifySteps() { //nolint:gocognit inputData := nodeItem.Data if inputData == nil { switch nodeItem.Kind { - case DagItemKindOrGroup: + case DagItemKindDependencyGroup: if err := node.ResolveNode(dgraph.Resolved); err != nil { panic(fmt.Errorf("error occurred while resolving workflow OR group node (%s)", err.Error())) } @@ -688,6 +688,8 @@ func (l *loopState) resolveExpressions(inputData any, dataModel any) (any, error return expr.Evaluate(dataModel, l.callableFunctions, l.workflowContext) case *infer.OneOfExpression: return l.resolveOneOfExpression(expr, dataModel) + case *infer.OptionalExpression: + return l.resolveOptionalExpression(expr, dataModel) } v := reflect.ValueOf(inputData) @@ -712,7 +714,9 @@ func (l *loopState) resolveExpressions(inputData any, dataModel any) (any, error if err != nil { return nil, fmt.Errorf("failed to resolve workflow map expressions (%w)", err) } - result[key] = newValue + if newValue != nil { // In case it's an optional field. + result[key] = newValue + } } return result, nil default: @@ -725,6 +729,9 @@ func (l *loopState) resolveOneOfExpression(expr *infer.OneOfExpression, dataMode // Get the node the OneOf uses to check which Or dependency resolved first (the others will either not be // in the resolved list, or they will be obviated) + if expr.NodePath == "" { + return nil, fmt.Errorf("node path is empty in oneof expression %s", expr.String()) + } oneOfNode, err := l.dag.GetNodeByID(expr.NodePath) if err != nil { return nil, fmt.Errorf("failed to get node to resolve oneof expression (%w)", err) @@ -777,6 +784,27 @@ func (l *loopState) resolveOneOfExpression(expr *infer.OneOfExpression, dataMode return outputData, nil } +func (l *loopState) resolveOptionalExpression(expr *infer.OptionalExpression, dataModel any) (any, error) { + l.logger.Debugf("Evaluating oneof expression %s...", expr.Expr.String()) + if expr.ParentNodePath == "" { + return nil, fmt.Errorf("ParentNodePath is empty in resolve optional expression %s", expr.Expr.String()) + } + if expr.GroupNodePath == "" { + return nil, fmt.Errorf("GroupNodePath is empty in resolve optional expression %s", expr.Expr.String()) + } + // Check to see if the group node is resolved within the parent node + parentDagNode, err := l.dag.GetNodeByID(expr.ParentNodePath) + if err != nil { + return nil, fmt.Errorf("failed to get parent node to resolve optional expression (%w)", err) + } + resolvedDependencies := parentDagNode.ResolvedDependencies() + _, dependencyGroupResolved := resolvedDependencies[expr.GroupNodePath] + if !dependencyGroupResolved { + return nil, nil // It's nil to indicate that the optional field is not present. + } + return expr.Expr.Evaluate(dataModel, l.callableFunctions, l.workflowContext) +} + // stageChangeHandler is implementing step.StageChangeHandler. type stageChangeHandler struct { onStageChange func( diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 780a10d..ae1c7c8 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -2111,6 +2111,405 @@ outputs: toggled_wait_output: !expr $.steps.toggled_wait.disabled.output ` +var optionalFieldGracefullyDisabledStepWorkflow = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: + step_enabled: + type: + type_id: bool +steps: + simple_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 0 + enabled: !expr $.input.step_enabled +outputs: + workflow-success: + success_output: !wait-optional $.steps.simple_wait.outputs.success +` + +func TestOptionalFieldGracefullyDisabledStepWorkflow(t *testing.T) { + // Run a workflow where the output uses the !wait-optional tag to allow + // the step to be disabled without breaking the output. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, optionalFieldGracefullyDisabledStepWorkflow), + ) + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "step_enabled": true, + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "workflow-success") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "success_output": map[any]any{ + "message": "Plugin slept for 0 ms.", + }, + }) + // Test step disabled case + outputID, outputData, err = preparedWorkflow.Execute(context.Background(), map[string]any{ + "step_enabled": false, + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "workflow-success") + assert.Equals(t, outputData.(map[any]any), map[any]any{}) +} + +var optionalFieldWithFailureStepWorkflow = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: + fail_purposefully: + type: + type_id: bool +steps: + hello_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: hello + input: + fail: !expr $.input.fail_purposefully + wait_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 0 +outputs: + workflow-success: + hello_output: !wait-optional $.steps.hello_step.outputs.success + wait_output: !expr $.steps.wait_step.outputs.success +` + +func TestOptionalFieldWithFailureStepWorkflow(t *testing.T) { + // Test a workflow where one of the steps can fail without + // causing a failure of the output. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, optionalFieldWithFailureStepWorkflow), + ) + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "fail_purposefully": false, + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "workflow-success") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "wait_output": map[any]any{ + "message": "Plugin slept for 0 ms.", + }, + "hello_output": map[any]any{}, + }) + outputID, outputData, err = preparedWorkflow.Execute(context.Background(), map[string]any{ + "fail_purposefully": true, + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "workflow-success") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "wait_output": map[any]any{ + "message": "Plugin slept for 0 ms.", + }, + }) +} + +var softOptionalFieldWorkflow = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: + wait_1_ms: + type: + type_id: integer + wait_2_ms: + type: + type_id: integer + wait_1_enabled: + type: + type_id: bool + wait_2_enabled: + type: + type_id: bool +steps: + wait_step_1: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: !expr $.input.wait_1_ms + enabled: !expr $.input.wait_1_enabled + wait_step_2: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: !expr $.input.wait_2_ms + enabled: !expr $.input.wait_2_enabled +outputs: + workflow-success: + required_output: !expr $.steps.wait_step_1.outputs.success + optional_output: !soft-optional $.steps.wait_step_2.outputs.success +` + +func TestSoftOptionalFieldWorkflow(t *testing.T) { + // This test case handles the soft optional dependency. It uses timing and step + // enablement. Timing is used since there are no order guarantees provided by + // `!soft-optional` alone. If this proves to be fragile, consider utilizing + // multiple steps that wait for each other in a way that ensures proper ordering. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, softOptionalFieldWorkflow), + ) + // Test case where step 1 will be ready before step 2, and will therefore not + // be present in the output. + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "wait_1_ms": 0, + "wait_2_ms": 15, + "wait_1_enabled": true, + "wait_2_enabled": true, + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "workflow-success") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "required_output": map[any]any{ + "message": "Plugin slept for 0 ms.", + }, + }) + // Test case where step 1 is going to take so much longer that we can all but + // guarantee that step 2's output will be present before step 1 completes. + outputID, outputData, err = preparedWorkflow.Execute(context.Background(), map[string]any{ + "wait_1_ms": 15, + "wait_2_ms": 0, + "wait_1_enabled": true, + "wait_2_enabled": true, + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "workflow-success") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "required_output": map[any]any{ + "message": "Plugin slept for 15 ms.", + }, + "optional_output": map[any]any{ + "message": "Plugin slept for 0 ms.", + }, + }) + // Test case where step 2 is just disabled, and will therefore be not present in + // the output due to being unresolvable. + outputID, outputData, err = preparedWorkflow.Execute(context.Background(), map[string]any{ + "wait_1_ms": 5, + "wait_2_ms": 5, + "wait_1_enabled": true, + "wait_2_enabled": false, + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "workflow-success") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "required_output": map[any]any{ + "message": "Plugin slept for 5 ms.", + }, + }) + // Test proper failure handling of the required step. The optional value will be available, + // but should not be used. + outputID, _, err = preparedWorkflow.Execute(context.Background(), map[string]any{ + "wait_1_ms": 0, + "wait_2_ms": 0, + "wait_1_enabled": false, + "wait_2_enabled": true, + }) + assert.Error(t, err) + assert.Equals(t, outputID, "") + var typedError *workflow.ErrNoMorePossibleOutputs + if !errors.As(err, &typedError) { + t.Fatalf("incorrect error type returned: %T (%s)", err, err) + } +} + +var forEachWithOptionalWf = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: + step_to_run: + type: + type_id: string +steps: + step_a: + plugin: + src: "n/a" + deployment_type: "builtin" + step: hello + input: + fail: !expr false + enabled: !expr $.input.step_to_run == "a" + step_b: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 0 + enabled: !expr $.input.step_to_run == "b" + subwf_step: + kind: foreach + items: + - input_1: !wait-optional $.steps.step_a.outputs.success + input_2: !wait-optional $.steps.step_b.outputs.success + workflow: subworkflow.yaml +outputs: + success: + subwf_result: !expr $.steps.subwf_step.outputs +` + +var forEachWithOptionalSubWf = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: + input_1: + required: false + type: + type_id: object + id: hello-output + properties: {} + input_2: + required: false + type: + type_id: object + id: output + properties: + message: + type: + type_id: string +steps: + placeholder_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: hello + input: + fail: !expr false +outputs: + success: !expr $.input +` + +func TestForeachWithOptional(t *testing.T) { + // This test tests the optional tag `!wait-optional` being used to create an input to + // a subworkflow. + // Since the subworkflow schema must match the input, this also validates that + // the inferred schema is correct. + logConfig := log.Config{ + Level: log.LevelDebug, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + factories := workflowFactory{ + config: cfg, + } + deployerRegistry := deployerregistry.New( + deployer.Any(testimpl.NewFactory()), + ) + + pluginProvider := assert.NoErrorR[step.Provider](t)( + plugin.New(logger, deployerRegistry, map[string]interface{}{ + "builtin": map[string]any{ + "deployer_name": "test-impl", + "deploy_time": "0", + }, + }), + ) + stepRegistry, err := stepregistry.New( + pluginProvider, + lang.Must2(foreach.New(logger, factories.createYAMLParser, factories.createWorkflow)), + ) + assert.NoError(t, err) + + factories.stepRegistry = stepRegistry + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + builtinfunctions.GetFunctions(), + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(forEachWithOptionalWf))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{ + "subworkflow.yaml": []byte(forEachWithOptionalSubWf), + })) + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{ + "step_to_run": "a", + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "subwf_result": map[string]any{ + "success": map[string]any{ + "data": []any{ + map[string]any{ + "input_1": map[string]any{}, + }, + }, + }, + }, + }) + + outputID, outputData, err = preparedWorkflow.Execute(context.Background(), map[string]any{ + "step_to_run": "b", + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "subwf_result": map[string]any{ + "success": map[string]any{ + "data": []any{ + map[string]any{ + "input_2": map[string]any{ + "message": "Plugin slept for 0 ms.", + }, + }, + }, + }, + }, + }) + + outputID, outputData, err = preparedWorkflow.Execute(context.Background(), map[string]any{ + "step_to_run": "non-existent", + }) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") + assert.Equals(t, outputData.(map[any]any), map[any]any{ + "subwf_result": map[string]any{ + "success": map[string]any{ + "data": []any{ + map[string]any{}, + }, + }, + }, + }) +} + func TestDelayedDisabledStepWorkflow(t *testing.T) { // Run a workflow where the step is disabled by a value that isn't available // at the start of the workflow; in this case the step is disabled from diff --git a/workflow/yaml.go b/workflow/yaml.go index e79a99e..21b1f4e 100644 --- a/workflow/yaml.go +++ b/workflow/yaml.go @@ -63,13 +63,21 @@ const YamlDiscriminatorKey = "discriminator" // YamlOneOfTag is the yaml tag that allows the section to be interpreted as a OneOf. const YamlOneOfTag = "!oneof" -// OrDisabledTag is the key to specify that the following code should be interpreted as a `oneof` type with -// two possible outputs: the expr specified or the disabled output. +// OrDisabledTag is the yaml tag to specify that the following code should be interpreted +// as a `oneof` type with two possible outputs: the expr specified or the disabled output. const OrDisabledTag = "!ordisabled" -func buildExpression(data yaml.Node, path []string, tag string) (expressions.Expression, error) { +// WaitOptionalTag is the tag to specify that the field, when used in an object, +// should be optional with a completion DAG dependency. +const WaitOptionalTag = "!wait-optional" + +// SoftOptionalTag is the tag to specify that the field, when used in an object, +// should be optional with an optional DAG dependency. +const SoftOptionalTag = "!soft-optional" + +func buildExpression(data yaml.Node, path []string) (expressions.Expression, error) { if data.Type() != yaml.TypeIDString { - return nil, fmt.Errorf("%s found on non-string node at %s", tag, strings.Join(path, " -> ")) + return nil, fmt.Errorf("%s found on non-string node at %s", data.Tag(), strings.Join(path, " -> ")) } expr, err := expressions.New(data.Value()) if err != nil { @@ -127,8 +135,8 @@ var stepPathRegex = regexp.MustCompile(`((?:\$.)?steps\.[^.]+)(\..+)`) // Builds a oneof for the given path, or the step disabled output. // Requires this to be a valid step output. But it is flexible to support all outputs, // a specific output, or a field within a specific output. -func buildResultOrDisabledExpression(data yaml.Node, path []string) (any, error) { - successExpr, err := buildExpression(data, path, OrDisabledTag) +func buildResultOrDisabledExpression(data yaml.Node, path []string) (*infer.OneOfExpression, error) { + successExpr, err := buildExpression(data, path) if err != nil { return nil, err } @@ -155,14 +163,40 @@ func buildResultOrDisabledExpression(data yaml.Node, path []string) (any, error) }, nil } +func buildOptionalExpression(data yaml.Node, path []string) (*infer.OptionalExpression, error) { + var waitForCompletion bool + switch data.Tag() { + case SoftOptionalTag: + waitForCompletion = false + case WaitOptionalTag: + waitForCompletion = true + default: + return nil, fmt.Errorf( + "unsupported tag %q in buildOptionalExpression at %s", + data.Tag(), + strings.Join(path, " -> "), + ) + } + expr, err := buildExpression(data, path) + if err != nil { + return nil, err + } + return &infer.OptionalExpression{ + Expr: expr, + WaitForCompletion: waitForCompletion, + }, nil +} + func yamlBuildExpressions(data yaml.Node, path []string) (any, error) { switch data.Tag() { case YamlExprTag: - return buildExpression(data, path, YamlExprTag) + return buildExpression(data, path) case YamlOneOfTag: return buildOneOfExpressions(data, path) case OrDisabledTag: return buildResultOrDisabledExpression(data, path) + case SoftOptionalTag, WaitOptionalTag: + return buildOptionalExpression(data, path) } switch data.Type() { case yaml.TypeIDString: diff --git a/workflow/yaml_test.go b/workflow/yaml_test.go index 812bc1b..2c9d028 100644 --- a/workflow/yaml_test.go +++ b/workflow/yaml_test.go @@ -107,10 +107,8 @@ func TestBuildResultOrDisabledExpression_Simple(t *testing.T) { // Test without root $ yamlInput := []byte(`!ordisabled steps.test.outputs`) input := assert.NoErrorR[yaml.Node](t)(yaml.New().Parse(yamlInput)) - result, err := buildResultOrDisabledExpression(input, make([]string, 0)) + oneOfResult, err := buildResultOrDisabledExpression(input, make([]string, 0)) assert.NoError(t, err) - assert.InstanceOf[*infer.OneOfExpression](t, result) - oneOfResult := result.(*infer.OneOfExpression) assert.Equals(t, oneOfResult.Discriminator, "result") assert.Equals(t, oneOfResult.Options, map[string]any{ "enabled": lang.Must2(expressions.New("steps.test.outputs")), @@ -120,10 +118,8 @@ func TestBuildResultOrDisabledExpression_Simple(t *testing.T) { // Test with all outputs yamlInput = []byte(`!ordisabled $.steps.test.outputs`) input = assert.NoErrorR[yaml.Node](t)(yaml.New().Parse(yamlInput)) - result, err = buildResultOrDisabledExpression(input, make([]string, 0)) + oneOfResult, err = buildResultOrDisabledExpression(input, make([]string, 0)) assert.NoError(t, err) - assert.InstanceOf[*infer.OneOfExpression](t, result) - oneOfResult = result.(*infer.OneOfExpression) assert.Equals(t, oneOfResult.Discriminator, "result") assert.Equals(t, oneOfResult.Options, map[string]any{ "enabled": lang.Must2(expressions.New("$.steps.test.outputs")), @@ -133,10 +129,8 @@ func TestBuildResultOrDisabledExpression_Simple(t *testing.T) { // Test with a specific output yamlInput = []byte(`!ordisabled $.steps.test.outputs.success`) input = assert.NoErrorR[yaml.Node](t)(yaml.New().Parse(yamlInput)) - result, err = buildResultOrDisabledExpression(input, make([]string, 0)) + oneOfResult, err = buildResultOrDisabledExpression(input, make([]string, 0)) assert.NoError(t, err) - assert.InstanceOf[*infer.OneOfExpression](t, result) - oneOfResult = result.(*infer.OneOfExpression) assert.Equals(t, oneOfResult.Discriminator, "result") assert.Equals(t, oneOfResult.Options, map[string]any{ "enabled": lang.Must2(expressions.New("$.steps.test.outputs.success")), @@ -169,7 +163,43 @@ func TestBuildResultOrDisabledExpression_InvalidPattern(t *testing.T) { func TestBuildExpression_WrongType(t *testing.T) { yamlInput := []byte(`!expr {}`) // A map input := assert.NoErrorR[yaml.Node](t)(yaml.New().Parse(yamlInput)) - _, err := buildExpression(input, make([]string, 0), YamlExprTag) + _, err := buildExpression(input, make([]string, 0)) assert.Error(t, err) assert.Contains(t, err.Error(), "found on non-string node") } + +func TestBuildWaitOptionalExpr_Simple(t *testing.T) { + yamlInput := []byte(`!wait-optional some_expr`) + input := assert.NoErrorR[yaml.Node](t)(yaml.New().Parse(yamlInput)) + optionalResult, err := buildOptionalExpression(input, make([]string, 0)) + assert.NoError(t, err) + assert.Equals(t, optionalResult.WaitForCompletion, true) + assert.Equals(t, optionalResult.Expr, lang.Must2(expressions.New("some_expr"))) +} + +func TestBuildSoftOptionalExpr_Simple(t *testing.T) { + yamlInput := []byte(`!soft-optional some_expr`) + input := assert.NoErrorR[yaml.Node](t)(yaml.New().Parse(yamlInput)) + optionalResult, err := buildOptionalExpression(input, make([]string, 0)) + assert.NoError(t, err) + assert.Equals(t, optionalResult.WaitForCompletion, false) + assert.Equals(t, optionalResult.Expr, lang.Must2(expressions.New("some_expr"))) +} + +func TestBuildWaitOptionalExpr_InvalidExpr(t *testing.T) { + yamlInput := []byte(`!wait-optional ....`) + input := assert.NoErrorR[yaml.Node](t)(yaml.New().Parse(yamlInput)) + _, err := buildOptionalExpression(input, make([]string, 0)) + assert.Error(t, err) + assert.Contains(t, err.Error(), "placed in invalid configuration") +} + +func TestBuildWaitOptionalExpr_InvalidTag(t *testing.T) { + // This code tests an invalid tag used with buildOptionalExpressions that + // should not be possible in production code due to other checks. + yamlInput := []byte(`!invalid some_expr`) + input := assert.NoErrorR[yaml.Node](t)(yaml.New().Parse(yamlInput)) + _, err := buildOptionalExpression(input, make([]string, 0)) + assert.Error(t, err) + assert.Contains(t, err.Error(), "unsupported tag") +}