Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Deployment Names to Worker options #1675

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions internal/internal_nexus_task_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func newNexusTaskPoller(
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
deploymentName: params.DeploymentName,
capabilities: params.capabilities,
},
taskHandler: taskHandler,
Expand Down Expand Up @@ -91,8 +92,9 @@ func (ntp *nexusTaskPoller) poll(ctx context.Context) (taskForWorker, error) {
TaskQueue: &taskqueuepb.TaskQueue{Name: ntp.taskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Identity: ntp.identity,
WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: ntp.workerBuildID,
UseVersioning: ntp.useBuildIDVersioning,
BuildId: ntp.workerBuildID,
UseVersioning: ntp.useBuildIDVersioning,
DeploymentName: ntp.deploymentName,
},
}

Expand Down
12 changes: 8 additions & 4 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ type (
identity string
workerBuildID string
useBuildIDForVersioning bool
deploymentName string
enableLoggingInReplay bool
registry *registry
laTunnel *localActivityTunnel
Expand Down Expand Up @@ -556,6 +557,7 @@ func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePoin
identity: params.Identity,
workerBuildID: params.getBuildID(),
useBuildIDForVersioning: params.UseBuildIDForVersioning,
deploymentName: params.DeploymentName,
enableLoggingInReplay: params.EnableLoggingInReplay,
registry: registry,
workflowPanicPolicy: params.WorkflowPanicPolicy,
Expand Down Expand Up @@ -1904,8 +1906,9 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
SdkVersion: eventHandler.getNewSdkVersionAndReset(),
},
WorkerVersionStamp: &commonpb.WorkerVersionStamp{
BuildId: wth.workerBuildID,
UseVersioning: wth.useBuildIDForVersioning,
BuildId: wth.workerBuildID,
UseVersioning: wth.useBuildIDForVersioning,
DeploymentName: wth.deploymentName,
},
}
if wth.capabilities != nil && wth.capabilities.BuildIdBasedVersioning {
Expand Down Expand Up @@ -1961,8 +1964,9 @@ func newActivityTaskHandlerWithCustomProvider(
defaultHeartbeatThrottleInterval: params.DefaultHeartbeatThrottleInterval,
maxHeartbeatThrottleInterval: params.MaxHeartbeatThrottleInterval,
versionStamp: &commonpb.WorkerVersionStamp{
BuildId: params.getBuildID(),
UseVersioning: params.UseBuildIDForVersioning,
BuildId: params.getBuildID(),
UseVersioning: params.UseBuildIDForVersioning,
DeploymentName: params.DeploymentName,
},
}
}
Expand Down
21 changes: 14 additions & 7 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type (
workerBuildID string
// Whether the worker has opted in to the build-id based versioning feature
useBuildIDVersioning bool
// The worker's deployment name, an identifier in versioning-3 to group Task Queues for a given build ID
deploymentName string
// Server's capabilities
capabilities *workflowservice.GetSystemInfoResponse_Capabilities
}
Expand Down Expand Up @@ -289,6 +291,7 @@ func newWorkflowTaskPoller(
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
deploymentName: params.DeploymentName,
capabilities: params.capabilities,
},
service: service,
Expand Down Expand Up @@ -562,8 +565,9 @@ func (wtp *workflowTaskPoller) errorToFailWorkflowTask(taskToken []byte, err err
BinaryChecksum: wtp.workerBuildID,
Namespace: wtp.namespace,
WorkerVersion: &commonpb.WorkerVersionStamp{
BuildId: wtp.workerBuildID,
UseVersioning: wtp.useBuildIDVersioning,
BuildId: wtp.workerBuildID,
UseVersioning: wtp.useBuildIDVersioning,
DeploymentName: wtp.deploymentName,
},
}

Expand Down Expand Up @@ -798,8 +802,9 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po
Identity: wtp.identity,
BinaryChecksum: wtp.workerBuildID,
WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: wtp.workerBuildID,
UseVersioning: wtp.useBuildIDVersioning,
BuildId: wtp.workerBuildID,
UseVersioning: wtp.useBuildIDVersioning,
DeploymentName: wtp.deploymentName,
},
}
if wtp.getCapabilities().BuildIdBasedVersioning {
Expand Down Expand Up @@ -953,7 +958,7 @@ func newGetHistoryPageFunc(
// a new workflow task or the server looses the workflow task if it is a speculative workflow task. In either
// case, the new workflow task could have events that are beyond the last event ID that the SDK expects to process.
// In such cases, the SDK should return error indicating that the workflow task is stale since the result will not be used.
if size > 0 && lastEventID > 0 &&
if size > 0 && lastEventID > 0 &&
h.Events[size-1].GetEventId() > lastEventID {
return nil, nil, fmt.Errorf("history contains events past expected last event ID (%v) "+
"likely this means the current workflow task is no longer valid", lastEventID)
Expand All @@ -971,6 +976,7 @@ func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserv
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
deploymentName: params.DeploymentName,
capabilities: params.capabilities,
},
taskHandler: taskHandler,
Expand Down Expand Up @@ -1003,8 +1009,9 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (taskForWorker, error)
Identity: atp.identity,
TaskQueueMetadata: &taskqueuepb.TaskQueueMetadata{MaxTasksPerSecond: wrapperspb.Double(atp.activitiesPerSecond)},
WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: atp.workerBuildID,
UseVersioning: atp.useBuildIDVersioning,
BuildId: atp.workerBuildID,
UseVersioning: atp.useBuildIDVersioning,
DeploymentName: atp.deploymentName,
},
}

Expand Down
7 changes: 5 additions & 2 deletions internal/internal_versioning_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ type (
BuildID string
// Whether the worker is using the versioning feature.
UseVersioning bool
// An identifier to group task queues based on Build ID.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very low-level description. The point of a deployment is to abstract the task queues.
I would say something like "A Deployment corresponds to a set of Worker instances you want deployed together. For example, it may represent a Worker service. Each time you deploy, you'll be changing the build ID for that deployment."

DeploymentName string
}

// TaskQueuePollerInfo provides information about a worker/client polling a task queue.
Expand Down Expand Up @@ -243,8 +245,9 @@ func workerVersionCapabilitiesFromResponse(response *common.WorkerVersionCapabil
}

return &WorkerVersionCapabilities{
BuildID: response.GetBuildId(),
UseVersioning: response.GetUseVersioning(),
BuildID: response.GetBuildId(),
UseVersioning: response.GetUseVersioning(),
DeploymentName: response.GetDeploymentName(),
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/internal_versioning_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func Test_TaskQueueDescription_fromProtoResponse(t *testing.T) {
TypesInfo: map[int32]*taskqueuepb.TaskQueueTypeInfo{
int32(enumspb.TASK_QUEUE_TYPE_WORKFLOW): {
Pollers: []*taskqueuepb.PollerInfo{
{LastAccessTime: nowProto, Identity: "me", RatePerSecond: 3.0, WorkerVersionCapabilities: &common.WorkerVersionCapabilities{BuildId: "1.0", UseVersioning: true}},
{LastAccessTime: nowProto, Identity: "me", RatePerSecond: 3.0, WorkerVersionCapabilities: &common.WorkerVersionCapabilities{BuildId: "1.0", UseVersioning: true, DeploymentName: "prod1"}},
},
},
},
Expand All @@ -108,7 +108,7 @@ func Test_TaskQueueDescription_fromProtoResponse(t *testing.T) {
TypesInfo: map[TaskQueueType]TaskQueueTypeInfo{
TaskQueueTypeWorkflow: {
Pollers: []TaskQueuePollerInfo{
{LastAccessTime: now, Identity: "me", RatePerSecond: 3.0, WorkerVersionCapabilities: &WorkerVersionCapabilities{BuildID: "1.0", UseVersioning: true}},
{LastAccessTime: now, Identity: "me", RatePerSecond: 3.0, WorkerVersionCapabilities: &WorkerVersionCapabilities{BuildID: "1.0", UseVersioning: true, DeploymentName: "prod1"}},
},
},
},
Expand Down
3 changes: 3 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ type (
WorkerBuildID string
// If true the worker is opting in to build ID based versioning.
UseBuildIDForVersioning bool
// The worker's deployment name, an identifier in versioning-3 to group Task Queues for a given build ID.
DeploymentName string

MetricsHandler metrics.Handler

Expand Down Expand Up @@ -1665,6 +1667,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
Identity: client.identity,
WorkerBuildID: options.BuildID,
UseBuildIDForVersioning: options.UseBuildIDForVersioning,
DeploymentName: options.DeploymentName,
MetricsHandler: client.metricsHandler.WithTags(metrics.TaskQueueTags(taskQueue)),
Logger: client.logger,
EnableLoggingInReplay: options.EnableLoggingInReplay,
Expand Down
6 changes: 6 additions & 0 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ type (
// Note: Cannot be enabled at the same time as EnableSessionWorker
UseBuildIDForVersioning bool

// Optional: Assign a Deployment Name to this worker, an identifier for Worker Versioning that
// groups task queues for the given BuildID.
// NOTE: Experimental
// NOTE: Both BuildID and UseBuildIDForVersioning need to also be set to enable the new Worker Versioning-3 feature.
DeploymentName string

// Optional: If set, use a custom tuner for this worker. See WorkerTuner for more.
// Mutually exclusive with MaxConcurrentWorkflowTaskExecutionSize,
// MaxConcurrentActivityExecutionSize, and MaxConcurrentLocalActivityExecutionSize.
Expand Down
45 changes: 45 additions & 0 deletions test/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,51 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityUnversionedWorkerWithRules(
ts.Equal(false, taskQueueTypeInfo.Pollers[0].WorkerVersionCapabilities.UseVersioning)
}

func (ts *WorkerVersioningTestSuite) TestDeploymentNameWorker() {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{
Identity: "worker1",
BuildID: "b1",
UseBuildIDForVersioning: false,
DeploymentName: "deploy1",
})
ts.workflows.register(worker1)
ts.NoError(worker1.Start())
defer worker1.Stop()

// Give time for worker pollers stats to show up
time.Sleep(2 * time.Second)

taskQueueInfo, err := ts.client.DescribeTaskQueueEnhanced(ctx, client.DescribeTaskQueueEnhancedOptions{
TaskQueue: ts.taskQueueName,
Versions: &client.TaskQueueVersionSelection{
// `client.UnversionedBuildID` is an empty string
BuildIDs: []string{client.UnversionedBuildID},
},
TaskQueueTypes: []client.TaskQueueType{
client.TaskQueueTypeWorkflow,
},
ReportPollers: true,
ReportTaskReachability: true,
})
ts.NoError(err)
ts.Equal(1, len(taskQueueInfo.VersionsInfo))

taskQueueVersionInfo, ok := taskQueueInfo.VersionsInfo[client.UnversionedBuildID]
ts.True(ok)
ts.Equal(client.BuildIDTaskReachability(client.BuildIDTaskReachabilityReachable), taskQueueVersionInfo.TaskReachability)

ts.Equal(1, len(taskQueueVersionInfo.TypesInfo))
taskQueueTypeInfo, ok := taskQueueVersionInfo.TypesInfo[client.TaskQueueTypeWorkflow]
ts.True(ok)
ts.True(len(taskQueueTypeInfo.Pollers) > 0)
ts.Equal("worker1", taskQueueTypeInfo.Pollers[0].Identity)
ts.Equal(false, taskQueueTypeInfo.Pollers[0].WorkerVersionCapabilities.UseVersioning)
ts.Equal("deploy1", taskQueueTypeInfo.Pollers[0].WorkerVersionCapabilities.DeploymentName)
}

func (ts *WorkerVersioningTestSuite) TestReachabilityVersions() {
// Skip this test because it is flaky with server 1.25.0, versioning api is also actively undergoing changes
ts.T().SkipNow()
Expand Down
Loading