Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix LA start to close timeout #1180

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 35 additions & 16 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,13 @@ type (
// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
LocalActivityOptions struct {
// ScheduleToCloseTimeout - The end to end timeout for the local activity including retries.
// This field is required.
// At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required.
// defaults to StartToCloseTimeout if not set.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default for ScheduleToCloseTimeout for normal activities is infinity. We could be consistent here, but that might be a breaking change.

ScheduleToCloseTimeout time.Duration

// StartToCloseTimeout - The timeout for a single execution of the local activity.
// Optional: defaults to ScheduleToClose
// At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required.
// defaults to ScheduleToCloseTimeout if not set.
StartToCloseTimeout time.Duration

// RetryPolicy specify how to retry activity if error happens.
Expand Down Expand Up @@ -253,25 +255,12 @@ func WithActivityTask(
contextPropagators []ContextPropagator,
interceptors []WorkerInterceptor,
) (context.Context, error) {
var deadline time.Time
scheduled := common.TimeValue(task.GetScheduledTime())
started := common.TimeValue(task.GetStartedTime())
scheduleToCloseTimeout := common.DurationValue(task.GetScheduleToCloseTimeout())
startToCloseTimeout := common.DurationValue(task.GetStartToCloseTimeout())
heartbeatTimeout := common.DurationValue(task.GetHeartbeatTimeout())

startToCloseDeadline := started.Add(startToCloseTimeout)
if scheduleToCloseTimeout > 0 {
scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout)
// Minimum of the two deadlines.
if scheduleToCloseDeadline.Before(startToCloseDeadline) {
deadline = scheduleToCloseDeadline
} else {
deadline = startToCloseDeadline
}
} else {
deadline = startToCloseDeadline
}
deadline := calculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout)

logger = log.With(logger,
tagActivityID, task.ActivityId,
Expand Down Expand Up @@ -332,6 +321,21 @@ func WithLocalActivityTask(
tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID,
tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID,
)
startedTime := time.Now()
scheduleToCloseTimeout := task.params.ScheduleToCloseTimeout
startToCloseTimeout := task.params.StartToCloseTimeout

if startToCloseTimeout == 0 {
startToCloseTimeout = scheduleToCloseTimeout
}
if scheduleToCloseTimeout == 0 {
scheduleToCloseTimeout = startToCloseTimeout
}
deadline := calculateActivityDeadline(task.scheduledTime, startedTime, scheduleToCloseTimeout, startToCloseTimeout)
if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still an open issue if we should removed this/can remove this. This expireTime is calculated based on the ScheduleToCloseTimeout and the workflow time, but is only used on the second attempt. This has been basically unchanged for 4+ years, but seems wrong.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While wrong, let's leave for now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree, just wanted to call this out since when we put Go on Core this will be something we have to deal with.

// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout
deadline = task.expireTime
}
return newActivityContext(ctx, interceptors, &activityEnvironment{
workflowType: &workflowTypeLocal,
workflowNamespace: task.params.WorkflowInfo.Namespace,
Expand All @@ -342,6 +346,9 @@ func WithLocalActivityTask(
logger: logger,
metricsHandler: metricsHandler,
isLocalActivity: true,
deadline: deadline,
scheduledTime: task.scheduledTime,
startedTime: startedTime,
dataConverter: dataConverter,
attempt: task.attempt,
})
Expand Down Expand Up @@ -374,3 +381,15 @@ func newActivityContext(

return ctx, nil
}

func calculateActivityDeadline(scheduled, started time.Time, scheduleToCloseTimeout, startToCloseTimeout time.Duration) time.Time {
startToCloseDeadline := started.Add(startToCloseTimeout)
if scheduleToCloseTimeout > 0 {
scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout)
// Minimum of the two deadlines.
if scheduleToCloseDeadline.Before(startToCloseDeadline) {
return scheduleToCloseDeadline
}
}
return startToCloseDeadline
}
3 changes: 2 additions & 1 deletion internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func getValidatedLocalActivityOptions(ctx Context) (*ExecuteLocalActivityOptions
}
if p.ScheduleToCloseTimeout == 0 {
p.ScheduleToCloseTimeout = p.StartToCloseTimeout
} else {
}
if p.StartToCloseTimeout == 0 {
p.StartToCloseTimeout = p.ScheduleToCloseTimeout
}
return p, nil
Expand Down
14 changes: 8 additions & 6 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ type (
pastFirstWFT bool // Set true once this LA has lived for more than one workflow task
retryPolicy *RetryPolicy
expireTime time.Time
scheduledTime time.Time // Time the activity was scheduled initially.
header *commonpb.Header
}

Expand Down Expand Up @@ -682,12 +683,13 @@ func (wc *workflowEnvironmentImpl) ExecuteLocalActivity(params ExecuteLocalActiv

func newLocalActivityTask(params ExecuteLocalActivityParams, callback LocalActivityResultHandler, activityID string) *localActivityTask {
task := &localActivityTask{
activityID: activityID,
params: &params,
callback: callback,
retryPolicy: params.RetryPolicy,
attempt: params.Attempt,
header: params.Header,
activityID: activityID,
params: &params,
callback: callback,
retryPolicy: params.RetryPolicy,
attempt: params.Attempt,
header: params.Header,
scheduledTime: time.Now(),
}

if params.ScheduleToCloseTimeout > 0 {
Expand Down
3 changes: 3 additions & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,9 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl
task := eventHandler.pendingLaTasks[activityID]
task.wc = w
task.workflowTask = workflowTask

task.scheduledTime = time.Now()

if !w.laTunnel.sendTask(task) {
unstartedLaTasks[activityID] = struct{}{}
task.wc = nil
Expand Down
23 changes: 9 additions & 14 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,18 +578,8 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
return &localActivityResult{task: task, err: err}
}

timeout := task.params.ScheduleToCloseTimeout
if task.params.StartToCloseTimeout != 0 && task.params.StartToCloseTimeout < timeout {
timeout = task.params.StartToCloseTimeout
}
timeoutDuration := timeout
deadline := time.Now().Add(timeoutDuration)
if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) {
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout
deadline = task.expireTime
}
Comment on lines -581 to -590
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see anything obvious, but are there any ways that you can think of that this logic has changed with your new logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only change here is using the scheduled time + ScheduleToCloseTimeout when calculating the deadline (used to be time.Now). It probably won't effect many activities because for the first attempt scheduled time will be close to time.Now and subsequent attempts task.expireTime uses workflow time so it will always be shorter than either deadline

Copy link
Member

@cretz cretz Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if today I have a schedule to close timeout of 5s, my second attempt would get the full 5s (even though that's improper)? But after this change, it will only get however much time remains the beginning of the first attempt? Sorry I didn't dig into how expireTime is set. If this is the case, I may hit you up off-PR to communicate with one or two known heavy local activity users to confirm this is acceptable.

Copy link
Contributor Author

@Quinn-With-Two-Ns Quinn-With-Two-Ns Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if today I have a schedule to close timeout of 5s, my second attempt

Today you would have no second attempt because schedule to close is not retried

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed off-PR that second-attempt after first-attempt-immediate-failure is bounded today by expireTime regardless.


ctx, cancel := context.WithDeadline(ctx, deadline)
info := getActivityEnv(ctx)
ctx, cancel := context.WithDeadline(ctx, info.deadline)
defer cancel()

task.Lock()
Expand Down Expand Up @@ -631,13 +621,14 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs)
executionLatency := time.Since(laStartTime)
metricsHandler.Timer(metrics.LocalActivityExecutionLatency).Record(executionLatency)
if executionLatency > timeoutDuration {
if time.Now().After(info.deadline) {
// If local activity takes longer than expected timeout, the context would already be DeadlineExceeded and
// the result would be discarded. Print a warning in this case.
lath.logger.Warn("LocalActivity takes too long to complete.",
"LocalActivityID", task.activityID,
"LocalActivityType", activityType,
"ScheduleToCloseTimeout", task.params.ScheduleToCloseTimeout,
"StartToCloseTimeout", task.params.StartToCloseTimeout,
"ActualExecutionDuration", executionLatency)
}
}(doneCh)
Expand All @@ -658,7 +649,11 @@ WaitResult:
metricsHandler.Counter(metrics.LocalActivityExecutionCanceledCounter).Inc(1)
return &localActivityResult{err: ErrCanceled, task: task}
} else if ctx.Err() == context.DeadlineExceeded {
return &localActivityResult{err: ErrDeadlineExceeded, task: task}
if task.params.ScheduleToCloseTimeout != 0 && time.Now().After(info.scheduledTime.Add(task.params.ScheduleToCloseTimeout)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish there was a better way to know which timeout was hit instead of having to do math here.

return &localActivityResult{err: ErrDeadlineExceeded, task: task}
} else {
return &localActivityResult{err: NewTimeoutError("deadline exceeded", enumspb.TIMEOUT_TYPE_START_TO_CLOSE, nil), task: task}
}
} else {
// should not happen
return &localActivityResult{err: NewApplicationError("unexpected context done", "", true, nil), task: task}
Expand Down
3 changes: 2 additions & 1 deletion internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,8 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
params: &params,
callback: func(lar *LocalActivityResultWrapper) {
},
attempt: 1,
attempt: 1,
scheduledTime: time.Now(),
}
taskHandler := localActivityTaskHandler{
userContext: env.workerOptions.BackgroundActivityContext,
Expand Down
18 changes: 18 additions & 0 deletions test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ func (a *Activities) Sleep(_ context.Context, delay time.Duration) error {
return nil
}

func (a *Activities) SleepN(ctx context.Context, delay time.Duration, times int) (int32, error) {
a.append("sleepN")
if activity.GetInfo(ctx).Attempt >= int32(times) {
return activity.GetInfo(ctx).Attempt, nil
}
time.Sleep(delay)
return activity.GetInfo(ctx).Attempt, nil
}

func LocalSleep(_ context.Context, delay time.Duration) error {
time.Sleep(delay)
return nil
Expand Down Expand Up @@ -186,6 +195,15 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue
if info.TaskQueue != taskQueue {
return fmt.Errorf("expected taskQueue %v but got %v", taskQueue, info.TaskQueue)
}
if info.Deadline.IsZero() {
return errors.New("expected non zero deadline")
}
if info.StartedTime.IsZero() {
return errors.New("expected non zero started time")
}
if info.ScheduledTime.IsZero() {
return errors.New("expected non zero scheduled time")
}
if info.IsLocalActivity != isLocalActivity {
return fmt.Errorf("expected IsLocalActivity %v but got %v", isLocalActivity, info.IsLocalActivity)
}
Expand Down
4 changes: 4 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,10 @@ func (ts *IntegrationTestSuite) TestWorkflowWithParallelSideEffects() {
ts.NoError(ts.executeWorkflow("test-wf-parallel-side-effects", ts.workflows.WorkflowWithParallelSideEffects, nil))
}

func (ts *IntegrationTestSuite) TestWorkflowWithLocalActivityStartToClose() {
ts.NoError(ts.executeWorkflow("test-wf-la-start-to-close", ts.workflows.WorkflowWithLocalActivityStartToCloseTimeout, nil))
}

func (ts *IntegrationTestSuite) TestActivityTimeoutsWorkflow() {
ts.NoError(ts.executeWorkflow("test-activity-timeout-workflow", ts.workflows.ActivityTimeoutsWorkflow, nil, workflow.ActivityOptions{
ScheduleToCloseTimeout: 5 * time.Second,
Expand Down
41 changes: 41 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,46 @@ func (w *Workflows) WorkflowWithParallelLongLocalActivityAndHeartbeat(ctx workfl
return nil
}

func (w *Workflows) WorkflowWithLocalActivityStartToCloseTimeout(ctx workflow.Context) error {
// Validate that local activities respect StartToCloseTimeout and retry correctly
ao := w.defaultLocalActivityOptions()
ao.ScheduleToCloseTimeout = 10 * time.Second
ao.StartToCloseTimeout = 1 * time.Second
ao.RetryPolicy = &temporal.RetryPolicy{
MaximumInterval: time.Second,
MaximumAttempts: 5,
}
ctx = workflow.WithLocalActivityOptions(ctx, ao)

var activities *Activities
future := workflow.ExecuteLocalActivity(ctx, activities.SleepN, 3*time.Second, 3)
var count int32
err := future.Get(ctx, &count)
if err != nil {
return err
}
if count != 3 {
return fmt.Errorf("expected 3, got %v", count)
}
// Validate the correct timeout error is returned
ao.StartToCloseTimeout = 1 * time.Second
ao.RetryPolicy = &temporal.RetryPolicy{
MaximumInterval: time.Second,
MaximumAttempts: 1,
}
ctx = workflow.WithLocalActivityOptions(ctx, ao)
future = workflow.ExecuteLocalActivity(ctx, activities.SleepN, 3*time.Second, 3)
err = future.Get(ctx, nil)
var timeoutErr *temporal.TimeoutError
if errors.As(err, &timeoutErr) {
if timeoutErr.TimeoutType() != enumspb.TIMEOUT_TYPE_START_TO_CLOSE {
return fmt.Errorf("expected start to close timeout, got %v", timeoutErr.TimeoutType())
}
return nil
}
return errors.New("expected timeout error")
}

func (w *Workflows) WorkflowWithLocalActivityRetries(ctx workflow.Context) error {
laOpts := w.defaultLocalActivityOptions()
laOpts.RetryPolicy = &internal.RetryPolicy{
Expand Down Expand Up @@ -2342,6 +2382,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartWhenTimerCancel)
worker.RegisterWorkflow(w.WorkflowWithParallelSideEffects)
worker.RegisterWorkflow(w.WorkflowWithParallelMutableSideEffects)
worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartToCloseTimeout)
worker.RegisterWorkflow(w.LocalActivityStaleCache)
worker.RegisterWorkflow(w.UpdateInfoWorkflow)
worker.RegisterWorkflow(w.SignalWorkflow)
Expand Down
Loading