Skip to content

Commit

Permalink
Fix race condition with eager workflow start and close (#1191)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Aug 8, 2023
1 parent 23ff5a7 commit 793489f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
6 changes: 3 additions & 3 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,9 @@ func (aw *AggregatedWorker) Start() error {
if err := aw.workflowWorker.Start(); err != nil {
return err
}
if aw.client.eagerDispatcher != nil {
aw.client.eagerDispatcher.registerWorker(aw.workflowWorker)
}
}
if !util.IsInterfaceNil(aw.activityWorker) {
if err := aw.activityWorker.Start(); err != nil {
Expand Down Expand Up @@ -1568,9 +1571,6 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
} else {
workflowWorker = newWorkflowWorker(client.workflowService, workerParams, nil, registry)
}
if client.eagerDispatcher != nil {
client.eagerDispatcher.registerWorker(workflowWorker)
}
}

// activity types.
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (bw *baseWorker) runPoller() {
}

func (bw *baseWorker) tryReserveSlot() bool {
if !bw.isWorkerStarted || bw.isStop() {
if bw.isStop() {
return false
}
// Reserve a executor slot via a non-blocking attempt to take a poller
Expand Down
20 changes: 20 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,26 @@ func (ts *IntegrationTestSuite) TestBasicSession() {
ts.tracer.GetTrace("BasicSession"))
}

func (ts *IntegrationTestSuite) TestEagerWorkflowDispatchRaceWithWorkerStop() {
// Attempt to stop a worker while trying to schedule an eager workflow task
var wg sync.WaitGroup
wg.Add(2)
go func() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
defer cancel()
_, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-basic-session"), ts.workflows.SimplestWorkflow)
ts.NoError(err)
wg.Done()
}()
go func() {
ts.worker.Stop()
ts.workerStopped = true
wg.Done()
}()
wg.Wait()
}

func (ts *IntegrationTestSuite) TestSessionStateFailedWorkerFailed() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down

0 comments on commit 793489f

Please sign in to comment.