From 51616500e942bf036f66c9153f90ac4859224cf2 Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Tue, 19 Mar 2024 20:20:07 -0400 Subject: [PATCH] Add comments and renames --- core/services/workflows/engine.go | 127 ++++++++++++++++---------- core/services/workflows/queue.go | 14 +-- core/services/workflows/queue_test.go | 6 +- core/services/workflows/state.go | 60 ++++++++---- core/services/workflows/store.go | 3 + core/services/workflows/workflow.go | 53 ++++++----- 6 files changed, 163 insertions(+), 100 deletions(-) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index f740213e627..5e97316c4e3 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -22,16 +22,20 @@ const ( mockedTriggerID = "cccccccccc0000000000000000000000" ) +// Engine handles the lifecycle of a single workflow and its executions. type Engine struct { services.StateMachine logger logger.Logger registry types.CapabilitiesRegistry workflow *workflow - store *store - queue *queue[stepRequest] - callbackCh chan capabilities.CapabilityResponse + executionStates *store + // NOTE: I do find it confusing that pending step requests are global rather than scoped to a single execution + pendingStepRequests *queue[stepRequest] + triggerEvents chan capabilities.CapabilityResponse newWorkerCh chan struct{} stepUpdateCh chan stepState + // wg is only used to make sure that in the case of a shutdown, + // we wait for all pending steps to finish. wg sync.WaitGroup stopCh services.StopChan } @@ -43,7 +47,7 @@ func (e *Engine) Start(ctx context.Context) error { // queue.start will add to the wg and // spin off a goroutine. - e.queue.start(ctx, &e.wg) + e.pendingStepRequests.start(ctx, &e.wg) e.wg.Add(2) go e.init(ctx) @@ -53,6 +57,13 @@ func (e *Engine) Start(ctx context.Context) error { }) } +// init does the following: +// +// 1. Resolves the underlying capability for each trigger +// 2. Registers each step's capability to this workflow +// 3. Registers for trigger events now that all capabilities are resolved +// +// Steps 1 and 2 are retried every 5 seconds until successful. func (e *Engine) init(ctx context.Context) { defer e.wg.Done() @@ -67,6 +78,7 @@ LOOP: case <-ctx.Done(): return case <-ticker.C: + // Resolve the underlying capability for each trigger for _, t := range e.workflow.triggers { cp, err := e.registry.GetTrigger(ctx, t.Type) if err != nil { @@ -77,13 +89,15 @@ LOOP: } } - err := e.workflow.walkDo(keywordTrigger, func(n *node) error { - // The graph contains a dummy node for triggers, but + // Walk the graph and register each step's capablity to this workflow + err := e.workflow.walkDo(keywordTrigger, func(n *step) error { + // The graph contains a dummy step for triggers, but // we handle triggers separately since there might be more than one. if n.Ref == keywordTrigger { return nil } + // If the capability is already cached, that means we've already registered it if n.cachedCapability != nil { return nil } @@ -93,6 +107,7 @@ LOOP: return fmt.Errorf("failed to get capability with ref %s: %s, retrying in %d seconds", n.Type, innerErr, retrySec) } + // We only support CallbackExecutable capabilities for now cc, ok := cp.(capabilities.CallbackExecutable) if !ok { return fmt.Errorf("could not coerce capability %s to CallbackExecutable", n.Type) @@ -143,6 +158,7 @@ LOOP: e.logger.Info("engine initialized") } +// registerTrigger is used during the initialization phase to bind a trigger to this workflow func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) error { triggerInputs, err := values.NewMap( map[string]any{ @@ -165,7 +181,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) erro Config: tc, Inputs: triggerInputs, } - err = t.cachedTrigger.RegisterTrigger(ctx, e.callbackCh, triggerRegRequest) + err = t.cachedTrigger.RegisterTrigger(ctx, e.triggerEvents, triggerRegRequest) if err != nil { return fmt.Errorf("failed to instantiate trigger %s, %s", t.Type, err) } @@ -173,14 +189,17 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) erro } // loop is the synchronization goroutine for the engine, and is responsible for: -// - dispatching new workers up to the limit specified (default = 100) -// - starting a new execution when a trigger emits a message on `callbackCh` -// - updating the `executionState` with the outcome of a `step`. +// - dispatching new workers up to the limit specified (default = 100) +// - starting a new execution when a trigger emits a message on `triggerEvents` +// - updating the `executionState` with the outcome of a `step`. // // Note: `executionState` is only mutated by this loop directly. +// // This is important to avoid data races, and any accesses of `executionState` by any other // goroutine should happen via a `stepRequest` message containing a copy of the latest -// `executionState`. This works because a worker thread for a given step will only +// `executionState`. +// +// This works because a worker thread for a given step will only // be spun up once all dependent steps have completed (guaranteeing that the state associated // with those dependent steps will no longer change). Therefore as long this worker thread only // accesses data from dependent states, the data will never be stale. @@ -190,7 +209,7 @@ func (e *Engine) loop(ctx context.Context) { select { case <-ctx.Done(): return - case resp := <-e.callbackCh: + case resp := <-e.triggerEvents: if resp.Err != nil { e.logger.Errorf("trigger event was an error; not executing", resp.Err) continue @@ -200,10 +219,13 @@ func (e *Engine) loop(ctx context.Context) { if err != nil { e.logger.Errorf("failed to start execution: %w", err) } - case dm := <-e.queue.out: + case stepRequest := <-e.pendingStepRequests.dequeue: + // Wait for a new worker to be available before dispatching a new one. <-e.newWorkerCh + // NOTE: Can we add this to e.workerForStep instead? e.wg.Add(1) - go e.workerForStep(ctx, dm) + // NOTE: Should we instead add a "process" method to the queue, and do concurrency control there? + go e.workerForStepRequest(ctx, stepRequest) case stepUpdate := <-e.stepUpdateCh: // Executed synchronously to ensure we correctly schedule subsequent tasks. err := e.handleStepUpdate(ctx, stepUpdate) @@ -214,6 +236,7 @@ func (e *Engine) loop(ctx context.Context) { } } +// startExecution kicks off a new workflow execution when a trigger event is received. func (e *Engine) startExecution(ctx context.Context, event values.Value) error { executionID := uuid.New().String() e.logger.Debugw("executing on a trigger event", "event", event, "executionID", executionID) @@ -231,44 +254,45 @@ func (e *Engine) startExecution(ctx context.Context, event values.Value) error { status: statusStarted, } - err := e.store.add(ctx, ec) + err := e.executionStates.add(ctx, ec) if err != nil { return err } // Find the tasks we need to fire when a trigger has fired and enqueue them. - an, err := e.workflow.adjacentNodes(keywordTrigger) + triggerDependents, err := e.workflow.dependents(keywordTrigger) if err != nil { return err } - for _, node := range an { - e.logger.Debugw("step request enqueued", "ref", node.Ref, "executionID", executionID) - e.queue.in <- stepRequest{state: copyState(*ec), stepRef: node.Ref} + for _, step := range triggerDependents { + e.logger.Debugw("step request enqueued", "ref", step.Ref, "executionID", executionID) + e.pendingStepRequests.enqueue <- stepRequest{state: copyState(*ec), stepRef: step.Ref} } return nil } func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) error { - state, err := e.store.updateStep(ctx, &stepUpdate) + state, err := e.executionStates.updateStep(ctx, &stepUpdate) if err != nil { return err } switch stepUpdate.status { case statusCompleted: - adjacentNodes, err := e.workflow.adjacentNodes(stepUpdate.ref) + stepDependents, err := e.workflow.dependents(stepUpdate.ref) if err != nil { return err } - // There are no nodes left to process in the current path, so let's check if + // There are no steps left to process in the current path, so let's check if // we've completed the workflow. - // If not, we'll check adjacent nodes for any that are ready to process. - if len(adjacentNodes) == 0 { + // If not, we'll check for any dependents that are ready to process. + if len(stepDependents) == 0 { workflowCompleted := true - err := e.workflow.walkDo(keywordTrigger, func(n *node) error { + err := e.workflow.walkDo(keywordTrigger, func(n *step) error { step, ok := state.steps[n.Ref] + // Note: Why do we not return an error if !ok? if !ok { workflowCompleted = false return nil @@ -286,35 +310,38 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) err } if workflowCompleted { - err := e.store.updateStatus(ctx, state.executionID, statusCompleted) + err := e.executionStates.updateStatus(ctx, state.executionID, statusCompleted) if err != nil { return err } } } - for _, node := range adjacentNodes { - var anyNotCompleted bool - for _, dr := range node.dependencies { - step, ok := state.steps[dr] + for _, step := range stepDependents { + // Check if all dependencies are completed for the current step + var waitingOnDependencies bool + for _, dr := range step.dependencies { + stepState, ok := state.steps[dr] if !ok { return fmt.Errorf("could not locate dependency %s in %+v", dr, state) } - if step.status != statusCompleted { - anyNotCompleted = true + // NOTE: Should we also check for statusErrored? + if stepState.status != statusCompleted { + waitingOnDependencies = true } } - if !anyNotCompleted { - e.queue.in <- stepRequest{ + // If all dependencies are completed, enqueue the step. + if !waitingOnDependencies { + e.pendingStepRequests.enqueue <- stepRequest{ state: copyState(state), - stepRef: node.Ref, + stepRef: step.Ref, } } } case statusErrored: - err := e.store.updateStatus(ctx, state.executionID, statusErrored) + err := e.executionStates.updateStatus(ctx, state.executionID, statusErrored) if err != nil { return err } @@ -323,7 +350,8 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) err return nil } -func (e *Engine) workerForStep(ctx context.Context, msg stepRequest) { +// NOTE: Should this be attached to a step struct instead of the engine? +func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { defer e.wg.Done() e.logger.Debugw("executing on a step event", "event", msg, "executionID", msg.state.executionID) @@ -333,7 +361,7 @@ func (e *Engine) workerForStep(ctx context.Context, msg stepRequest) { ref: msg.stepRef, } - inputs, outputs, err := e.handleStep(ctx, msg) + inputs, outputs, err := e.executeStep(ctx, msg) if err != nil { e.logger.Errorf("error executing step request: %w", err, "executionID", msg.state.executionID, "stepRef", msg.stepRef) stepState.outputs.err = err @@ -350,13 +378,14 @@ func (e *Engine) workerForStep(ctx context.Context, msg stepRequest) { e.newWorkerCh <- struct{}{} } -func (e *Engine) handleStep(ctx context.Context, msg stepRequest) (*values.Map, values.Value, error) { - node, err := e.workflow.Vertex(msg.stepRef) +// executeStep executes the referenced capability within a step and returns the result. +func (e *Engine) executeStep(ctx context.Context, msg stepRequest) (*values.Map, values.Value, error) { + step, err := e.workflow.Vertex(msg.stepRef) if err != nil { return nil, nil, err } - i, err := findAndInterpolateAllKeys(node.Inputs, msg.state) + i, err := findAndInterpolateAllKeys(step.Inputs, msg.state) if err != nil { return nil, nil, err } @@ -368,14 +397,14 @@ func (e *Engine) handleStep(ctx context.Context, msg stepRequest) (*values.Map, tr := capabilities.CapabilityRequest{ Inputs: inputs, - Config: node.cachedConfig, + Config: step.cachedConfig, Metadata: capabilities.RequestMetadata{ WorkflowID: msg.state.workflowID, WorkflowExecutionID: msg.state.executionID, }, } - resp, err := capabilities.ExecuteSync(ctx, node.cachedCapability, tr) + resp, err := capabilities.ExecuteSync(ctx, step.cachedCapability, tr) if err != nil { return inputs, nil, err } @@ -389,7 +418,7 @@ func (e *Engine) handleStep(ctx context.Context, msg stepRequest) (*values.Map, return inputs, resp.Underlying[0], err } -func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability) error { +func (e *Engine) deregisterTrigger(_ context.Context, t *triggerCapability) error { triggerInputs, err := values.NewMap( map[string]any{ "triggerId": mockedTriggerID, @@ -424,7 +453,7 @@ func (e *Engine) Close() error { close(e.stopCh) e.wg.Wait() - err := e.workflow.walkDo(keywordTrigger, func(n *node) error { + err := e.workflow.walkDo(keywordTrigger, func(n *step) error { if n.Ref == keywordTrigger { return nil } @@ -468,7 +497,7 @@ func NewEngine(cfg Config) (engine *Engine, err error) { } // TODO: validation of the workflow spec // We'll need to check, among other things: - // - that there are no node `ref` called `trigger` as this is reserved for any triggers + // - that there are no step `ref` called `trigger` as this is reserved for any triggers // - that there are no duplicate `ref`s // - that the `ref` for any triggers is empty -- and filled in with `trigger` // - etc. @@ -488,11 +517,11 @@ func NewEngine(cfg Config) (engine *Engine, err error) { logger: cfg.Lggr.Named("WorkflowEngine"), registry: cfg.Registry, workflow: workflow, - store: newStore(), - queue: newQueue[stepRequest](), + executionStates: newStore(), + pendingStepRequests: newQueue[stepRequest](), newWorkerCh: newWorkerCh, stepUpdateCh: make(chan stepState), - callbackCh: make(chan capabilities.CapabilityResponse), + triggerEvents: make(chan capabilities.CapabilityResponse), stopCh: make(chan struct{}), } return engine, nil diff --git a/core/services/workflows/queue.go b/core/services/workflows/queue.go index 5ae9d4e4424..8ce23b0d529 100644 --- a/core/services/workflows/queue.go +++ b/core/services/workflows/queue.go @@ -11,26 +11,26 @@ type stepRequest struct { } type queue[T any] struct { - in chan T - out chan T + enqueue chan T + dequeue chan T } func (q *queue[T]) worker(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - + // NOTE: Should there be a max size for the queue? qData := []T{} for { select { case <-ctx.Done(): return - case inc := <-q.in: + case inc := <-q.enqueue: qData = append(qData, inc) default: if len(qData) > 0 { popped := qData[0] select { - case q.out <- popped: + case q.dequeue <- popped: qData = qData[1:] default: } @@ -47,7 +47,7 @@ func (q *queue[T]) start(ctx context.Context, wg *sync.WaitGroup) { func newQueue[T any]() *queue[T] { return &queue[T]{ - in: make(chan T), - out: make(chan T), + enqueue: make(chan T), + dequeue: make(chan T), } } diff --git a/core/services/workflows/queue_test.go b/core/services/workflows/queue_test.go index d115cd69a4e..c5cf5efe781 100644 --- a/core/services/workflows/queue_test.go +++ b/core/services/workflows/queue_test.go @@ -17,15 +17,15 @@ func TestQueue(t *testing.T) { ints := []int{1, 2, 3, 4, 5} for _, i := range ints { - q.in <- i + q.enqueue <- i } got := []int{} for i := 0; i < 5; i++ { - got = append(got, <-q.out) + got = append(got, <-q.dequeue) } assert.Equal(t, ints, got) - assert.Len(t, q.out, 0) + assert.Len(t, q.dequeue, 0) } diff --git a/core/services/workflows/state.go b/core/services/workflows/state.go index 62eec51a835..25c573ed7d9 100644 --- a/core/services/workflows/state.go +++ b/core/services/workflows/state.go @@ -38,14 +38,14 @@ type executionState struct { status string } +// copyState returns a deep copy of the input executionState func copyState(es executionState) executionState { steps := map[string]*stepState{} for ref, step := range es.steps { var mval *values.Map if step.inputs != nil { mp := values.Proto(step.inputs).GetMapValue() - copied := values.FromMapValueProto(mp) - mval = copied + mval = values.FromMapValueProto(mp) } op := values.Proto(step.outputs.value) @@ -76,9 +76,14 @@ func copyState(es executionState) executionState { // interpolateKey takes a multi-part, dot-separated key and attempts to replace // it with its corresponding value in `state`. -// A key is valid if: -// - it contains at least two parts, with the first part being the workflow step's `ref` variable, and the second being one of `inputs` or `outputs` -// - any subsequent parts will be processed as a list index (if the current element is a list) or a map key (if it's a map) +// +// A key is valid if it contains at least two parts, with: +// - the first part being the workflow step's `ref` variable +// - the second part being one of `inputs` or `outputs` +// +// If a key has more than two parts, then we traverse the parts +// to find the value we want to replace. +// We support traversing both nested maps and lists and any combination of the two. func interpolateKey(key string, state executionState) (any, error) { parts := strings.Split(key, ".") @@ -86,6 +91,7 @@ func interpolateKey(key string, state executionState) (any, error) { return "", fmt.Errorf("cannot interpolate %s: must have at least two parts", key) } + // lookup the step we want to get either input or output state from sc, ok := state.steps[parts[0]] if !ok { return "", fmt.Errorf("could not find ref `%s`", parts[0]) @@ -94,6 +100,7 @@ func interpolateKey(key string, state executionState) (any, error) { var value values.Value switch parts[1] { case "inputs": + // Can this error? What happens if this input depends on a previous step's output? value = sc.inputs case "outputs": if sc.outputs.err != nil { @@ -116,24 +123,22 @@ func interpolateKey(key string, state executionState) (any, error) { case map[string]any: inner, ok := v[r] if !ok { + // probably worthwhile to print the entire reference here return "", fmt.Errorf("could not find ref part `%s` in `%+v`", r, v) } val = inner case []any: - d, err := strconv.Atoi(r) + i, err := strconv.Atoi(r) if err != nil { return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: `%s` is not convertible to an int", r, v, r) } - if d > len(v)-1 { - return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: cannot fetch index %d", r, v, d) + if (i > len(v)-1) || (i < 0){ + return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: index out of bounds %d", r, v, i) } - if d < 0 { - return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: index %d must be a positive number", r, v, d) - } - val = v[d] + val = v[i] default: return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`", r, val) } @@ -148,9 +153,10 @@ var ( // findAndInterpolateAllKeys takes an `input` any value, and recursively // identifies any values that should be replaced from `state`. -// A value `v` should be replaced if it is wrapped as follows `$(v)`. +// +// A value `v` should be replaced if it is wrapped as follows: `$(v)`. func findAndInterpolateAllKeys(input any, state executionState) (any, error) { - return traverse( + return deepMap( input, func(el string) (any, error) { matches := interpolationTokenRe.FindStringSubmatch(el) @@ -164,10 +170,17 @@ func findAndInterpolateAllKeys(input any, state executionState) (any, error) { ) } +// findRefs takes an `inputs` map and returns a list of all the step references +// contained within it. func findRefs(inputs map[string]any) ([]string, error) { refs := []string{} - _, err := traverse( + _, err := deepMap( inputs, + // This function is called for each string in the map + // for each string, we iterate over each match of the interpolation token + // - if there are no matches, return no reference + // - if there is one match, return the reference + // - if there are multiple matches (in the case of a multi-part state reference), return just the step ref func(el string) (any, error) { matches := interpolationTokenRe.FindStringSubmatch(el) if len(matches) < 2 { @@ -187,10 +200,19 @@ func findRefs(inputs map[string]any) ([]string, error) { return refs, err } -func traverse(input any, do func(el string) (any, error)) (any, error) { +// deepMap recursively applies a transformation function +// over each string within: +// +// - a map[string]any +// - a []any +// - a string +func deepMap(input any, transform func(el string) (any, error)) (any, error) { + // in the case of a string, simply apply the transformation + // in the case of a map, recurse and apply the transformation to each value + // in the case of a list, recurse and apply the transformation to each element switch tv := input.(type) { case string: - nv, err := do(tv) + nv, err := transform(tv) if err != nil { return nil, err } @@ -199,7 +221,7 @@ func traverse(input any, do func(el string) (any, error)) (any, error) { case map[string]any: nm := map[string]any{} for k, v := range tv { - nv, err := traverse(v, do) + nv, err := deepMap(v, transform) if err != nil { return nil, err } @@ -210,7 +232,7 @@ func traverse(input any, do func(el string) (any, error)) (any, error) { case []any: a := []any{} for _, el := range tv { - ne, err := traverse(el, do) + ne, err := deepMap(el, transform) if err != nil { return nil, err } diff --git a/core/services/workflows/store.go b/core/services/workflows/store.go index bb9a8d14bcc..4527e2b4f16 100644 --- a/core/services/workflows/store.go +++ b/core/services/workflows/store.go @@ -15,6 +15,7 @@ func newStore() *store { return &store{idToState: map[string]*executionState{}} } +// add adds a new execution state under the given executionID func (s *store) add(ctx context.Context, state *executionState) error { s.mu.Lock() defer s.mu.Unlock() @@ -27,6 +28,7 @@ func (s *store) add(ctx context.Context, state *executionState) error { return nil } +// updateStep updates a step for the given executionID func (s *store) updateStep(ctx context.Context, step *stepState) (executionState, error) { s.mu.Lock() defer s.mu.Unlock() @@ -39,6 +41,7 @@ func (s *store) updateStep(ctx context.Context, step *stepState) (executionState return *state, nil } +// updateStatus updates the status for the given executionID func (s *store) updateStatus(ctx context.Context, executionID string, status string) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/core/services/workflows/workflow.go b/core/services/workflows/workflow.go index 52392607c60..21d734553d0 100644 --- a/core/services/workflows/workflow.go +++ b/core/services/workflows/workflow.go @@ -11,37 +11,45 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/values" ) -type Capability struct { +// yamlStep is the yaml parsed representation of a step in a workflow. +type yamlStep struct { Type string `yaml:"type"` Ref string `yaml:"ref"` Inputs map[string]any `yaml:"inputs"` Config map[string]any `yaml:"config"` } -type workflowSpec struct { - Triggers []Capability `yaml:"triggers"` - Actions []Capability `yaml:"actions"` - Consensus []Capability `yaml:"consensus"` - Targets []Capability `yaml:"targets"` +// yamlWorkflowSpec is the yaml parsed representation of a workflow. +type yamlWorkflowSpec struct { + Triggers []yamlStep `yaml:"triggers"` + Actions []yamlStep `yaml:"actions"` + Consensus []yamlStep `yaml:"consensus"` + Targets []yamlStep `yaml:"targets"` } -func (w *workflowSpec) steps() []Capability { - s := []Capability{} +func (w *yamlWorkflowSpec) steps() []yamlStep { + s := []yamlStep{} s = append(s, w.Actions...) s = append(s, w.Consensus...) s = append(s, w.Targets...) return s } +// workflow is a directed graph of nodes, where each node is a step. +// +// triggers are special steps that are stored separately, they're +// treated differently due to their nature of being the starting +// point of a workflow. type workflow struct { - graph.Graph[string, *node] + graph.Graph[string, *step] triggers []*triggerCapability - spec *workflowSpec + spec *yamlWorkflowSpec } -func (w *workflow) walkDo(start string, do func(n *node) error) error { +// NOTE: should we make this concurrent? +func (w *workflow) walkDo(start string, do func(n *step) error) error { var outerErr error err := graph.BFS(w.Graph, start, func(ref string) bool { n, err := w.Graph.Vertex(ref) @@ -65,8 +73,8 @@ func (w *workflow) walkDo(start string, do func(n *node) error) error { return outerErr } -func (w *workflow) adjacentNodes(start string) ([]*node, error) { - nodes := []*node{} +func (w *workflow) dependents(start string) ([]*step, error) { + nodes := []*step{} m, err := w.Graph.AdjacencyMap() if err != nil { return nil, err @@ -89,15 +97,16 @@ func (w *workflow) adjacentNodes(start string) ([]*node, error) { return nodes, nil } -type node struct { - Capability +// step wraps a yamlStep with additional context for dependencies and execution +type step struct { + yamlStep dependencies []string cachedCapability capabilities.CallbackExecutable cachedConfig *values.Map } type triggerCapability struct { - Capability + yamlStep cachedTrigger capabilities.TriggerCapability } @@ -106,7 +115,7 @@ const ( ) func Parse(yamlWorkflow string) (*workflow, error) { - wfs := &workflowSpec{} + wfs := &yamlWorkflowSpec{} err := yaml.Unmarshal([]byte(yamlWorkflow), wfs) if err != nil { return nil, err @@ -116,7 +125,7 @@ func Parse(yamlWorkflow string) (*workflow, error) { // empty graph with just one starting entry: `trigger`. // This provides the starting point for our graph and // points to all dependent nodes. - nodeHash := func(n *node) string { + nodeHash := func(n *step) string { return n.Ref } g := graph.New( @@ -124,8 +133,8 @@ func Parse(yamlWorkflow string) (*workflow, error) { graph.PreventCycles(), graph.Directed(), ) - err = g.AddVertex(&node{ - Capability: Capability{Ref: keywordTrigger}, + err = g.AddVertex(&step{ + yamlStep: yamlStep{Ref: keywordTrigger}, }) if err != nil { return nil, err @@ -136,7 +145,7 @@ func Parse(yamlWorkflow string) (*workflow, error) { s.Ref = s.Type } - err := g.AddVertex(&node{Capability: s}) + err := g.AddVertex(&step{yamlStep: s}) if err != nil { return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, err) } @@ -169,7 +178,7 @@ func Parse(yamlWorkflow string) (*workflow, error) { triggerNodes := []*triggerCapability{} for _, t := range wfs.Triggers { triggerNodes = append(triggerNodes, &triggerCapability{ - Capability: t, + yamlStep: t, }) } wf := &workflow{