-
Notifications
You must be signed in to change notification settings - Fork 5
/
state.go
96 lines (93 loc) · 1.91 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package flow
import (
"context"
"sync"
)
// State is the internal state of a Step in a Workflow.
//
// It has the status and the config (dependency, input, retry option, condition, timeout, etc.) of the step.
// The status could be read / write from different goroutines, so use RWMutex to protect it.
type State struct {
StepResult
Config *StepConfig
sync.RWMutex
}
func (s *State) GetStatus() StepStatus {
s.RLock()
defer s.RUnlock()
return s.Status
}
func (s *State) SetStatus(ss StepStatus) {
s.Lock()
defer s.Unlock()
s.Status = ss
}
func (s *State) GetError() error {
s.RLock()
defer s.RUnlock()
return s.Err
}
func (s *State) SetError(err error) {
s.Lock()
defer s.Unlock()
s.Err = err
}
func (s *State) GetStepResult() StepResult {
s.RLock()
defer s.RUnlock()
return s.StepResult
}
func (s *State) Upstreams() Set[Steper] {
if s.Config == nil {
return nil
}
return s.Config.Upstreams
}
func (s *State) Option() *StepOption {
opt := &StepOption{}
if s.Config != nil && s.Config.Option != nil {
for _, o := range s.Config.Option {
o(opt)
}
}
return opt
}
func (s *State) Before(ctx context.Context, step Steper) (context.Context, error) {
if s.Config == nil || len(s.Config.Before) == 0 {
return ctx, nil
}
for _, b := range s.Config.Before {
var err error
ctx, err = b(ctx, step)
if err != nil {
return ctx, err
}
}
return ctx, nil
}
func (s *State) After(ctx context.Context, step Steper, err error) error {
if s.Config == nil || len(s.Config.After) == 0 {
return err
}
for _, a := range s.Config.After {
err = a(ctx, step, err)
}
return err
}
func (s *State) AddUpstream(up Steper) {
if s.Config == nil {
s.Config = &StepConfig{}
}
if s.Config.Upstreams == nil {
s.Config.Upstreams = make(Set[Steper])
}
if up != nil {
s.Config.Upstreams.Add(up)
}
}
func (s *State) MergeConfig(sc *StepConfig) {
if s.Config == nil {
s.Config = &StepConfig{}
}
s.Config.Merge(sc)
}