From 260ef4300f3bc11adf03ce11796600677aeb692a Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 27 Jul 2023 10:06:41 -0700 Subject: [PATCH 1/2] Fix LA start to close timeout --- internal/activity.go | 54 +++++++++++++++++-------- internal/internal_activity.go | 3 +- internal/internal_event_handlers.go | 42 ++++++++++--------- internal/internal_task_handlers.go | 5 +++ internal/internal_task_pollers.go | 23 +++++------ internal/internal_workflow_testsuite.go | 4 +- test/activity_test.go | 18 +++++++++ test/integration_test.go | 4 ++ test/workflow_test.go | 41 +++++++++++++++++++ 9 files changed, 143 insertions(+), 51 deletions(-) diff --git a/internal/activity.go b/internal/activity.go index 03d2d9f24..10513c7d4 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -155,11 +155,13 @@ type ( // LocalActivityOptions stores local activity specific parameters that will be stored inside of a context. LocalActivityOptions struct { // ScheduleToCloseTimeout - The end to end timeout for the local activity including retries. - // This field is required. + // At least on of ScheduleToCloseTimeout or StartToCloseTimeout is required. + // defaults to StartToCloseTimeout if not set. ScheduleToCloseTimeout time.Duration // StartToCloseTimeout - The timeout for a single execution of the local activity. - // Optional: defaults to ScheduleToClose + // At least on of ScheduleToCloseTimeout or StartToCloseTimeout is required. + // defaults to ScheduleToCloseTimeout if not set. StartToCloseTimeout time.Duration // RetryPolicy specify how to retry activity if error happens. @@ -253,25 +255,12 @@ func WithActivityTask( contextPropagators []ContextPropagator, interceptors []WorkerInterceptor, ) (context.Context, error) { - var deadline time.Time scheduled := common.TimeValue(task.GetScheduledTime()) started := common.TimeValue(task.GetStartedTime()) scheduleToCloseTimeout := common.DurationValue(task.GetScheduleToCloseTimeout()) startToCloseTimeout := common.DurationValue(task.GetStartToCloseTimeout()) heartbeatTimeout := common.DurationValue(task.GetHeartbeatTimeout()) - - startToCloseDeadline := started.Add(startToCloseTimeout) - if scheduleToCloseTimeout > 0 { - scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout) - // Minimum of the two deadlines. - if scheduleToCloseDeadline.Before(startToCloseDeadline) { - deadline = scheduleToCloseDeadline - } else { - deadline = startToCloseDeadline - } - } else { - deadline = startToCloseDeadline - } + deadline := calculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout) logger = log.With(logger, tagActivityID, task.ActivityId, @@ -332,6 +321,21 @@ func WithLocalActivityTask( tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID, tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID, ) + startedTime := time.Now() + scheduleToCloseTimeout := task.params.ScheduleToCloseTimeout + startToCloseTimeout := task.params.StartToCloseTimeout + + if startToCloseTimeout == 0 { + startToCloseTimeout = scheduleToCloseTimeout + } + if scheduleToCloseTimeout == 0 { + scheduleToCloseTimeout = startToCloseTimeout + } + deadline := calculateActivityDeadline(task.scheduledTime, startedTime, scheduleToCloseTimeout, startToCloseTimeout) + if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) { + // this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout + deadline = task.expireTime + } return newActivityContext(ctx, interceptors, &activityEnvironment{ workflowType: &workflowTypeLocal, workflowNamespace: task.params.WorkflowInfo.Namespace, @@ -342,6 +346,9 @@ func WithLocalActivityTask( logger: logger, metricsHandler: metricsHandler, isLocalActivity: true, + deadline: deadline, + scheduledTime: task.scheduledTime, + startedTime: startedTime, dataConverter: dataConverter, attempt: task.attempt, }) @@ -374,3 +381,18 @@ func newActivityContext( return ctx, nil } + +func calculateActivityDeadline(scheduled, started time.Time, scheduleToCloseTimeout, startToCloseTimeout time.Duration) time.Time { + startToCloseDeadline := started.Add(startToCloseTimeout) + if scheduleToCloseTimeout > 0 { + scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout) + // Minimum of the two deadlines. + if scheduleToCloseDeadline.Before(startToCloseDeadline) { + return scheduleToCloseDeadline + } else { + return startToCloseDeadline + } + } else { + return startToCloseDeadline + } +} diff --git a/internal/internal_activity.go b/internal/internal_activity.go index 0dba0ae9e..ab2413822 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -217,7 +217,8 @@ func getValidatedLocalActivityOptions(ctx Context) (*ExecuteLocalActivityOptions } if p.ScheduleToCloseTimeout == 0 { p.ScheduleToCloseTimeout = p.StartToCloseTimeout - } else { + } + if p.StartToCloseTimeout == 0 { p.StartToCloseTimeout = p.ScheduleToCloseTimeout } return p, nil diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index a9613c1b3..3837d8a80 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -172,19 +172,21 @@ type ( localActivityTask struct { sync.Mutex - workflowTask *workflowTask - activityID string - params *ExecuteLocalActivityParams - callback LocalActivityResultHandler - wc *workflowExecutionContextImpl - canceled bool - cancelFunc func() - attempt int32 // attempt starting from 1 - attemptsThisWFT uint32 // Number of attempts started during this workflow task - pastFirstWFT bool // Set true once this LA has lived for more than one workflow task - retryPolicy *RetryPolicy - expireTime time.Time - header *commonpb.Header + workflowTask *workflowTask + activityID string + params *ExecuteLocalActivityParams + callback LocalActivityResultHandler + wc *workflowExecutionContextImpl + canceled bool + cancelFunc func() + attempt int32 // attempt starting from 1 + attemptsThisWFT uint32 // Number of attempts started during this workflow task + pastFirstWFT bool // Set true once this LA has lived for more than one workflow task + retryPolicy *RetryPolicy + expireTime time.Time + scheduledTime time.Time // Time the activity was scheduled initially. + currentAttemptScheduledTime time.Time // Time this attempt of the activity was scheduled. + header *commonpb.Header } localActivityMarkerData struct { @@ -682,12 +684,14 @@ func (wc *workflowEnvironmentImpl) ExecuteLocalActivity(params ExecuteLocalActiv func newLocalActivityTask(params ExecuteLocalActivityParams, callback LocalActivityResultHandler, activityID string) *localActivityTask { task := &localActivityTask{ - activityID: activityID, - params: ¶ms, - callback: callback, - retryPolicy: params.RetryPolicy, - attempt: params.Attempt, - header: params.Header, + activityID: activityID, + params: ¶ms, + callback: callback, + retryPolicy: params.RetryPolicy, + attempt: params.Attempt, + header: params.Header, + scheduledTime: time.Now(), + currentAttemptScheduledTime: time.Now(), } if params.ScheduleToCloseTimeout > 0 { diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 5141b5058..e0d20eaa1 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -861,6 +861,7 @@ processWorkflowLoop: } laRetry.attempt++ + laRetry.currentAttemptScheduledTime = time.Now() if !wth.laTunnel.sendTask(laRetry) { laRetry.attempt-- @@ -1194,6 +1195,10 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl task := eventHandler.pendingLaTasks[activityID] task.wc = w task.workflowTask = workflowTask + + task.scheduledTime = time.Now() + task.currentAttemptScheduledTime = task.scheduledTime + if !w.laTunnel.sendTask(task) { unstartedLaTasks[activityID] = struct{}{} task.wc = nil diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index e8506c8ca..2e17ceea1 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -578,18 +578,8 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi return &localActivityResult{task: task, err: err} } - timeout := task.params.ScheduleToCloseTimeout - if task.params.StartToCloseTimeout != 0 && task.params.StartToCloseTimeout < timeout { - timeout = task.params.StartToCloseTimeout - } - timeoutDuration := timeout - deadline := time.Now().Add(timeoutDuration) - if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) { - // this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout - deadline = task.expireTime - } - - ctx, cancel := context.WithDeadline(ctx, deadline) + info := getActivityEnv(ctx) + ctx, cancel := context.WithDeadline(ctx, info.deadline) defer cancel() task.Lock() @@ -631,13 +621,14 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs) executionLatency := time.Since(laStartTime) metricsHandler.Timer(metrics.LocalActivityExecutionLatency).Record(executionLatency) - if executionLatency > timeoutDuration { + if time.Now().After(info.deadline) { // If local activity takes longer than expected timeout, the context would already be DeadlineExceeded and // the result would be discarded. Print a warning in this case. lath.logger.Warn("LocalActivity takes too long to complete.", "LocalActivityID", task.activityID, "LocalActivityType", activityType, "ScheduleToCloseTimeout", task.params.ScheduleToCloseTimeout, + "StartToCloseTimeout", task.params.StartToCloseTimeout, "ActualExecutionDuration", executionLatency) } }(doneCh) @@ -658,7 +649,11 @@ WaitResult: metricsHandler.Counter(metrics.LocalActivityExecutionCanceledCounter).Inc(1) return &localActivityResult{err: ErrCanceled, task: task} } else if ctx.Err() == context.DeadlineExceeded { - return &localActivityResult{err: ErrDeadlineExceeded, task: task} + if task.params.ScheduleToCloseTimeout != 0 && time.Now().After(info.scheduledTime.Add(task.params.ScheduleToCloseTimeout)) { + return &localActivityResult{err: ErrDeadlineExceeded, task: task} + } else { + return &localActivityResult{err: NewTimeoutError("deadline exceeded", enumspb.TIMEOUT_TYPE_START_TO_CLOSE, nil), task: task} + } } else { // should not happen return &localActivityResult{err: NewApplicationError("unexpected context done", "", true, nil), task: task} diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 56102597b..74eb88ad2 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -634,7 +634,9 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity( params: ¶ms, callback: func(lar *LocalActivityResultWrapper) { }, - attempt: 1, + attempt: 1, + scheduledTime: time.Now(), + currentAttemptScheduledTime: time.Now(), } taskHandler := localActivityTaskHandler{ userContext: env.workerOptions.BackgroundActivityContext, diff --git a/test/activity_test.go b/test/activity_test.go index 2d822ebbf..eba9ef25b 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -71,6 +71,15 @@ func (a *Activities) Sleep(_ context.Context, delay time.Duration) error { return nil } +func (a *Activities) SleepN(ctx context.Context, delay time.Duration, times int) (int32, error) { + a.append("sleepN") + if activity.GetInfo(ctx).Attempt >= int32(times) { + return activity.GetInfo(ctx).Attempt, nil + } + time.Sleep(delay) + return activity.GetInfo(ctx).Attempt, nil +} + func LocalSleep(_ context.Context, delay time.Duration) error { time.Sleep(delay) return nil @@ -186,6 +195,15 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue if info.TaskQueue != taskQueue { return fmt.Errorf("expected taskQueue %v but got %v", taskQueue, info.TaskQueue) } + if info.Deadline.IsZero() { + return errors.New("expected non zero deadline") + } + if info.StartedTime.IsZero() { + return errors.New("expected non zero started time") + } + if info.ScheduledTime.IsZero() { + return errors.New("expected non zero scheduled time") + } if info.IsLocalActivity != isLocalActivity { return fmt.Errorf("expected IsLocalActivity %v but got %v", isLocalActivity, info.IsLocalActivity) } diff --git a/test/integration_test.go b/test/integration_test.go index 29c3af179..0f79d0770 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1142,6 +1142,10 @@ func (ts *IntegrationTestSuite) TestWorkflowWithParallelSideEffects() { ts.NoError(ts.executeWorkflow("test-wf-parallel-side-effects", ts.workflows.WorkflowWithParallelSideEffects, nil)) } +func (ts *IntegrationTestSuite) TestWorkflowWithLocalActivityStartToClose() { + ts.NoError(ts.executeWorkflow("test-wf-la-start-to-close", ts.workflows.WorkflowWithLocalActivityStartToCloseTimeout, nil)) +} + func (ts *IntegrationTestSuite) TestActivityTimeoutsWorkflow() { ts.NoError(ts.executeWorkflow("test-activity-timeout-workflow", ts.workflows.ActivityTimeoutsWorkflow, nil, workflow.ActivityOptions{ ScheduleToCloseTimeout: 5 * time.Second, diff --git a/test/workflow_test.go b/test/workflow_test.go index 48d093e78..993d1af98 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1159,6 +1159,46 @@ func (w *Workflows) WorkflowWithParallelLongLocalActivityAndHeartbeat(ctx workfl return nil } +func (w *Workflows) WorkflowWithLocalActivityStartToCloseTimeout(ctx workflow.Context) error { + // Validate that local activities respect StartToCloseTimeout and retry correctly + ao := w.defaultLocalActivityOptions() + ao.ScheduleToCloseTimeout = 10 * time.Second + ao.StartToCloseTimeout = 1 * time.Second + ao.RetryPolicy = &temporal.RetryPolicy{ + MaximumInterval: time.Second, + MaximumAttempts: 5, + } + ctx = workflow.WithLocalActivityOptions(ctx, ao) + + var activities *Activities + future := workflow.ExecuteLocalActivity(ctx, activities.SleepN, 3*time.Second, 3) + var count int32 + err := future.Get(ctx, &count) + if err != nil { + return err + } + if count != 3 { + return fmt.Errorf("expected 3, got %v", count) + } + // Validate the correct timeout error is returned + ao.StartToCloseTimeout = 1 * time.Second + ao.RetryPolicy = &temporal.RetryPolicy{ + MaximumInterval: time.Second, + MaximumAttempts: 1, + } + ctx = workflow.WithLocalActivityOptions(ctx, ao) + future = workflow.ExecuteLocalActivity(ctx, activities.SleepN, 3*time.Second, 3) + err = future.Get(ctx, nil) + var timeoutErr *temporal.TimeoutError + if errors.As(err, &timeoutErr) { + if timeoutErr.TimeoutType() != enumspb.TIMEOUT_TYPE_START_TO_CLOSE { + return fmt.Errorf("expected start to close timeout, got %v", timeoutErr.TimeoutType()) + } + return nil + } + return errors.New("expected timeout error") +} + func (w *Workflows) WorkflowWithLocalActivityRetries(ctx workflow.Context) error { laOpts := w.defaultLocalActivityOptions() laOpts.RetryPolicy = &internal.RetryPolicy{ @@ -2342,6 +2382,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartWhenTimerCancel) worker.RegisterWorkflow(w.WorkflowWithParallelSideEffects) worker.RegisterWorkflow(w.WorkflowWithParallelMutableSideEffects) + worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartToCloseTimeout) worker.RegisterWorkflow(w.LocalActivityStaleCache) worker.RegisterWorkflow(w.UpdateInfoWorkflow) worker.RegisterWorkflow(w.SignalWorkflow) From 0e260efea870071cb2cf36625a1d1284fef7d9f9 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 2 Aug 2023 08:26:07 -0700 Subject: [PATCH 2/2] Respond to pr comments --- internal/activity.go | 9 ++--- internal/internal_event_handlers.go | 44 ++++++++++++------------- internal/internal_task_handlers.go | 2 -- internal/internal_workflow_testsuite.go | 5 ++- 4 files changed, 26 insertions(+), 34 deletions(-) diff --git a/internal/activity.go b/internal/activity.go index 10513c7d4..3de596709 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -155,12 +155,12 @@ type ( // LocalActivityOptions stores local activity specific parameters that will be stored inside of a context. LocalActivityOptions struct { // ScheduleToCloseTimeout - The end to end timeout for the local activity including retries. - // At least on of ScheduleToCloseTimeout or StartToCloseTimeout is required. + // At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required. // defaults to StartToCloseTimeout if not set. ScheduleToCloseTimeout time.Duration // StartToCloseTimeout - The timeout for a single execution of the local activity. - // At least on of ScheduleToCloseTimeout or StartToCloseTimeout is required. + // At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required. // defaults to ScheduleToCloseTimeout if not set. StartToCloseTimeout time.Duration @@ -389,10 +389,7 @@ func calculateActivityDeadline(scheduled, started time.Time, scheduleToCloseTime // Minimum of the two deadlines. if scheduleToCloseDeadline.Before(startToCloseDeadline) { return scheduleToCloseDeadline - } else { - return startToCloseDeadline } - } else { - return startToCloseDeadline } + return startToCloseDeadline } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 3837d8a80..9cb8fa9e8 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -172,21 +172,20 @@ type ( localActivityTask struct { sync.Mutex - workflowTask *workflowTask - activityID string - params *ExecuteLocalActivityParams - callback LocalActivityResultHandler - wc *workflowExecutionContextImpl - canceled bool - cancelFunc func() - attempt int32 // attempt starting from 1 - attemptsThisWFT uint32 // Number of attempts started during this workflow task - pastFirstWFT bool // Set true once this LA has lived for more than one workflow task - retryPolicy *RetryPolicy - expireTime time.Time - scheduledTime time.Time // Time the activity was scheduled initially. - currentAttemptScheduledTime time.Time // Time this attempt of the activity was scheduled. - header *commonpb.Header + workflowTask *workflowTask + activityID string + params *ExecuteLocalActivityParams + callback LocalActivityResultHandler + wc *workflowExecutionContextImpl + canceled bool + cancelFunc func() + attempt int32 // attempt starting from 1 + attemptsThisWFT uint32 // Number of attempts started during this workflow task + pastFirstWFT bool // Set true once this LA has lived for more than one workflow task + retryPolicy *RetryPolicy + expireTime time.Time + scheduledTime time.Time // Time the activity was scheduled initially. + header *commonpb.Header } localActivityMarkerData struct { @@ -684,14 +683,13 @@ func (wc *workflowEnvironmentImpl) ExecuteLocalActivity(params ExecuteLocalActiv func newLocalActivityTask(params ExecuteLocalActivityParams, callback LocalActivityResultHandler, activityID string) *localActivityTask { task := &localActivityTask{ - activityID: activityID, - params: ¶ms, - callback: callback, - retryPolicy: params.RetryPolicy, - attempt: params.Attempt, - header: params.Header, - scheduledTime: time.Now(), - currentAttemptScheduledTime: time.Now(), + activityID: activityID, + params: ¶ms, + callback: callback, + retryPolicy: params.RetryPolicy, + attempt: params.Attempt, + header: params.Header, + scheduledTime: time.Now(), } if params.ScheduleToCloseTimeout > 0 { diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index e0d20eaa1..56ba636cb 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -861,7 +861,6 @@ processWorkflowLoop: } laRetry.attempt++ - laRetry.currentAttemptScheduledTime = time.Now() if !wth.laTunnel.sendTask(laRetry) { laRetry.attempt-- @@ -1197,7 +1196,6 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl task.workflowTask = workflowTask task.scheduledTime = time.Now() - task.currentAttemptScheduledTime = task.scheduledTime if !w.laTunnel.sendTask(task) { unstartedLaTasks[activityID] = struct{}{} diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 74eb88ad2..e5104581f 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -634,9 +634,8 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity( params: ¶ms, callback: func(lar *LocalActivityResultWrapper) { }, - attempt: 1, - scheduledTime: time.Now(), - currentAttemptScheduledTime: time.Now(), + attempt: 1, + scheduledTime: time.Now(), } taskHandler := localActivityTaskHandler{ userContext: env.workerOptions.BackgroundActivityContext,