Skip to content

Commit

Permalink
add sync.Cond for statusChange
Browse files Browse the repository at this point in the history
  • Loading branch information
xuxife committed Sep 26, 2024
1 parent 1ab4796 commit bcfeae5
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ type Workflow struct {

steps map[Steper]*State // the internal states of Steps

leaseBucket chan struct{} // constraint max concurrency of running Steps
waitGroup sync.WaitGroup // to prevent goroutine leak
isRunning sync.Mutex // indicate whether the Workflow is running
statusChange *sync.Cond // a condition to signal the status change to proceed tick
leaseBucket chan struct{} // constraint max concurrency of running Steps
waitGroup sync.WaitGroup // to prevent goroutine leak
isRunning sync.Mutex // indicate whether the Workflow is running
}

// Add Steps into Workflow in phase Main.
Expand Down Expand Up @@ -241,6 +242,7 @@ func (w *Workflow) reset() {
if w.Clock == nil {
w.Clock = clock.New()
}
w.statusChange = sync.NewCond(new(sync.Mutex))
if w.MaxConcurrency > 0 {
// use buffered channel as a sized bucket
// a Step needs to create a lease in the bucket to run,
Expand Down Expand Up @@ -269,11 +271,14 @@ func (w *Workflow) Do(ctx context.Context) error {
return err
}
// each time one Step terminated, tick forward
w.statusChange.L.Lock()
for {
if done := w.tick(ctx); done {
break
}
w.statusChange.Wait()
}
w.statusChange.L.Unlock()
// ensure all goroutines are exited
w.waitGroup.Wait()
// return the error
Expand Down Expand Up @@ -371,7 +376,12 @@ func (w *Workflow) tick(ctx context.Context) bool {
cond = option.Condition
}
if nextStatus := cond(ctx, ups); nextStatus.IsTerminated() {
state.SetStatus(nextStatus)
w.waitGroup.Add(1)
go func() {
defer w.waitGroup.Done()
state.SetStatus(nextStatus)
w.statusChange.Signal()
}()
continue
}
// kick off the Step
Expand All @@ -388,6 +398,7 @@ func (w *Workflow) tick(ctx context.Context) bool {
)
defer func() {
state.SetStatus(status)
w.statusChange.Signal()
state.SetError(err)
}()

Expand Down

0 comments on commit bcfeae5

Please sign in to comment.