From 00a2f2605c0ab1ff62f061f3b66044182e59cce1 Mon Sep 17 00:00:00 2001 From: Heschi Kreinick Date: Mon, 9 Oct 2023 20:08:22 -0400 Subject: [PATCH] internal/workflow: change expansions to return values Expansions were kind of fun for the smaller workflows, but had a major flaw: there was no way to get data out of an expansion. Put another way, once a workflow entered an expansion it basically couldn't leave. That was okay as long as the larger workflows were static, but we're about to switch to LUCI for testing, and at that point we can't compile in the list of builders. I suppose we could read them in at startup, but I'd like to do less of that kind of thing, not more. Change them to allow returning a single Value. That might prove limiting, but I'd really prefer not to have a combinatorial explosion of expansion API just yet. Unfortunately, I didn't see an easy way to allow this and still do validation that there aren't orphaned tasks. In practice, even the most trivial test should catch the problem, so meh. (We didn't use to have stall detection, and back then this validation mattered much more.) 
For golang/go#63147 Change-Id: I855523a72b87b0ed8089066f246e5869f28e6c89 Reviewed-on: https://go-review.googlesource.com/c/build/+/533860 Reviewed-by: Dmitri Shuralyov Reviewed-by: Dmitri Shuralyov LUCI-TryBot-Result: Go LUCI Auto-Submit: Heschi Kreinick --- internal/task/tagx.go | 22 +++--- internal/workflow/workflow.go | 116 +++++++++++++++-------------- internal/workflow/workflow_test.go | 23 +++--- 3 files changed, 83 insertions(+), 78 deletions(-) diff --git a/internal/task/tagx.go b/internal/task/tagx.go index b8e4368e27..4a473015a4 100644 --- a/internal/task/tagx.go +++ b/internal/task/tagx.go @@ -39,7 +39,8 @@ func (x *TagXReposTasks) NewDefinition() *wf.Definition { wd := wf.New() reviewers := wf.Param(wd, reviewersParam) repos := wf.Task0(wd, "Select repositories", x.SelectRepos) - wf.Expand2(wd, "Create plan", x.BuildPlan, repos, reviewers) + done := wf.Expand2(wd, "Create plan", x.BuildPlan, repos, reviewers) + wf.Output(wd, "done", done) return wd } @@ -51,7 +52,8 @@ func (x *TagXReposTasks) NewSingleDefinition() *wf.Definition { // TODO: optional is required to avoid the "required" check, but since it's a checkbox // it's obviously yes/no, should probably be exempted from that check. skipPostSubmit := wf.Param(wd, wf.ParamDef[bool]{Name: "Skip post submit result (optional)", ParamType: wf.Bool}) - wf.Expand4(wd, "Create single-repo plan", x.BuildSingleRepoPlan, repos, name, skipPostSubmit, reviewers) + tagged := wf.Expand4(wd, "Create single-repo plan", x.BuildSingleRepoPlan, repos, name, skipPostSubmit, reviewers) + wf.Output(wd, "tagged repository", tagged) return wd } @@ -278,7 +280,7 @@ func checkCycles1(reposByModule map[string]TagRepo, repo TagRepo, stack []string } // BuildPlan adds the tasks needed to update repos to wd. 
-func (x *TagXReposTasks) BuildPlan(wd *wf.Definition, repos []TagRepo, reviewers []string) error { +func (x *TagXReposTasks) BuildPlan(wd *wf.Definition, repos []TagRepo, reviewers []string) (wf.Value[string], error) { // repo.ModPath to the wf.Value produced by planning it. planned := map[string]wf.Value[TagRepo]{} @@ -305,7 +307,7 @@ func (x *TagXReposTasks) BuildPlan(wd *wf.Definition, repos []TagRepo, reviewers missing = append(missing, r.Name) } } - return fmt.Errorf("failed to progress the plan: todo: %v", missing) + return nil, fmt.Errorf("failed to progress the plan: todo: %v", missing) } } var allDeps []wf.Dependency @@ -313,11 +315,10 @@ func (x *TagXReposTasks) BuildPlan(wd *wf.Definition, repos []TagRepo, reviewers allDeps = append(allDeps, dep) } done := wf.Task0(wd, "done", func(_ context.Context) (string, error) { return "done!", nil }, wf.After(allDeps...)) - wf.Output(wd, "done", done) - return nil + return done, nil } -func (x *TagXReposTasks) BuildSingleRepoPlan(wd *wf.Definition, repoSlice []TagRepo, name string, skipPostSubmit bool, reviewers []string) error { +func (x *TagXReposTasks) BuildSingleRepoPlan(wd *wf.Definition, repoSlice []TagRepo, name string, skipPostSubmit bool, reviewers []string) (wf.Value[TagRepo], error) { repos := map[string]TagRepo{} plannedRepos := map[string]wf.Value[TagRepo]{} for _, r := range repoSlice { @@ -329,7 +330,7 @@ func (x *TagXReposTasks) BuildSingleRepoPlan(wd *wf.Definition, repoSlice []TagR } repo, ok := repos[name] if !ok { - return fmt.Errorf("no repository %q", name) + return nil, fmt.Errorf("no repository %q", name) } tagged, ok := x.planRepo(wd, repo, plannedRepos, reviewers, skipPostSubmit) if !ok { @@ -337,10 +338,9 @@ func (x *TagXReposTasks) BuildSingleRepoPlan(wd *wf.Definition, repoSlice []TagR for _, d := range repo.Deps { deps = append(deps, d.ModPath) } - return fmt.Errorf("%q doesn't have all of its dependencies (%q)", repo.Name, deps) + return nil, fmt.Errorf("%q doesn't have all of its 
dependencies (%q)", repo.Name, deps) } - wf.Output(wd, "tagged repository", tagged) - return nil + return tagged, nil } // planRepo adds tasks to wf to update and possibly tag repo. It returns diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go index f9fd85876d..df235268a0 100644 --- a/internal/workflow/workflow.go +++ b/internal/workflow/workflow.go @@ -187,7 +187,7 @@ func (p parameter[T]) typ() reflect.Type { return reflect.TypeOf(zero) } func (p parameter[T]) value(w *Workflow) reflect.Value { return reflect.ValueOf(w.params[p.d.Name]) } -func (p parameter[T]) dependencies() []*taskDefinition { return nil } +func (p parameter[T]) ready(w *Workflow) bool { return true } // ParamType defines the type of a workflow parameter. // @@ -285,7 +285,7 @@ func (c *constant[T]) typ() reflect.Type { return reflect.TypeOf(zero) } func (c *constant[T]) value(_ *Workflow) reflect.Value { return reflect.ValueOf(c.v) } -func (c *constant[T]) dependencies() []*taskDefinition { return nil } +func (c *constant[T]) ready(_ *Workflow) bool { return true } // Slice combines multiple Values of the same type into a Value containing // a slice of that type. @@ -312,12 +312,13 @@ func (s *slice[T]) value(w *Workflow) reflect.Value { return value } -func (s *slice[T]) dependencies() []*taskDefinition { - var result []*taskDefinition - for _, v := range s.vals { - result = append(result, v.dependencies()...) +func (s *slice[T]) ready(w *Workflow) bool { + for _, val := range s.vals { + if !val.ready(w) { + return false + } } - return result + return true } // Output registers a Value as a workflow output which will be returned when @@ -328,21 +329,17 @@ func Output[T any](d *Definition, name string, v Value[T]) { // A Dependency represents a dependency on a prior task. type Dependency interface { - dependencies() []*taskDefinition + ready(*Workflow) bool } // After represents an ordering dependency on another Task or Action. 
It can be // passed in addition to any arguments to the task's function. func After(afters ...Dependency) TaskOption { - var deps []*taskDefinition - for _, a := range afters { - deps = append(deps, a.dependencies()...) - } - return &after{deps} + return &after{afters} } type after struct { - deps []*taskDefinition + deps []Dependency } func (a *after) taskOption() {} @@ -382,7 +379,7 @@ func addFunc(d *Definition, name string, f interface{}, inputs []metaValue, opts name = d.name(name) td := &taskDefinition{name: name, f: f, args: inputs} for _, input := range inputs { - td.deps = append(td.deps, input.dependencies()...) + td.deps = append(td.deps, input) } for _, opt := range opts { td.deps = append(td.deps, opt.(*after).deps...) @@ -401,9 +398,29 @@ func addAction(d *Definition, name string, f interface{}, inputs []metaValue, op return &dependency{td} } -func addExpansion(d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) { +func addExpansion[O1 any](d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) *expansionResult[O1] { td := addFunc(d, name, f, inputs, opts) td.isExpansion = true + return &expansionResult[O1]{td} +} + +type expansionResult[T any] struct { + td *taskDefinition +} + +func (er *expansionResult[T]) valueType(T) {} + +func (er *expansionResult[T]) typ() reflect.Type { + var zero []T + return reflect.TypeOf(zero) +} + +func (er *expansionResult[T]) value(w *Workflow) reflect.Value { + return w.tasks[er.td].resultValue.value(w) +} + +func (er *expansionResult[T]) ready(w *Workflow) bool { + return w.taskReady(er.td) && w.tasks[er.td].resultValue.ready(w) } // ActionN adds an Action to the workflow definition. 
Its behavior and @@ -437,8 +454,8 @@ type dependency struct { task *taskDefinition } -func (d *dependency) dependencies() []*taskDefinition { - return []*taskDefinition{d.task} +func (d *dependency) ready(w *Workflow) bool { + return w.taskReady(d.task) } // ExpandN adds a workflow expansion task to the workflow definition. @@ -450,28 +467,28 @@ func (d *dependency) dependencies() []*taskDefinition { // // Running more than one expansion concurrently is an error and will corrupt // the workflow. -func Expand0(d *Definition, name string, f func(*Definition) error, opts ...TaskOption) { - addExpansion(d, name, f, nil, opts) +func Expand0[O1 any](d *Definition, name string, f func(*Definition) (Value[O1], error), opts ...TaskOption) Value[O1] { + return addExpansion[O1](d, name, f, nil, opts) } -func Expand1[I1 any](d *Definition, name string, f func(*Definition, I1) error, i1 Value[I1], opts ...TaskOption) { - addExpansion(d, name, f, []metaValue{i1}, opts) +func Expand1[I1, O1 any](d *Definition, name string, f func(*Definition, I1) (Value[O1], error), i1 Value[I1], opts ...TaskOption) Value[O1] { + return addExpansion[O1](d, name, f, []metaValue{i1}, opts) } -func Expand2[I1, I2 any](d *Definition, name string, f func(*Definition, I1, I2) error, i1 Value[I1], i2 Value[I2], opts ...TaskOption) { - addExpansion(d, name, f, []metaValue{i1, i2}, opts) +func Expand2[I1, I2, O1 any](d *Definition, name string, f func(*Definition, I1, I2) (Value[O1], error), i1 Value[I1], i2 Value[I2], opts ...TaskOption) Value[O1] { + return addExpansion[O1](d, name, f, []metaValue{i1, i2}, opts) } -func Expand3[I1, I2, I3 any](d *Definition, name string, f func(*Definition, I1, I2, I3) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) { - addExpansion(d, name, f, []metaValue{i1, i2, i3}, opts) +func Expand3[I1, I2, I3, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) 
Value[O1] { + return addExpansion[O1](d, name, f, []metaValue{i1, i2, i3}, opts) } -func Expand4[I1, I2, I3, I4 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) { - addExpansion(d, name, f, []metaValue{i1, i2, i3, i4}, opts) +func Expand4[I1, I2, I3, I4, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) Value[O1] { + return addExpansion[O1](d, name, f, []metaValue{i1, i2, i3, i4}, opts) } -func Expand5[I1, I2, I3, I4, I5 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4, I5) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) { - addExpansion(d, name, f, []metaValue{i1, i2, i3, i4, i5}, opts) +func Expand5[I1, I2, I3, I4, I5, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4, I5) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) Value[O1] { + return addExpansion[O1](d, name, f, []metaValue{i1, i2, i3, i4, i5}, opts) } // A TaskContext is a context.Context, plus workflow-related features. @@ -564,7 +581,7 @@ type taskDefinition struct { name string isExpansion bool args []metaValue - deps []*taskDefinition + deps []Dependency f interface{} } @@ -583,8 +600,8 @@ func (tr *taskResult[T]) value(w *Workflow) reflect.Value { return reflect.ValueOf(w.tasks[tr.task].result) } -func (tr *taskResult[T]) dependencies() []*taskDefinition { - return []*taskDefinition{tr.task} +func (tr *taskResult[T]) ready(w *Workflow) bool { + return w.taskReady(tr.task) } // A Workflow is an instantiated workflow instance, ready to run. 
@@ -605,6 +622,11 @@ type Workflow struct { pendingStates map[string]*TaskState } +func (w *Workflow) taskReady(td *taskDefinition) bool { + state := w.tasks[td] + return state.finished && state.err == nil +} + type taskState struct { def *taskDefinition created bool @@ -618,7 +640,8 @@ type taskState struct { retryCount int // workflow expansion - expanded *Definition + expanded *Definition + resultValue metaValue } func (t *taskState) toExported() *TaskState { @@ -655,24 +678,6 @@ func Start(def *Definition, params map[string]interface{}) (*Workflow, error) { } func (w *Workflow) validate() error { - // Validate tasks. - used := map[*taskDefinition]bool{} - for _, taskDef := range w.def.tasks { - for _, dep := range taskDef.deps { - used[dep] = true - } - } - for _, output := range w.def.outputs { - for _, dep := range output.dependencies() { - used[dep] = true - } - } - for _, task := range w.def.tasks { - if !used[task] && !task.isExpansion { - return fmt.Errorf("task %v is not referenced and should be deleted", task.name) - } - } - // Validate parameters. 
if got, want := len(w.params), len(w.def.parameters); got != want { return fmt.Errorf("parameter count mismatch: workflow instance has %d, but definition has %d", got, want) @@ -868,7 +873,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter func (w *Workflow) taskArgs(def *taskDefinition) ([]reflect.Value, bool) { for _, dep := range def.deps { - if depState, ok := w.tasks[dep]; !ok || !depState.finished || depState.err != nil { + if !dep.ready(w) { return nil, false } } @@ -933,10 +938,11 @@ func runExpansion(d *Definition, state taskState, args []reflect.Value) taskStat fv := reflect.ValueOf(state.def.f) out := fv.Call(in) state.finished = true - if out[0].IsNil() { + if out[1].IsNil() { state.expanded = d + state.resultValue = out[0].Interface().(metaValue) } else { - state.err = out[0].Interface().(error) + state.err = out[1].Interface().(error) } return state } diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go index 997609ec5e..c569375f0e 100644 --- a/internal/workflow/workflow_test.go +++ b/internal/workflow/workflow_test.go @@ -229,14 +229,13 @@ func TestExpansion(t *testing.T) { v1 := wf.Task0(wd, "first", first) v2 := wf.Task0(wd, "second", second) wf.Output(wd, "second", v2) - wf.Expand1(wd, "add a task", func(wd *wf.Definition, arg string) error { + joined := wf.Expand1(wd, "add a task", func(wd *wf.Definition, arg string) (wf.Value[string], error) { v3 := wf.Task0(wd, "third", third) // v1 is resolved before the expansion runs, v2 and v3 are dependencies // created outside and inside the epansion. 
- joined := wf.Task1(wd, "join", join, wf.Slice(wf.Const(arg), v2, v3)) - wf.Output(wd, "final value", joined) - return nil + return wf.Task1(wd, "join", join, wf.Slice(wf.Const(arg), v2, v3)), nil }, v1) + wf.Output(wd, "final value", joined) w := startWorkflow(t, wd, nil) outputs := runWorkflow(t, w, nil) @@ -252,10 +251,10 @@ func TestResumeExpansion(t *testing.T) { return "", nil } wd := wf.New() - wf.Expand0(wd, "expand", func(wd *wf.Definition) error { - wf.Output(wd, "result", wf.Task0(wd, "succeeds", succeeds)) - return nil + result := wf.Expand0(wd, "expand", func(wd *wf.Definition) (wf.Value[string], error) { + return wf.Task0(wd, "succeeds", succeeds), nil }) + wf.Output(wd, "result", result) storage := &mapListener{Listener: &verboseListener{t}} w := startWorkflow(t, wd, nil) @@ -273,16 +272,16 @@ func TestResumeExpansion(t *testing.T) { func TestRetryExpansion(t *testing.T) { counter := 0 wd := wf.New() - wf.Expand0(wd, "expand", func(wd *wf.Definition) error { + out := wf.Expand0(wd, "expand", func(wd *wf.Definition) (wf.Value[string], error) { counter++ if counter == 1 { - return fmt.Errorf("first try fail") + return nil, fmt.Errorf("first try fail") } - wf.Output(wd, "out", wf.Task0(wd, "hi", func(_ context.Context) (string, error) { + return wf.Task0(wd, "hi", func(_ context.Context) (string, error) { return "", nil - })) - return nil + }), nil }) + wf.Output(wd, "out", out) w := startWorkflow(t, wd, nil) retry := func(string) {