Skip to content

Commit

Permalink
internal/workflow: change expansions to return values
Browse files Browse the repository at this point in the history
Expansions were kind of fun for the smaller workflows, but had a major
flaw: there was no way to get data out of an expansion. Put another way,
once a workflow entered an expansion it basically couldn't leave.
That was okay as long as the larger workflows were static, but we're
about to switch to LUCI for testing, and at that point we can't compile
in the list of builders. I suppose we could read them in at startup,
but I'd like to do less of that kind of thing, not more.

Change them to allow returning a single Value. That might prove
limiting, but I'd really prefer not to have a combinatorial explosion
of expansion API just yet.

Unfortunately, I didn't see an easy way to allow this and still do
validation that there aren't orphaned tasks. In practice, even the most
trivial test should catch the problem, so meh. (We didn't used to have
stall detection, at which point it mattered much more.)

For golang/go#63147

Change-Id: I855523a72b87b0ed8089066f246e5869f28e6c89
Reviewed-on: https://go-review.googlesource.com/c/build/+/533860
Reviewed-by: Dmitri Shuralyov <[email protected]>
Reviewed-by: Dmitri Shuralyov <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
Auto-Submit: Heschi Kreinick <[email protected]>
  • Loading branch information
heschi authored and gopherbot committed Oct 12, 2023
1 parent 8b8f944 commit 00a2f26
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 78 deletions.
22 changes: 11 additions & 11 deletions internal/task/tagx.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func (x *TagXReposTasks) NewDefinition() *wf.Definition {
wd := wf.New()
reviewers := wf.Param(wd, reviewersParam)
repos := wf.Task0(wd, "Select repositories", x.SelectRepos)
wf.Expand2(wd, "Create plan", x.BuildPlan, repos, reviewers)
done := wf.Expand2(wd, "Create plan", x.BuildPlan, repos, reviewers)
wf.Output(wd, "done", done)
return wd
}

Expand All @@ -51,7 +52,8 @@ func (x *TagXReposTasks) NewSingleDefinition() *wf.Definition {
// TODO: optional is required to avoid the "required" check, but since it's a checkbox
// it's obviously yes/no, should probably be exempted from that check.
skipPostSubmit := wf.Param(wd, wf.ParamDef[bool]{Name: "Skip post submit result (optional)", ParamType: wf.Bool})
wf.Expand4(wd, "Create single-repo plan", x.BuildSingleRepoPlan, repos, name, skipPostSubmit, reviewers)
tagged := wf.Expand4(wd, "Create single-repo plan", x.BuildSingleRepoPlan, repos, name, skipPostSubmit, reviewers)
wf.Output(wd, "tagged repository", tagged)
return wd
}

Expand Down Expand Up @@ -278,7 +280,7 @@ func checkCycles1(reposByModule map[string]TagRepo, repo TagRepo, stack []string
}

// BuildPlan adds the tasks needed to update repos to wd.
func (x *TagXReposTasks) BuildPlan(wd *wf.Definition, repos []TagRepo, reviewers []string) error {
func (x *TagXReposTasks) BuildPlan(wd *wf.Definition, repos []TagRepo, reviewers []string) (wf.Value[string], error) {
// repo.ModPath to the wf.Value produced by planning it.
planned := map[string]wf.Value[TagRepo]{}

Expand All @@ -305,19 +307,18 @@ func (x *TagXReposTasks) BuildPlan(wd *wf.Definition, repos []TagRepo, reviewers
missing = append(missing, r.Name)
}
}
return fmt.Errorf("failed to progress the plan: todo: %v", missing)
return nil, fmt.Errorf("failed to progress the plan: todo: %v", missing)
}
}
var allDeps []wf.Dependency
for _, dep := range planned {
allDeps = append(allDeps, dep)
}
done := wf.Task0(wd, "done", func(_ context.Context) (string, error) { return "done!", nil }, wf.After(allDeps...))
wf.Output(wd, "done", done)
return nil
return done, nil
}

func (x *TagXReposTasks) BuildSingleRepoPlan(wd *wf.Definition, repoSlice []TagRepo, name string, skipPostSubmit bool, reviewers []string) error {
func (x *TagXReposTasks) BuildSingleRepoPlan(wd *wf.Definition, repoSlice []TagRepo, name string, skipPostSubmit bool, reviewers []string) (wf.Value[TagRepo], error) {
repos := map[string]TagRepo{}
plannedRepos := map[string]wf.Value[TagRepo]{}
for _, r := range repoSlice {
Expand All @@ -329,18 +330,17 @@ func (x *TagXReposTasks) BuildSingleRepoPlan(wd *wf.Definition, repoSlice []TagR
}
repo, ok := repos[name]
if !ok {
return fmt.Errorf("no repository %q", name)
return nil, fmt.Errorf("no repository %q", name)
}
tagged, ok := x.planRepo(wd, repo, plannedRepos, reviewers, skipPostSubmit)
if !ok {
var deps []string
for _, d := range repo.Deps {
deps = append(deps, d.ModPath)
}
return fmt.Errorf("%q doesn't have all of its dependencies (%q)", repo.Name, deps)
return nil, fmt.Errorf("%q doesn't have all of its dependencies (%q)", repo.Name, deps)
}
wf.Output(wd, "tagged repository", tagged)
return nil
return tagged, nil
}

// planRepo adds tasks to wf to update and possibly tag repo. It returns
Expand Down
116 changes: 61 additions & 55 deletions internal/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (p parameter[T]) typ() reflect.Type {
return reflect.TypeOf(zero)
}
func (p parameter[T]) value(w *Workflow) reflect.Value { return reflect.ValueOf(w.params[p.d.Name]) }
func (p parameter[T]) dependencies() []*taskDefinition { return nil }
func (p parameter[T]) ready(w *Workflow) bool { return true }

// ParamType defines the type of a workflow parameter.
//
Expand Down Expand Up @@ -285,7 +285,7 @@ func (c *constant[T]) typ() reflect.Type {
return reflect.TypeOf(zero)
}
func (c *constant[T]) value(_ *Workflow) reflect.Value { return reflect.ValueOf(c.v) }
func (c *constant[T]) dependencies() []*taskDefinition { return nil }
func (c *constant[T]) ready(_ *Workflow) bool { return true }

// Slice combines multiple Values of the same type into a Value containing
// a slice of that type.
Expand All @@ -312,12 +312,13 @@ func (s *slice[T]) value(w *Workflow) reflect.Value {
return value
}

func (s *slice[T]) dependencies() []*taskDefinition {
var result []*taskDefinition
for _, v := range s.vals {
result = append(result, v.dependencies()...)
func (s *slice[T]) ready(w *Workflow) bool {
for _, val := range s.vals {
if !val.ready(w) {
return false
}
}
return result
return true
}

// Output registers a Value as a workflow output which will be returned when
Expand All @@ -328,21 +329,17 @@ func Output[T any](d *Definition, name string, v Value[T]) {

// A Dependency represents a dependency on a prior task.
type Dependency interface {
dependencies() []*taskDefinition
ready(*Workflow) bool
}

// After represents an ordering dependency on another Task or Action. It can be
// passed in addition to any arguments to the task's function.
func After(afters ...Dependency) TaskOption {
var deps []*taskDefinition
for _, a := range afters {
deps = append(deps, a.dependencies()...)
}
return &after{deps}
return &after{afters}
}

type after struct {
deps []*taskDefinition
deps []Dependency
}

func (a *after) taskOption() {}
Expand Down Expand Up @@ -382,7 +379,7 @@ func addFunc(d *Definition, name string, f interface{}, inputs []metaValue, opts
name = d.name(name)
td := &taskDefinition{name: name, f: f, args: inputs}
for _, input := range inputs {
td.deps = append(td.deps, input.dependencies()...)
td.deps = append(td.deps, input)
}
for _, opt := range opts {
td.deps = append(td.deps, opt.(*after).deps...)
Expand All @@ -401,9 +398,29 @@ func addAction(d *Definition, name string, f interface{}, inputs []metaValue, op
return &dependency{td}
}

func addExpansion(d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) {
func addExpansion[O1 any](d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) *expansionResult[O1] {
td := addFunc(d, name, f, inputs, opts)
td.isExpansion = true
return &expansionResult[O1]{td}
}

type expansionResult[T any] struct {
td *taskDefinition
}

func (er *expansionResult[T]) valueType(T) {}

func (er *expansionResult[T]) typ() reflect.Type {
var zero []T
return reflect.TypeOf(zero)
}

func (er *expansionResult[T]) value(w *Workflow) reflect.Value {
return w.tasks[er.td].resultValue.value(w)
}

func (er *expansionResult[T]) ready(w *Workflow) bool {
return w.taskReady(er.td) && w.tasks[er.td].resultValue.ready(w)
}

// ActionN adds an Action to the workflow definition. Its behavior and
Expand Down Expand Up @@ -437,8 +454,8 @@ type dependency struct {
task *taskDefinition
}

func (d *dependency) dependencies() []*taskDefinition {
return []*taskDefinition{d.task}
func (d *dependency) ready(w *Workflow) bool {
return w.taskReady(d.task)
}

// ExpandN adds a workflow expansion task to the workflow definition.
Expand All @@ -450,28 +467,28 @@ func (d *dependency) dependencies() []*taskDefinition {
//
// Running more than one expansion concurrently is an error and will corrupt
// the workflow.
func Expand0(d *Definition, name string, f func(*Definition) error, opts ...TaskOption) {
addExpansion(d, name, f, nil, opts)
func Expand0[O1 any](d *Definition, name string, f func(*Definition) (Value[O1], error), opts ...TaskOption) Value[O1] {
return addExpansion[O1](d, name, f, nil, opts)
}

func Expand1[I1 any](d *Definition, name string, f func(*Definition, I1) error, i1 Value[I1], opts ...TaskOption) {
addExpansion(d, name, f, []metaValue{i1}, opts)
func Expand1[I1, O1 any](d *Definition, name string, f func(*Definition, I1) (Value[O1], error), i1 Value[I1], opts ...TaskOption) Value[O1] {
return addExpansion[O1](d, name, f, []metaValue{i1}, opts)
}

func Expand2[I1, I2 any](d *Definition, name string, f func(*Definition, I1, I2) error, i1 Value[I1], i2 Value[I2], opts ...TaskOption) {
addExpansion(d, name, f, []metaValue{i1, i2}, opts)
func Expand2[I1, I2, O1 any](d *Definition, name string, f func(*Definition, I1, I2) (Value[O1], error), i1 Value[I1], i2 Value[I2], opts ...TaskOption) Value[O1] {
return addExpansion[O1](d, name, f, []metaValue{i1, i2}, opts)
}

func Expand3[I1, I2, I3 any](d *Definition, name string, f func(*Definition, I1, I2, I3) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) {
addExpansion(d, name, f, []metaValue{i1, i2, i3}, opts)
func Expand3[I1, I2, I3, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) Value[O1] {
return addExpansion[O1](d, name, f, []metaValue{i1, i2, i3}, opts)
}

func Expand4[I1, I2, I3, I4 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) {
addExpansion(d, name, f, []metaValue{i1, i2, i3, i4}, opts)
func Expand4[I1, I2, I3, I4, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) Value[O1] {
return addExpansion[O1](d, name, f, []metaValue{i1, i2, i3, i4}, opts)
}

func Expand5[I1, I2, I3, I4, I5 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4, I5) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) {
addExpansion(d, name, f, []metaValue{i1, i2, i3, i4, i5}, opts)
func Expand5[I1, I2, I3, I4, I5, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4, I5) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) Value[O1] {
return addExpansion[O1](d, name, f, []metaValue{i1, i2, i3, i4, i5}, opts)
}

// A TaskContext is a context.Context, plus workflow-related features.
Expand Down Expand Up @@ -564,7 +581,7 @@ type taskDefinition struct {
name string
isExpansion bool
args []metaValue
deps []*taskDefinition
deps []Dependency
f interface{}
}

Expand All @@ -583,8 +600,8 @@ func (tr *taskResult[T]) value(w *Workflow) reflect.Value {
return reflect.ValueOf(w.tasks[tr.task].result)
}

func (tr *taskResult[T]) dependencies() []*taskDefinition {
return []*taskDefinition{tr.task}
func (tr *taskResult[T]) ready(w *Workflow) bool {
return w.taskReady(tr.task)
}

// A Workflow is an instantiated workflow instance, ready to run.
Expand All @@ -605,6 +622,11 @@ type Workflow struct {
pendingStates map[string]*TaskState
}

func (w *Workflow) taskReady(td *taskDefinition) bool {
state := w.tasks[td]
return state.finished && state.err == nil
}

type taskState struct {
def *taskDefinition
created bool
Expand All @@ -618,7 +640,8 @@ type taskState struct {
retryCount int

// workflow expansion
expanded *Definition
expanded *Definition
resultValue metaValue
}

func (t *taskState) toExported() *TaskState {
Expand Down Expand Up @@ -655,24 +678,6 @@ func Start(def *Definition, params map[string]interface{}) (*Workflow, error) {
}

func (w *Workflow) validate() error {
// Validate tasks.
used := map[*taskDefinition]bool{}
for _, taskDef := range w.def.tasks {
for _, dep := range taskDef.deps {
used[dep] = true
}
}
for _, output := range w.def.outputs {
for _, dep := range output.dependencies() {
used[dep] = true
}
}
for _, task := range w.def.tasks {
if !used[task] && !task.isExpansion {
return fmt.Errorf("task %v is not referenced and should be deleted", task.name)
}
}

// Validate parameters.
if got, want := len(w.params), len(w.def.parameters); got != want {
return fmt.Errorf("parameter count mismatch: workflow instance has %d, but definition has %d", got, want)
Expand Down Expand Up @@ -868,7 +873,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter

func (w *Workflow) taskArgs(def *taskDefinition) ([]reflect.Value, bool) {
for _, dep := range def.deps {
if depState, ok := w.tasks[dep]; !ok || !depState.finished || depState.err != nil {
if !dep.ready(w) {
return nil, false
}
}
Expand Down Expand Up @@ -933,10 +938,11 @@ func runExpansion(d *Definition, state taskState, args []reflect.Value) taskStat
fv := reflect.ValueOf(state.def.f)
out := fv.Call(in)
state.finished = true
if out[0].IsNil() {
if out[1].IsNil() {
state.expanded = d
state.resultValue = out[0].Interface().(metaValue)
} else {
state.err = out[0].Interface().(error)
state.err = out[1].Interface().(error)
}
return state
}
Expand Down
23 changes: 11 additions & 12 deletions internal/workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,13 @@ func TestExpansion(t *testing.T) {
v1 := wf.Task0(wd, "first", first)
v2 := wf.Task0(wd, "second", second)
wf.Output(wd, "second", v2)
wf.Expand1(wd, "add a task", func(wd *wf.Definition, arg string) error {
joined := wf.Expand1(wd, "add a task", func(wd *wf.Definition, arg string) (wf.Value[string], error) {
v3 := wf.Task0(wd, "third", third)
// v1 is resolved before the expansion runs, v2 and v3 are dependencies
// created outside and inside the epansion.
joined := wf.Task1(wd, "join", join, wf.Slice(wf.Const(arg), v2, v3))
wf.Output(wd, "final value", joined)
return nil
return wf.Task1(wd, "join", join, wf.Slice(wf.Const(arg), v2, v3)), nil
}, v1)
wf.Output(wd, "final value", joined)

w := startWorkflow(t, wd, nil)
outputs := runWorkflow(t, w, nil)
Expand All @@ -252,10 +251,10 @@ func TestResumeExpansion(t *testing.T) {
return "", nil
}
wd := wf.New()
wf.Expand0(wd, "expand", func(wd *wf.Definition) error {
wf.Output(wd, "result", wf.Task0(wd, "succeeds", succeeds))
return nil
result := wf.Expand0(wd, "expand", func(wd *wf.Definition) (wf.Value[string], error) {
return wf.Task0(wd, "succeeds", succeeds), nil
})
wf.Output(wd, "result", result)

storage := &mapListener{Listener: &verboseListener{t}}
w := startWorkflow(t, wd, nil)
Expand All @@ -273,16 +272,16 @@ func TestResumeExpansion(t *testing.T) {
func TestRetryExpansion(t *testing.T) {
counter := 0
wd := wf.New()
wf.Expand0(wd, "expand", func(wd *wf.Definition) error {
out := wf.Expand0(wd, "expand", func(wd *wf.Definition) (wf.Value[string], error) {
counter++
if counter == 1 {
return fmt.Errorf("first try fail")
return nil, fmt.Errorf("first try fail")
}
wf.Output(wd, "out", wf.Task0(wd, "hi", func(_ context.Context) (string, error) {
return wf.Task0(wd, "hi", func(_ context.Context) (string, error) {
return "", nil
}))
return nil
}), nil
})
wf.Output(wd, "out", out)

w := startWorkflow(t, wd, nil)
retry := func(string) {
Expand Down

0 comments on commit 00a2f26

Please sign in to comment.