From f5493bd81963ec0a54d2c90e77249ba183bce59c Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sun, 23 Jul 2023 09:12:37 -0700 Subject: [PATCH 1/3] Add IsLocalActivity to ActivityInfo --- internal/activity.go | 1 + internal/internal_activity.go | 1 + test/activity_test.go | 5 ++++- test/workflow_test.go | 4 ++-- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/activity.go b/internal/activity.go index 89e5bc615..d7a70fcfa 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -58,6 +58,7 @@ type ( StartedTime time.Time // Time of activity start Deadline time.Time // Time of activity timeout Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified. + IsLocalActivity bool // true if it is a local activity } // RegisterActivityOptions consists of options for registering an activity diff --git a/internal/internal_activity.go b/internal/internal_activity.go index df5153a86..0dba0ae9e 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -378,6 +378,7 @@ func (a *activityEnvironmentInterceptor) GetInfo(ctx context.Context) ActivityIn Attempt: a.env.attempt, WorkflowType: a.env.workflowType, WorkflowNamespace: a.env.workflowNamespace, + IsLocalActivity: a.env.isLocalActivity, } } diff --git a/test/activity_test.go b/test/activity_test.go index 78b6e9d2e..360d1d866 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -153,7 +153,7 @@ func (a *Activities) failNTimes(_ context.Context, times int, id int) error { return errFailOnPurpose } -func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string) error { +func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool) error { a.append("inspectActivityInfo") info := activity.GetInfo(ctx) if info.WorkflowNamespace != namespace { @@ -165,6 +165,9 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue if info.TaskQueue != taskQueue { return fmt.Errorf("expected taskQueue %v but got %v", taskQueue, info.TaskQueue) } + if info.IsLocalActivity != isLocalActivity { + return fmt.Errorf("expected IsLocalActivity %v but got %v", isLocalActivity, info.IsLocalActivity) + } return nil } diff --git a/test/workflow_test.go b/test/workflow_test.go index cb3524ece..89f303492 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1058,7 +1058,7 @@ func (w *Workflows) InspectActivityInfo(ctx workflow.Context) error { wfType := info.WorkflowType.Name taskQueue := info.TaskQueueName ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) - return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType).Get(ctx, nil) + return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType, false).Get(ctx, nil) } func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error { @@ -1069,7 +1069,7 @@ func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error { ctx = workflow.WithLocalActivityOptions(ctx, w.defaultLocalActivityOptions()) var activities *Activities return workflow.ExecuteLocalActivity( - ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType).Get(ctx, nil) + ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType, true).Get(ctx, nil) } func (w *Workflows) WorkflowWithLocalActivityCtxPropagation(ctx workflow.Context) (string, error) { From e5985ba08da3720ecf35eac4a63c3bb4e7121823 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sun, 23 Jul 2023 09:48:15 -0700 Subject: [PATCH 2/3] Add InActivity to check if context is an activity context --- activity/activity.go | 5 +++++ internal/activity.go | 6 ++++++ internal/activity_test.go | 8 ++++++++ test/activity_test.go | 4 ++++ 4 files changed, 23 insertions(+) diff --git a/activity/activity.go b/activity/activity.go index 93a69a5ae..ca015d562 100644 --- a/activity/activity.go +++ b/activity/activity.go @@ -101,3 +101,8 @@ func GetHeartbeatDetails(ctx context.Context, d ...interface{}) error { func GetWorkerStopChannel(ctx context.Context) <-chan struct{} { return internal.GetWorkerStopChannel(ctx) } + +// IsActivity check if the context is an activity context from a normal or local activity. +func IsActivity(ctx context.Context) bool { + return internal.IsActivity(ctx) +} diff --git a/internal/activity.go b/internal/activity.go index d7a70fcfa..5a7fe749a 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -180,6 +180,12 @@ func HasHeartbeatDetails(ctx context.Context) bool { return getActivityOutboundInterceptor(ctx).HasHeartbeatDetails(ctx) } +// IsActivity check if the context is an activity context from a normal or local activity. +func IsActivity(ctx context.Context) bool { + a := ctx.Value(activityInterceptorContextKey) + return a != nil +} + // GetHeartbeatDetails extract heartbeat details from last failed attempt. This is used in combination with retry policy. // An activity could be scheduled with an optional retry policy on ActivityOptions. If the activity failed then server // would attempt to dispatch another activity task to retry according to the retry policy. If there was heartbeat diff --git a/internal/activity_test.go b/internal/activity_test.go index 9bf6e04b8..bc174f0f1 100644 --- a/internal/activity_test.go +++ b/internal/activity_test.go @@ -241,3 +241,11 @@ func (s *activityTestSuite) TestGetWorkerStopChannel() { channel := GetWorkerStopChannel(ctx) s.NotNil(channel) } + +func (s *activityTestSuite) TestIsActivity() { + ctx := context.Background() + s.False(IsActivity(ctx)) + ch := make(chan struct{}, 1) + ctx, _ = newActivityContext(context.Background(), nil, &activityEnvironment{workerStopChannel: ch}) + s.True(IsActivity(ctx)) +} diff --git a/test/activity_test.go b/test/activity_test.go index 360d1d866..5b652ab35 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -155,6 +155,10 @@ func (a *Activities) failNTimes(_ context.Context, times int, id int) error { func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool) error { a.append("inspectActivityInfo") + if !activity.IsActivity(ctx) { + return fmt.Errorf("expected InActivity to return %v but got %v", true, activity.InActivity(ctx)) + } + info := activity.GetInfo(ctx) if info.WorkflowNamespace != namespace { return fmt.Errorf("expected namespace %v but got %v", namespace, info.WorkflowNamespace) From 85231630924823ab7b49ca16f28b87f64fd6dc36 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sun, 23 Jul 2023 12:47:35 -0700 Subject: [PATCH 3/3] fix build error --- test/activity_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/activity_test.go b/test/activity_test.go index 5b652ab35..5f902c736 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -156,7 +156,7 @@ func (a *Activities) failNTimes(_ context.Context, times int, id int) error { func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool) error { a.append("inspectActivityInfo") if !activity.IsActivity(ctx) { - return fmt.Errorf("expected InActivity to return %v but got %v", true, activity.InActivity(ctx)) + return fmt.Errorf("expected InActivity to return %v but got %v", true, activity.IsActivity(ctx)) } info := activity.GetInfo(ctx)