Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IsLocalActivity to ActivityInfo #1170

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,8 @@ func GetHeartbeatDetails(ctx context.Context, d ...interface{}) error {
func GetWorkerStopChannel(ctx context.Context) <-chan struct{} {
return internal.GetWorkerStopChannel(ctx)
}

// IsActivity check if the context is an activity context from a normal or local activity.
func IsActivity(ctx context.Context) bool {
return internal.IsActivity(ctx)
}
7 changes: 7 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type (
StartedTime time.Time // Time of activity start
Deadline time.Time // Time of activity timeout
Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified.
IsLocalActivity bool // true if it is a local activity
}

// RegisterActivityOptions consists of options for registering an activity
Expand Down Expand Up @@ -179,6 +180,12 @@ func HasHeartbeatDetails(ctx context.Context) bool {
return getActivityOutboundInterceptor(ctx).HasHeartbeatDetails(ctx)
}

// IsActivity check if the context is an activity context from a normal or local activity.
func IsActivity(ctx context.Context) bool {
a := ctx.Value(activityInterceptorContextKey)
return a != nil
}

// GetHeartbeatDetails extract heartbeat details from last failed attempt. This is used in combination with retry policy.
// An activity could be scheduled with an optional retry policy on ActivityOptions. If the activity failed then server
// would attempt to dispatch another activity task to retry according to the retry policy. If there was heartbeat
Expand Down
8 changes: 8 additions & 0 deletions internal/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,11 @@ func (s *activityTestSuite) TestGetWorkerStopChannel() {
channel := GetWorkerStopChannel(ctx)
s.NotNil(channel)
}

func (s *activityTestSuite) TestIsActivity() {
ctx := context.Background()
s.False(IsActivity(ctx))
ch := make(chan struct{}, 1)
ctx, _ = newActivityContext(context.Background(), nil, &activityEnvironment{workerStopChannel: ch})
s.True(IsActivity(ctx))
}
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func (a *activityEnvironmentInterceptor) GetInfo(ctx context.Context) ActivityIn
Attempt: a.env.attempt,
WorkflowType: a.env.workflowType,
WorkflowNamespace: a.env.workflowNamespace,
IsLocalActivity: a.env.isLocalActivity,
}
}

Expand Down
9 changes: 8 additions & 1 deletion test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,12 @@ func (a *Activities) failNTimes(_ context.Context, times int, id int) error {
return errFailOnPurpose
}

func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string) error {
func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool) error {
a.append("inspectActivityInfo")
if !activity.IsActivity(ctx) {
return fmt.Errorf("expected InActivity to return %v but got %v", true, activity.IsActivity(ctx))
}

info := activity.GetInfo(ctx)
if info.WorkflowNamespace != namespace {
return fmt.Errorf("expected namespace %v but got %v", namespace, info.WorkflowNamespace)
Expand All @@ -165,6 +169,9 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue
if info.TaskQueue != taskQueue {
return fmt.Errorf("expected taskQueue %v but got %v", taskQueue, info.TaskQueue)
}
if info.IsLocalActivity != isLocalActivity {
return fmt.Errorf("expected IsLocalActivity %v but got %v", isLocalActivity, info.IsLocalActivity)
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ func (w *Workflows) InspectActivityInfo(ctx workflow.Context) error {
wfType := info.WorkflowType.Name
taskQueue := info.TaskQueueName
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType).Get(ctx, nil)
return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType, false).Get(ctx, nil)
}

func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error {
Expand All @@ -1069,7 +1069,7 @@ func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error {
ctx = workflow.WithLocalActivityOptions(ctx, w.defaultLocalActivityOptions())
var activities *Activities
return workflow.ExecuteLocalActivity(
ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType).Get(ctx, nil)
ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType, true).Get(ctx, nil)
}

func (w *Workflows) WorkflowWithLocalActivityCtxPropagation(ctx workflow.Context) (string, error) {
Expand Down
Loading