Skip to content

Commit

Permalink
Fix race condition in eager dispatch (#1195)
Browse files Browse the repository at this point in the history
Fix race condition in eager dispatch
  • Loading branch information
Quinn-With-Two-Ns authored Aug 10, 2023
1 parent 654e872 commit 1fccf9c
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 22 deletions.
6 changes: 3 additions & 3 deletions internal/internal_eager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type eagerWorker interface {
tryReserveSlot() bool
// releaseSlot release a task slot acquired by tryReserveSlot
releaseSlot()
// processTaskAsync process a new task on the worker asynchronously and
// call callback once complete
processTaskAsync(task interface{}, callback func())
// pushEagerTask pushes a new eager workflow task to the workers task queue.
// should only be called with a reserved slot.
pushEagerTask(task eagerTask)
}
21 changes: 12 additions & 9 deletions internal/internal_eager_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,17 @@ func (e *eagerActivityExecutor) handleResponse(
// Start each activity asynchronously
for _, activity := range resp.GetActivityTasks() {
// Asynchronously execute
task := &activityTask{activity}
e.activityWorker.processTaskAsync(task, func() {
// The processTaskAsync does not do this itself because our task is *activityTask, not *polledTask.
e.activityWorker.releaseSlot()
// Decrement executing count
e.countLock.Lock()
e.heldSlotCount--
e.countLock.Unlock()
})
e.activityWorker.pushEagerTask(
eagerTask{
task: &activityTask{activity},
callback: func() {
// The processTaskAsync does not do this itself because our task is *activityTask, not *polledTask.
e.activityWorker.releaseSlot()
// Decrement executing count
e.countLock.Lock()
e.heldSlotCount--
e.countLock.Unlock()
},
})
}
}
1 change: 1 addition & 0 deletions internal/internal_eager_activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func TestEagerActivityCounts(t *testing.T) {
activityWorker := newActivityWorker(nil,
workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 5}, nil, newRegistry(), nil)
activityWorker.worker.isWorkerStarted = true
go activityWorker.worker.runEagerTaskDispatcher()

exec.activityWorker = activityWorker.worker
// Fill up the poller request channel
Expand Down
15 changes: 8 additions & 7 deletions internal/internal_eager_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ func (e *eagerWorkflowExecutor) handleResponse(response *workflowservice.PollWor
panic("eagerWorkflowExecutor trying to handle multiple responses")
}
// Asynchronously execute the task
task := &eagerWorkflowTask{
task: response,
}
e.worker.processTaskAsync(task, func() {
// The processTaskAsync does not do this itself because our task is *eagerWorkflowTask, not *polledTask.
e.worker.releaseSlot()
})
e.worker.pushEagerTask(
eagerTask{
task: &eagerWorkflowTask{
task: response,
},
// The processTaskAsync does not do this itself because our task is *eagerWorkflowTask, not *polledTask.
callback: e.worker.releaseSlot,
})
}

// release the executor task slot this eagerWorkflowExecutor was holding.
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_eager_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (e *eagerWorkerMock) releaseSlot() {
e.releaseSlotCallback()
}

func (e *eagerWorkerMock) processTaskAsync(task interface{}, callback func()) {
e.processTaskAsyncCallback(task, callback)
func (e *eagerWorkerMock) pushEagerTask(task eagerTask) {
e.processTaskAsyncCallback(task, task.callback)
}

func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) {
Expand Down
38 changes: 37 additions & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ type (

pollerRequestCh chan struct{}
taskQueueCh chan interface{}
eagerTaskQueueCh chan eagerTask
fatalErrCb func(error)
sessionTokenBucket *sessionTokenBucket

Expand All @@ -192,6 +193,13 @@ type (
polledTask struct {
task interface{}
}

eagerTask struct {
// task to process.
task interface{}
// callback to run once the task is processed.
callback func()
}
)

// SetRetryLongPollGracePeriod sets the amount of time a long poller retries on
Expand Down Expand Up @@ -240,7 +248,8 @@ func newBaseWorker(
metricsHandler: metricsHandler.WithTags(metrics.WorkerTags(options.workerType)),
taskSlotsAvailable: int32(options.maxConcurrentTask),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
eagerTaskQueueCh: make(chan eagerTask, options.maxConcurrentTask), // allow enough capacity so that eager dispatch will not block
fatalErrCb: options.fatalErrCb,

limiterContext: ctx,
Expand Down Expand Up @@ -274,6 +283,9 @@ func (bw *baseWorker) Start() {
bw.stopWG.Add(1)
go bw.runTaskDispatcher()

bw.stopWG.Add(1)
go bw.runEagerTaskDispatcher()

bw.isWorkerStarted = true
traceLog(func() {
bw.logger.Info("Started Worker",
Expand Down Expand Up @@ -330,6 +342,11 @@ func (bw *baseWorker) releaseSlot() {
bw.pollerRequestCh <- struct{}{}
}

func (bw *baseWorker) pushEagerTask(task eagerTask) {
// Should always be non blocking if a slot was reserved.
bw.eagerTaskQueueCh <- task
}

func (bw *baseWorker) processTaskAsync(task interface{}, callback func()) {
bw.stopWG.Add(1)
go func() {
Expand All @@ -351,6 +368,8 @@ func (bw *baseWorker) runTaskDispatcher() {
// wait for new task or worker stop
select {
case <-bw.stopCh:
// Currently we can drop any tasks received when closing.
// https://github.com/temporalio/sdk-go/issues/1197
return
case task := <-bw.taskQueueCh:
// for non-polled-task (local activity result as task or eager task), we don't need to rate limit
Expand All @@ -365,6 +384,23 @@ func (bw *baseWorker) runTaskDispatcher() {
}
}

func (bw *baseWorker) runEagerTaskDispatcher() {
defer bw.stopWG.Done()
for {
select {
case <-bw.stopCh:
// drain eager dispatch queue
for len(bw.eagerTaskQueueCh) > 0 {
eagerTask := <-bw.eagerTaskQueueCh
bw.processTaskAsync(eagerTask.task, eagerTask.callback)
}
return
case eagerTask := <-bw.eagerTaskQueueCh:
bw.processTaskAsync(eagerTask.task, eagerTask.callback)
}
}
}

func (bw *baseWorker) pollTask() {
var err error
var task interface{}
Expand Down

0 comments on commit 1fccf9c

Please sign in to comment.