diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index b40c00a4f..cc49cadb9 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -130,24 +130,25 @@ type ( // workflowTaskHandlerImpl is the implementation of WorkflowTaskHandler workflowTaskHandlerImpl struct { - namespace string - metricsHandler metrics.Handler - ppMgr pressurePointMgr - logger log.Logger - identity string - workerBuildID string - useBuildIDForVersioning bool - deploymentName string - enableLoggingInReplay bool - registry *registry - laTunnel *localActivityTunnel - workflowPanicPolicy WorkflowPanicPolicy - dataConverter converter.DataConverter - failureConverter converter.FailureConverter - contextPropagators []ContextPropagator - cache *WorkerCache - deadlockDetectionTimeout time.Duration - capabilities *workflowservice.GetSystemInfoResponse_Capabilities + namespace string + metricsHandler metrics.Handler + ppMgr pressurePointMgr + logger log.Logger + identity string + workerBuildID string + useBuildIDForVersioning bool + deploymentName string + defaultVersioningBehavior VersioningBehavior + enableLoggingInReplay bool + registry *registry + laTunnel *localActivityTunnel + workflowPanicPolicy WorkflowPanicPolicy + dataConverter converter.DataConverter + failureConverter converter.FailureConverter + contextPropagators []ContextPropagator + cache *WorkerCache + deadlockDetectionTimeout time.Duration + capabilities *workflowservice.GetSystemInfoResponse_Capabilities } activityProvider func(name string) activity @@ -550,23 +551,24 @@ func inferMessageFromAcceptedEvent(attrs *historypb.WorkflowExecutionUpdateAccep func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePointMgr, registry *registry) WorkflowTaskHandler { ensureRequiredParams(¶ms) return &workflowTaskHandlerImpl{ - namespace: params.Namespace, - logger: params.Logger, - ppMgr: ppMgr, - metricsHandler: params.MetricsHandler, - identity: params.Identity, - workerBuildID: params.getBuildID(), - useBuildIDForVersioning: params.UseBuildIDForVersioning, - deploymentName: params.DeploymentName, - enableLoggingInReplay: params.EnableLoggingInReplay, - registry: registry, - workflowPanicPolicy: params.WorkflowPanicPolicy, - dataConverter: params.DataConverter, - failureConverter: params.FailureConverter, - contextPropagators: params.ContextPropagators, - cache: params.cache, - deadlockDetectionTimeout: params.DeadlockDetectionTimeout, - capabilities: params.capabilities, + namespace: params.Namespace, + logger: params.Logger, + ppMgr: ppMgr, + metricsHandler: params.MetricsHandler, + identity: params.Identity, + workerBuildID: params.getBuildID(), + useBuildIDForVersioning: params.UseBuildIDForVersioning, + deploymentName: params.DeploymentName, + defaultVersioningBehavior: params.DefaultVersioningBehavior, + enableLoggingInReplay: params.EnableLoggingInReplay, + registry: registry, + workflowPanicPolicy: params.WorkflowPanicPolicy, + dataConverter: params.DataConverter, + failureConverter: params.FailureConverter, + contextPropagators: params.ContextPropagators, + cache: params.cache, + deadlockDetectionTimeout: params.DeadlockDetectionTimeout, + capabilities: params.capabilities, } } @@ -1914,6 +1916,13 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow( if wth.capabilities != nil && wth.capabilities.BuildIdBasedVersioning { builtRequest.BinaryChecksum = "" } + if wth.useBuildIDForVersioning && wth.deploymentName != "" { + if workflowContext.workflowInfo.currentVersioningBehavior != VersioningBehaviorUnspecified { + builtRequest.VersioningBehavior = versioningBehaviorToProto(workflowContext.workflowInfo.currentVersioningBehavior) + } else { + builtRequest.VersioningBehavior = versioningBehaviorToProto(wth.defaultVersioningBehavior) + } + } return builtRequest } diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 3ffc834c6..8921a3eee 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -170,6 +170,9 @@ type ( // The worker's deployment name, an identifier in versioning-3 to group Task Queues for a given build ID. DeploymentName string + // The Versioning Behavior for workflows that do not set one in their first task. + DefaultVersioningBehavior VersioningBehavior + MetricsHandler metrics.Handler Logger log.Logger @@ -1621,7 +1624,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke // Sessions are not currently compatible with worker versioning // See: https://github.com/temporalio/sdk-go/issues/1227 - if options.EnableSessionWorker && options.UseBuildIDForVersioning { + if options.EnableSessionWorker && options.DeploymentOptions.UseBuildIDForVersioning { panic("cannot set both EnableSessionWorker and UseBuildIDForVersioning") } @@ -1665,9 +1668,10 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke MaxConcurrentWorkflowTaskQueuePollers: options.MaxConcurrentWorkflowTaskPollers, MaxConcurrentNexusTaskQueuePollers: options.MaxConcurrentNexusTaskPollers, Identity: client.identity, - WorkerBuildID: options.BuildID, - UseBuildIDForVersioning: options.UseBuildIDForVersioning, - DeploymentName: options.DeploymentName, + WorkerBuildID: options.DeploymentOptions.BuildID, + UseBuildIDForVersioning: options.DeploymentOptions.UseBuildIDForVersioning, + DeploymentName: options.DeploymentOptions.DeploymentName, + DefaultVersioningBehavior: options.DeploymentOptions.DefaultVersioningBehavior, MetricsHandler: client.metricsHandler.WithTags(metrics.TaskQueueTags(taskQueue)), Logger: client.logger, EnableLoggingInReplay: options.EnableLoggingInReplay, diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 6a5a34862..c8103360a 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -2865,7 +2865,8 @@ func TestWorkerBuildIDAndSessionPanic(t *testing.T) { var recovered interface{} func() { defer func() { recovered = recover() }() - worker := NewAggregatedWorker(&WorkflowClient{}, "some-task-queue", WorkerOptions{EnableSessionWorker: true, UseBuildIDForVersioning: true}) + worker := NewAggregatedWorker(&WorkflowClient{}, "some-task-queue", WorkerOptions{EnableSessionWorker: true, + DeploymentOptions: WorkerDeploymentOptions{UseBuildIDForVersioning: true}}) worker.RegisterWorkflow(testReplayWorkflow) }() require.Equal(t, "cannot set both EnableSessionWorker and UseBuildIDForVersioning", recovered) diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index ce4eb8986..3d9c5a678 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -1611,6 +1611,23 @@ func SetCurrentDetails(ctx Context, details string) { getWorkflowEnvOptions(ctx).currentDetails = details } +// SetVersioningBehavior sets the strategy to upgrade this workflow when the default Build ID +// has changed, and Worker Versioning-3 has been enabled. +// +// NOTE: Experimental +func SetVersioningBehavior(ctx Context, behavior VersioningBehavior) { + env := getWorkflowEnvironment(ctx) + env.WorkflowInfo().currentVersioningBehavior = behavior +} + +// GetVersioningBehavior returns the last versioning behavior set with SetVersioningBehavior. +// +// NOTE: Experimental +func GetVersioningBehavior(ctx Context) VersioningBehavior { + env := getWorkflowEnvironment(ctx) + return env.WorkflowInfo().currentVersioningBehavior +} + func getWorkflowMetadata(ctx Context) (*sdk.WorkflowMetadata, error) { info := GetWorkflowInfo(ctx) eo := getWorkflowEnvOptions(ctx) diff --git a/internal/worker.go b/internal/worker.go index 607cf032f..ea2cc505f 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -30,6 +30,36 @@ import ( ) type ( + // WorkerDeploymentOptions provides configuration to enable Worker Versioning. + // NOTE: Experimental + WorkerDeploymentOptions struct { + // Assign a BuildID to this worker. This replaces the deprecated binary checksum concept, + // and is used to provide a unique identifier for a set of worker code, and is necessary + // to opt in to the Worker Versioning feature. See UseBuildIDForVersioning. + // NOTE: Experimental + BuildID string + + // If set, opts this worker into the Worker Versioning feature. It will only + // operate on workflows it claims to be compatible with. You must set BuildID if this flag + // is true. + // NOTE: Experimental + // Note: Cannot be enabled at the same time as WorkerOptions.EnableSessionWorker + UseBuildIDForVersioning bool + + // Assign a Deployment Name to this worker, an identifier for Worker Versioning that + // groups task queues for the given BuildID. + // NOTE: Experimental + // NOTE: Both BuildID and UseBuildIDForVersioning need to also be set to enable the new Worker Versioning-3 feature. + DeploymentName string + + // Optional: Provides a default Versioning Behavior to workflows that do not set one in their first task + // (see workflow.SetVersioningBehavior). + // NOTE: Experimental + // NOTE: If the Worker Versioning-3 feature is on, and DefaultVersioningBehavior is unspecified, + // workflows that do not set the Versioning Behavior will fail in their first task. + DefaultVersioningBehavior VersioningBehavior + } + // WorkerOptions is used to configure a worker instance. // The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is // subjected to change in the future. @@ -238,24 +268,10 @@ type ( // workflow or activities. DisableRegistrationAliasing bool - // Assign a BuildID to this worker. This replaces the deprecated binary checksum concept, - // and is used to provide a unique identifier for a set of worker code, and is necessary - // to opt in to the Worker Versioning feature. See UseBuildIDForVersioning. - // NOTE: Experimental - BuildID string - - // Optional: If set, opts this worker into the Worker Versioning feature. It will only - // operate on workflows it claims to be compatible with. You must set BuildID if this flag - // is true. - // NOTE: Experimental - // Note: Cannot be enabled at the same time as EnableSessionWorker - UseBuildIDForVersioning bool - - // Optional: Assign a Deployment Name to this worker, an identifier for Worker Versioning that - // groups task queues for the given BuildID. + // Optional: If set, configure Worker Versioning for this worker. See WorkerDeploymentOptions + // for more. This is incompatible with setting EnableSessionWorker. // NOTE: Experimental - // NOTE: Both BuildID and UseBuildIDForVersioning need to also be set to enable the new Worker Versioning-3 feature. - DeploymentName string + DeploymentOptions WorkerDeploymentOptions // Optional: If set, use a custom tuner for this worker. See WorkerTuner for more. // Mutually exclusive with MaxConcurrentWorkflowTaskExecutionSize, diff --git a/internal/workflow.go b/internal/workflow.go index 12a08b622..a6bd9ab1d 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -59,6 +59,23 @@ const ( HandlerUnfinishedPolicyAbandon ) +// VersioningBehavior specifies when existing workflows could change their Build ID. +// NOTE: Experimental +type VersioningBehavior int + +const ( + // Workflow versioning policy unknown. A default VersioningBehaviorUnspecified policy forces + // every workflow to explicitly set a VersioningBehavior different from VersioningBehaviorUnspecified. + VersioningBehaviorUnspecified VersioningBehavior = iota + + // Workflow should be pinned to the current Build ID until manually moved. + VersioningBehaviorPinned + + // Workflow automatically moves to the latest version (default Build ID of the task queue) + // when the next task is dispatched. + VersioningBehaviorAutoUpgrade +) + var ( errWorkflowIDNotSet = errors.New("workflowId is not set") errLocalActivityParamsBadRequest = errors.New("missing local activity parameters through context, check LocalActivityOptions") @@ -1171,6 +1188,12 @@ type WorkflowInfo struct { // which is currently or about to be executing. If no longer replaying will be set to the ID of // this worker currentTaskBuildID string + // currentVersioningBehavior, if not unspecified, sets the strategy to upgrade + // this workflow when the default Build ID + // has changed, and Worker Versioning-3 has been enabled. Otherwise, the current Worker + // option DefaultVersioningBehavior will be used instead. + // NOTE: Experimental + currentVersioningBehavior VersioningBehavior continueAsNewSuggested bool currentHistorySize int @@ -2531,3 +2554,16 @@ func (wc *workflowEnvironmentInterceptor) ExecuteNexusOperation(ctx Context, inp func (wc *workflowEnvironmentInterceptor) RequestCancelNexusOperation(ctx Context, input RequestCancelNexusOperationInput) { wc.env.RequestCancelNexusOperation(input.seq) } + +func versioningBehaviorToProto(t VersioningBehavior) enumspb.VersioningBehavior { + switch t { + case VersioningBehaviorUnspecified: + return enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED + case VersioningBehaviorPinned: + return enumspb.VERSIONING_BEHAVIOR_PINNED + case VersioningBehaviorAutoUpgrade: + return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE + default: + panic("unknown versioning behavior type") + } +} diff --git a/test/integration_test.go b/test/integration_test.go index d8c80cee8..e8e3cac7a 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6174,6 +6174,153 @@ func (ts *IntegrationTestSuite) TestScheduleUpdateWorkflowActionMemo() { } } +func (ts *IntegrationTestSuite) TestVersioningBehaviorInRespondWorkflowTaskCompletedRequest() { + versioningBehaviorAll := make([]enumspb.VersioningBehavior, 0) + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + // We are setting the default build ID with versioning-2 rules to test + // with existing servers. TODO(antlai-temporal) use versioning-3 APIs + // after there is a server release that supports versioning-3 + res, err := ts.client.GetWorkerVersioningRules(ctx, client.GetWorkerVersioningOptions{ + TaskQueue: ts.taskQueueName, + }) + ts.NoError(err) + + _, err = ts.client.UpdateWorkerVersioningRules(ctx, client.UpdateWorkerVersioningRulesOptions{ + TaskQueue: ts.taskQueueName, + ConflictToken: res.ConflictToken, + Operation: &client.VersioningOperationInsertAssignmentRule{ + RuleIndex: 0, + Rule: client.VersioningAssignmentRule{ + TargetBuildID: "1.0", + }, + }, + }) + ts.NoError(err) + + c, err := client.Dial(client.Options{ + HostPort: ts.config.ServiceAddr, + Namespace: ts.config.Namespace, + ConnectionOptions: client.ConnectionOptions{ + TLS: ts.config.TLS, + DialOptions: []grpc.DialOption{ + grpc.WithUnaryInterceptor(func( + ctx context.Context, + method string, + req interface{}, + reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + if method == "/temporal.api.workflowservice.v1.WorkflowService/RespondWorkflowTaskCompleted" { + asReq := req.(*workflowservice.RespondWorkflowTaskCompletedRequest) + versioningBehaviorAll = append(versioningBehaviorAll, asReq.VersioningBehavior) + } + return invoker(ctx, method, req, reply, cc, opts...) + }), + }, + }, + }) + ts.NoError(err) + defer c.Close() + + ts.worker.Stop() + ts.workerStopped = true + w := worker.New(c, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "1.0", + UseBuildIDForVersioning: true, + DeploymentName: "deploy-test1", + DefaultVersioningBehavior: workflow.VersioningBehaviorAutoUpgrade, + }, + }) + ts.registerWorkflowsAndActivities(w) + ts.Nil(w.Start()) + defer w.Stop() + + wfOpts := ts.startWorkflowOptions("test-default-versioning-behavior") + ts.NoError(ts.executeWorkflowWithOption(wfOpts, ts.workflows.Basic, nil)) + + ts.Equal(enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, versioningBehaviorAll[0]) + for i := 1; i < len(versioningBehaviorAll); i++ { + ts.Equal(versioningBehaviorAll[i], enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE) + } + + // Now override Worker defaults with an explicit setter in the workflow code + versioningBehaviorAll = nil + + wfOpts = ts.startWorkflowOptions("test-override-versioning-behavior") + ts.NoError(ts.executeWorkflowWithOption(wfOpts, + ts.workflows.SetPinnedVersioningBehavior, nil)) + ts.Equal(enumspb.VERSIONING_BEHAVIOR_PINNED, versioningBehaviorAll[0]) + for i := 1; i < len(versioningBehaviorAll); i++ { + ts.Equal(versioningBehaviorAll[i], enumspb.VERSIONING_BEHAVIOR_PINNED) + } + + // Ensure other workflows not affected by override + versioningBehaviorAll = nil + + wfOpts = ts.startWorkflowOptions("test-default-versioning-behavior-2") + ts.NoError(ts.executeWorkflowWithOption(wfOpts, ts.workflows.Basic, nil)) + + ts.Equal(enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, versioningBehaviorAll[0]) + for i := 1; i < len(versioningBehaviorAll); i++ { + ts.Equal(versioningBehaviorAll[i], enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE) + } +} + +func (ts *IntegrationTestSuite) TestGetVersioningBehavior() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + // We are setting the default build ID with versioning-2 rules to test + // with existing servers. TODO(antlai-temporal) use versioning-3 APIs + // after there is a server release that supports versioning-3 + res, err := ts.client.GetWorkerVersioningRules(ctx, client.GetWorkerVersioningOptions{ + TaskQueue: ts.taskQueueName, + }) + ts.NoError(err) + + _, err = ts.client.UpdateWorkerVersioningRules(ctx, client.UpdateWorkerVersioningRulesOptions{ + TaskQueue: ts.taskQueueName, + ConflictToken: res.ConflictToken, + Operation: &client.VersioningOperationInsertAssignmentRule{ + RuleIndex: 0, + Rule: client.VersioningAssignmentRule{ + TargetBuildID: "1.0", + }, + }, + }) + ts.NoError(err) + + ts.worker.Stop() + ts.workerStopped = true + w := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "1.0", + UseBuildIDForVersioning: true, + DeploymentName: "deploy-test1", + DefaultVersioningBehavior: workflow.VersioningBehaviorAutoUpgrade, + }, + }) + ts.registerWorkflowsAndActivities(w) + ts.Nil(w.Start()) + defer w.Stop() + + wfOpts := ts.startWorkflowOptions("test-get-versioning-behavior") + run, err := ts.client.ExecuteWorkflow(ctx, wfOpts, ts.workflows.SetPinnedVersioningBehavior) + ts.NoError(err) + value, err := ts.client.QueryWorkflow(ctx, "test-get-versioning-behavior", run.GetRunID(), + "get-versioning-behavior") + ts.NoError(err) + ts.NotNil(value) + var behavior workflow.VersioningBehavior + ts.Nil(value.Get(&behavior)) + ts.Equal(workflow.VersioningBehaviorPinned, behavior) +} + func (ts *IntegrationTestSuite) TestSendsCorrectMeteringData() { nonfirstLAAttemptCounts := make([]uint32, 0) c, err := client.Dial(client.Options{ diff --git a/test/worker_versioning_test.go b/test/worker_versioning_test.go index b9624df99..919c9c469 100644 --- a/test/worker_versioning_test.go +++ b/test/worker_versioning_test.go @@ -400,7 +400,12 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() { }) ts.NoError(err) - worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true}) + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "1.0", + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker1) ts.NoError(worker1.Start()) defer worker1.Stop() @@ -419,14 +424,24 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() { }, }) ts.NoError(err) - worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "2.0", UseBuildIDForVersioning: true}) + worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "2.0", + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker2) ts.NoError(worker2.Start()) defer worker2.Stop() // If we add the worker before the BuildID "2.0" has been registered, the worker poller ends up // in the new versioning queue, and it only recovers after 1m timeout. - worker3 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "2.0", UseBuildIDForVersioning: true}) + worker3 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "2.0", + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker3) ts.NoError(worker3.Start()) defer worker3.Stop() @@ -471,11 +486,21 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasksWithRules() }) ts.NoError(err) - worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true}) + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "1.0", + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker1) ts.NoError(worker1.Start()) defer worker1.Stop() - worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "2.0", UseBuildIDForVersioning: true}) + worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "2.0", + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker2) ts.NoError(worker2.Start()) defer worker2.Stop() @@ -643,10 +668,12 @@ func (ts *WorkerVersioningTestSuite) TestDeploymentNameWorker() { defer cancel() worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ - Identity: "worker1", - BuildID: "b1", - UseBuildIDForVersioning: false, - DeploymentName: "deploy1", + Identity: "worker1", + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "b1", + UseBuildIDForVersioning: false, + DeploymentName: "deploy1", + }, }) ts.workflows.register(worker1) ts.NoError(worker1.Start()) @@ -700,7 +727,12 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersions() { }) ts.NoError(err) - worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: buildID1, UseBuildIDForVersioning: true}) + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: buildID1, + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker1) ts.NoError(worker1.Start()) defer worker1.Stop() @@ -719,7 +751,12 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersions() { ts.NoError(handle12.Get(ctx, nil)) // Start the second worker - worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: buildID2, UseBuildIDForVersioning: true}) + worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: buildID2, + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker2) ts.NoError(worker2.Start()) defer worker2.Stop() @@ -789,7 +826,12 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersionsWithRules() { }) ts.NoError(err) - worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: buildID1, UseBuildIDForVersioning: true}) + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: buildID1, + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker1) ts.NoError(worker1.Start()) defer worker1.Stop() @@ -808,7 +850,12 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersionsWithRules() { ts.NoError(handle12.Get(ctx, nil)) // Start the second worker - worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: buildID2, UseBuildIDForVersioning: true}) + worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: buildID2, + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker2) ts.NoError(worker2.Start()) defer worker2.Stop() @@ -962,7 +1009,12 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { }) ts.NoError(err) - worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true}) + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "1.0", + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker1) ts.activities.register(worker1) ts.NoError(worker1.Start()) @@ -996,7 +1048,12 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { }) ts.NoError(err) - worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true}) + worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "1.1", + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker11) ts.activities.register(worker11) ts.NoError(worker11.Start()) @@ -1052,7 +1109,12 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetimeWithR }) ts.NoError(err) - worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true}) + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "1.0", + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker1) ts.activities.register(worker1) ts.NoError(worker1.Start()) @@ -1100,7 +1162,12 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetimeWithR }) ts.NoError(err) - worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true}) + worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + BuildID: "1.1", + UseBuildIDForVersioning: true, + }, + }) ts.workflows.register(worker11) ts.activities.register(worker11) ts.NoError(worker11.Start()) diff --git a/test/workflow_test.go b/test/workflow_test.go index 562761d8d..5f214eb0d 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -70,6 +70,34 @@ func (w *Workflows) Basic(ctx workflow.Context) ([]string, error) { return []string{"toUpperWithDelay", "toUpper"}, nil } +// Similar to Basic, but setting the versioning behavior to pinned. +func (w *Workflows) SetPinnedVersioningBehavior(ctx workflow.Context) ([]string, error) { + workflow.SetVersioningBehavior(ctx, workflow.VersioningBehaviorPinned) + + err := workflow.SetQueryHandler(ctx, "get-versioning-behavior", func(input []byte) (workflow.VersioningBehavior, error) { + return workflow.GetVersioningBehavior(ctx), nil + }) + if err != nil { + return nil, err + } + + ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) + var ans1 string + workflow.GetLogger(ctx).Info("calling ExecuteActivity") + err = workflow.ExecuteActivity(ctx, "Prefix_ToUpperWithDelay", "hello", time.Second).Get(ctx, &ans1) + if err != nil { + return nil, err + } + var ans2 string + if err := workflow.ExecuteActivity(ctx, "Prefix_ToUpper", ans1).Get(ctx, &ans2); err != nil { + return nil, err + } + if ans2 != "HELLO" { + return nil, fmt.Errorf("incorrect return value from activity: expected=%v,got=%v", "HELLO", ans2) + } + return []string{"toUpperWithDelay", "toUpper"}, nil +} + func (w *Workflows) Echo(ctx workflow.Context, message string) (string, error) { ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) var ans1 string @@ -3186,6 +3214,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ActivityWaitForWorkerStop) worker.RegisterWorkflow(w.ActivityHeartbeatUntilSignal) worker.RegisterWorkflow(w.Basic) + worker.RegisterWorkflow(w.SetPinnedVersioningBehavior) worker.RegisterWorkflow(w.Deadlocked) worker.RegisterWorkflow(w.DeadlockedWithLocalActivity) worker.RegisterWorkflow(w.Panicked) diff --git a/worker/worker.go b/worker/worker.go index dd4f8687a..49fd2211c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -211,6 +211,10 @@ type ( ReplayWorkflowExecution(ctx context.Context, service workflowservice.WorkflowServiceClient, logger log.Logger, namespace string, execution workflow.Execution) error } + // DeploymentOptions provides configuration to enable Worker Versioning. + // NOTE: Experimental + DeploymentOptions = internal.WorkerDeploymentOptions + // Options is used to configure a worker instance. Options = internal.WorkerOptions diff --git a/workflow/workflow.go b/workflow/workflow.go index a96e690a9..6c5d224a7 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -35,6 +35,22 @@ import ( "golang.org/x/exp/constraints" ) +// VersioningBehavior specifies when existing workflows could change their Build ID. +// NOTE: Experimental +type VersioningBehavior = internal.VersioningBehavior + +const ( + // Workflow versioning policy unknown. + VersioningBehaviorUnspecified = internal.VersioningBehaviorUnspecified + + // Workflow should be pinned to the current Build ID until manually moved. + VersioningBehaviorPinned = internal.VersioningBehaviorPinned + + // Workflow automatically moves to the latest version (default Build ID of the task queue) + // when the next task is dispatched. + VersioningBehaviorAutoUpgrade = internal.VersioningBehaviorAutoUpgrade +) + // HandlerUnfinishedPolicy defines the actions taken when a workflow exits while update handlers are // running. The workflow exit may be due to successful return, failure, cancellation, or // continue-as-new. @@ -281,7 +297,7 @@ func GetCurrentUpdateInfo(ctx Context) *UpdateInfo { // GetLogger returns a logger to be used in workflow's context. // This logger does not record logs during replay. // -// The logger may also extract additional fields from the context, such as update info +// The logger may also extract additional fields from the context, such as update info // if used in an update handler. func GetLogger(ctx Context) log.Logger { return internal.GetLogger(ctx) @@ -607,6 +623,34 @@ func SetCurrentDetails(ctx Context, details string) { internal.SetCurrentDetails(ctx, details) } +// SetVersioningBehavior sets the strategy to upgrade this workflow when the default Build ID +// has changed, and Worker Versioning-3 has been enabled. +// +// SetVersioningBehavior should be called during the first workflow task, and the context `ctx` should +// be the original context workflow argument, or derived from this context. +// +// If not set, a default behavior provided in worker.Options.DefaultVersioningBehavior will be used. +// +// If not set, and the default behavior is also unspecified in the Worker, the first task of this workflow +// will fail when Worker Versioning-3 has been enabled. +// +// NOTE: Experimental +func SetVersioningBehavior(ctx Context, behavior VersioningBehavior) { + internal.SetVersioningBehavior(ctx, behavior) +} + +// GetVersioningBehavior returns the last versioning behavior set with +// the SetVersioningBehavior method. +// +// When SetVersioningBehavior has not been called, it always returns +// VersioningBehaviorUnspecified, ignoring any Worker +// defaults to avoid non-deterministic errors. +// +// NOTE: Experimental +func GetVersioningBehavior(ctx Context) VersioningBehavior { + return internal.GetVersioningBehavior(ctx) +} + // IsReplaying returns whether the current workflow code is replaying. // // Warning! Never make commands, like schedule activity/childWorkflow/timer or send/wait on future/channel, based on