Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support annotations for versioning behavior #1692

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 44 additions & 35 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,24 +130,25 @@ type (

// workflowTaskHandlerImpl is the implementation of WorkflowTaskHandler.
// Most of its fields are copied from workerExecutionParameters by
// newWorkflowTaskHandler.
//
// NOTE(review): the field list appears twice below — presumably the removed
// and the added side of the diff hunk; the second list is the one that
// carries defaultVersioningBehavior.
workflowTaskHandlerImpl struct {
namespace string
metricsHandler metrics.Handler
ppMgr pressurePointMgr
logger log.Logger
identity string
workerBuildID string
useBuildIDForVersioning bool
deploymentName string
enableLoggingInReplay bool
registry *registry
laTunnel *localActivityTunnel
workflowPanicPolicy WorkflowPanicPolicy
dataConverter converter.DataConverter
failureConverter converter.FailureConverter
contextPropagators []ContextPropagator
cache *WorkerCache
deadlockDetectionTimeout time.Duration
capabilities *workflowservice.GetSystemInfoResponse_Capabilities
namespace string
metricsHandler metrics.Handler
ppMgr pressurePointMgr
logger log.Logger
identity string
workerBuildID string
// completeWorkflow only attaches a versioning behavior to the request when
// useBuildIDForVersioning is true and deploymentName is non-empty.
useBuildIDForVersioning bool
deploymentName string
// Fallback sent by completeWorkflow when the workflow's
// currentVersioningBehavior is VersioningBehaviorUnspecified.
defaultVersioningBehavior VersioningBehavior
enableLoggingInReplay bool
registry *registry
laTunnel *localActivityTunnel
workflowPanicPolicy WorkflowPanicPolicy
dataConverter converter.DataConverter
failureConverter converter.FailureConverter
contextPropagators []ContextPropagator
cache *WorkerCache
deadlockDetectionTimeout time.Duration
// Nil-checked before use; when BuildIdBasedVersioning is reported,
// completeWorkflow clears the request's BinaryChecksum.
capabilities *workflowservice.GetSystemInfoResponse_Capabilities
}

activityProvider func(name string) activity
Expand Down Expand Up @@ -550,23 +551,24 @@ func inferMessageFromAcceptedEvent(attrs *historypb.WorkflowExecutionUpdateAccep
// newWorkflowTaskHandler returns a WorkflowTaskHandler backed by a
// workflowTaskHandlerImpl. It first passes params to ensureRequiredParams
// (presumably to fill in missing defaults — confirm against that helper) and
// then copies the worker execution parameters, together with ppMgr and
// registry, into the handler.
//
// NOTE(review): the composite literal below lists its keys twice — presumably
// the removed and the added side of the diff hunk; the second list is the one
// that sets defaultVersioningBehavior.
func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePointMgr, registry *registry) WorkflowTaskHandler {
ensureRequiredParams(&params)
return &workflowTaskHandlerImpl{
namespace: params.Namespace,
logger: params.Logger,
ppMgr: ppMgr,
metricsHandler: params.MetricsHandler,
identity: params.Identity,
workerBuildID: params.getBuildID(),
useBuildIDForVersioning: params.UseBuildIDForVersioning,
deploymentName: params.DeploymentName,
enableLoggingInReplay: params.EnableLoggingInReplay,
registry: registry,
workflowPanicPolicy: params.WorkflowPanicPolicy,
dataConverter: params.DataConverter,
failureConverter: params.FailureConverter,
contextPropagators: params.ContextPropagators,
cache: params.cache,
deadlockDetectionTimeout: params.DeadlockDetectionTimeout,
capabilities: params.capabilities,
namespace: params.Namespace,
logger: params.Logger,
ppMgr: ppMgr,
metricsHandler: params.MetricsHandler,
identity: params.Identity,
// The build ID is obtained through getBuildID() rather than read from a field.
workerBuildID: params.getBuildID(),
useBuildIDForVersioning: params.UseBuildIDForVersioning,
deploymentName: params.DeploymentName,
defaultVersioningBehavior: params.DefaultVersioningBehavior,
enableLoggingInReplay: params.EnableLoggingInReplay,
registry: registry,
workflowPanicPolicy: params.WorkflowPanicPolicy,
dataConverter: params.DataConverter,
failureConverter: params.FailureConverter,
contextPropagators: params.ContextPropagators,
cache: params.cache,
deadlockDetectionTimeout: params.DeadlockDetectionTimeout,
capabilities: params.capabilities,
}
}

Expand Down Expand Up @@ -1914,6 +1916,13 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
if wth.capabilities != nil && wth.capabilities.BuildIdBasedVersioning {
builtRequest.BinaryChecksum = ""
}
if wth.useBuildIDForVersioning && wth.deploymentName != "" {
if workflowContext.workflowInfo.currentVersioningBehavior != VersioningBehaviorUnspecified {
builtRequest.VersioningBehavior = versioningBehaviorToProto(workflowContext.workflowInfo.currentVersioningBehavior)
} else {
builtRequest.VersioningBehavior = versioningBehaviorToProto(wth.defaultVersioningBehavior)
}
}
return builtRequest
}

Expand Down
12 changes: 8 additions & 4 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ type (
// The worker's deployment name, an identifier in versioning-3 to group Task Queues for a given build ID.
DeploymentName string

// The Versioning Behavior for workflows that do not set one in their first task.
DefaultVersioningBehavior VersioningBehavior

MetricsHandler metrics.Handler

Logger log.Logger
Expand Down Expand Up @@ -1621,7 +1624,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke

// Sessions are not currently compatible with worker versioning
// See: https://github.com/temporalio/sdk-go/issues/1227
if options.EnableSessionWorker && options.UseBuildIDForVersioning {
if options.EnableSessionWorker && options.DeploymentOptions.UseBuildIDForVersioning {
panic("cannot set both EnableSessionWorker and UseBuildIDForVersioning")
}

Expand Down Expand Up @@ -1665,9 +1668,10 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
MaxConcurrentWorkflowTaskQueuePollers: options.MaxConcurrentWorkflowTaskPollers,
MaxConcurrentNexusTaskQueuePollers: options.MaxConcurrentNexusTaskPollers,
Identity: client.identity,
WorkerBuildID: options.BuildID,
UseBuildIDForVersioning: options.UseBuildIDForVersioning,
DeploymentName: options.DeploymentName,
WorkerBuildID: options.DeploymentOptions.BuildID,
UseBuildIDForVersioning: options.DeploymentOptions.UseBuildIDForVersioning,
DeploymentName: options.DeploymentOptions.DeploymentName,
DefaultVersioningBehavior: options.DeploymentOptions.DefaultVersioningBehavior,
MetricsHandler: client.metricsHandler.WithTags(metrics.TaskQueueTags(taskQueue)),
Logger: client.logger,
EnableLoggingInReplay: options.EnableLoggingInReplay,
Expand Down
3 changes: 2 additions & 1 deletion internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2865,7 +2865,8 @@ func TestWorkerBuildIDAndSessionPanic(t *testing.T) {
var recovered interface{}
func() {
defer func() { recovered = recover() }()
worker := NewAggregatedWorker(&WorkflowClient{}, "some-task-queue", WorkerOptions{EnableSessionWorker: true, UseBuildIDForVersioning: true})
worker := NewAggregatedWorker(&WorkflowClient{}, "some-task-queue", WorkerOptions{EnableSessionWorker: true,
DeploymentOptions: WorkerDeploymentOptions{UseBuildIDForVersioning: true}})
worker.RegisterWorkflow(testReplayWorkflow)
}()
require.Equal(t, "cannot set both EnableSessionWorker and UseBuildIDForVersioning", recovered)
Expand Down
17 changes: 17 additions & 0 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1611,6 +1611,23 @@ func SetCurrentDetails(ctx Context, details string) {
getWorkflowEnvOptions(ctx).currentDetails = details
}

// SetVersioningBehavior sets the strategy to upgrade this workflow when the default Build ID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we intentionally omit a getter here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, versioning behavior can be dependent on worker options so if app code branches based on it they will get an NDE. And in general, we wanted to be able to change annotations without needing patching, so no-getter helps there...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the versioning behaviour written into history?

Copy link
Member

@cretz cretz Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to be able to get something you may have set. I think this should either be in workflow register options and/or this should be called SetVersioningBehaviorOverride (and GetVersioningBehaviorOverride will just return what you have called with the "set", not the resolved value). I assume there are good use cases for setting this to different values throughout the workflow logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Quinn-With-Two-Ns My understanding is that it is not part of the history, at least I do nothing special during replay, @ShahabT can you confirm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the proto changes that @ShahabT did, the versioning behavior has been added to WorkflowTaskCompletedEventAttributes in history. I don't do anything with it (maybe I should?) but that would give the server a placeholder to record the versioning behavior that I send after each task...

Copy link
Member

@cretz cretz Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, so it is in history, which makes sense if workflow author is expected to update it mid-workflow. I do wonder whether the getter should take that into account. I'll defer to y'all on desired behavior of the getter, but I do believe we should not have a setter without a getter (even if we have to add caveats to the behavior of the getter).

Copy link

@carlydf carlydf Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I'm misunderstanding, versioning behavior is more of a workflow worker level thing not workflow run/execution thing.

@cretz In general, I think of versioning behavior as primarily a workflow type level thing, with a workflow worker level default in case it's not specified at the type level (but from a product perspective, we want to encourage explicitly setting it for each workflow type, so that any developer reading the workflow definition can immediately see what the expected behavior is, and keep that in mind when they make changes to the code that could change it from short-running <-> long-running).

Still, there are some limited use-cases for overriding the versioning behavior at the workflow execution level.

  1. Overriding the versioning behavior of an already-running workflow execution (UpdateWorkflowExecutionProperties):

    • User wants to pin a specific workflow execution that had a weird bug and is still being investigated, but they want to keep the workflow type as a whole unpinned because it is still long-running in general. I think our cloud release validation workflow would be an example of this, or other long-running test pipelines.
    • A lot of other versioning behavior change scenarios are likely to be applied for all workflow executions of the same type, in which case the behavior change could be accomplished by changing the annotation in the workflow code, deploying a new build, then using the batch api to manually change the build id of running workflows so that they use the new workflow code and pick up the new annotation.
    • I still think being able to pin a specific workflow execution without pinning the whole type is useful for debugging. Max liked this idea in my DevX presentation on UpdateWorkflowExecutionProperties scenarios.
    • I think overriding a specific wf execution from pinned --> unpinned is much less important, because the use case is less clear. If you want a specific wf execution to run on the current default build id, you can just change its build id explicitly. Most use cases for moving to unpinned apply across the wf type.
  2. Overriding the versioning behavior of a workflow execution on workflow start:

    • Customers have requested that we provide a way to start a one-off workflow on a specific build id that is not yet the default build. This is the basic functionality they need to set up deployment canaries easily.
    • It seems like this would involve some parameter such as pinned_build_id_override in StartWorkflowOptions. I think it would be nice if this is symmetric to a concept of pinned_build_id_override in UpdateWorkflowExecutionProperties

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server stores the current build id and versioning behavior in Mutable State and in visibility. The previously-used build ids are also stored in visibility -- I'm not sure if previously-used build ids are also stored in Mutable State.

"getting" the current versioning behavior of a given workflow execution could be done by reading the mutable state or visibility entry.

Querying a bunch of workflow executions for a specific versioning behavior and/or build id combination would be a visibility query (we will use this for reachability).

It seems to me that to support a workflow-execution-level versioning behavior override, the server would need to store not just the (versioning behavior, build id) tuple, but also an is_override (or some other name) bit, that basically means, if is_override, ignore whatever the sdk says about my workflow type and do task routing based on what is currently written in my mutable state as an override. To me it seems like it could be handled on the server side, since that's where the task routing decision is made.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think of versioning behavior as primarily a workflow type level thing, with a workflow worker level default in case it's not specified at the type level (but from a product perspective, we want to encourage explicitly setting it for each workflow type

If this is the case, it also needs to be added to workflow.RegisterOptions.

"getting" the current versioning behavior of a given workflow execution could be done by reading the mutable state or visibility entry.

We don't want to get from server what's not already in history. We just need to make sure any SetWhatever inside a workflow has a corresponding GetWhatever. If this just returns the value that was "set" before in the same workflow code and not the actual server value, that's fine (though it does seem like task complete may have some form of the value). The docs of the getter just need to clarify.

// has changed, and Worker Versioning-3 has been enabled.
//
// NOTE: Experimental
func SetVersioningBehavior(ctx Context, behavior VersioningBehavior) {
	// Only record the value on the workflow info here; it is read back later
	// when the workflow task completion request is built.
	info := getWorkflowEnvironment(ctx).WorkflowInfo()
	info.currentVersioningBehavior = behavior
}

// GetVersioningBehavior reports the versioning behavior most recently stored
// by SetVersioningBehavior for this workflow. It returns the stored value as
// is and does not substitute the worker's DefaultVersioningBehavior.
//
// NOTE: Experimental
func GetVersioningBehavior(ctx Context) VersioningBehavior {
	info := getWorkflowEnvironment(ctx).WorkflowInfo()
	return info.currentVersioningBehavior
}

func getWorkflowMetadata(ctx Context) (*sdk.WorkflowMetadata, error) {
info := GetWorkflowInfo(ctx)
eo := getWorkflowEnvOptions(ctx)
Expand Down
50 changes: 33 additions & 17 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,36 @@ import (
)

type (
// WorkerDeploymentOptions groups the settings that enable Worker Versioning
// for a worker.
//
// NOTE: Experimental
WorkerDeploymentOptions struct {
// BuildID assigns a Build ID to this worker. It replaces the deprecated binary
// checksum concept, uniquely identifies a set of worker code, and is required
// to opt in to the Worker Versioning feature. See UseBuildIDForVersioning.
//
// NOTE: Experimental
BuildID string

// UseBuildIDForVersioning, if true, opts this worker into the Worker
// Versioning feature, so that it only operates on workflows it claims to be
// compatible with. BuildID must be set when this flag is true.
//
// NOTE: Experimental
// NOTE: Cannot be enabled at the same time as WorkerOptions.EnableSessionWorker.
UseBuildIDForVersioning bool

// DeploymentName assigns a Deployment Name to this worker: an identifier for
// Worker Versioning that groups task queues for the given BuildID.
//
// NOTE: Experimental
// NOTE: BuildID and UseBuildIDForVersioning must also be set to enable the
// new Worker Versioning-3 feature.
DeploymentName string

// DefaultVersioningBehavior is optional. It provides the Versioning Behavior
// for workflows that do not set one in their first task
// (see workflow.SetVersioningBehavior).
//
// NOTE: Experimental
// NOTE: If the Worker Versioning-3 feature is on and DefaultVersioningBehavior
// is left unspecified, workflows that do not set a Versioning Behavior will
// fail in their first task.
DefaultVersioningBehavior VersioningBehavior
}

// WorkerOptions is used to configure a worker instance.
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
Expand Down Expand Up @@ -238,24 +268,10 @@ type (
// workflow or activities.
DisableRegistrationAliasing bool

// Assign a BuildID to this worker. This replaces the deprecated binary checksum concept,
// and is used to provide a unique identifier for a set of worker code, and is necessary
// to opt in to the Worker Versioning feature. See UseBuildIDForVersioning.
// NOTE: Experimental
BuildID string

// Optional: If set, opts this worker into the Worker Versioning feature. It will only
// operate on workflows it claims to be compatible with. You must set BuildID if this flag
// is true.
// NOTE: Experimental
// Note: Cannot be enabled at the same time as EnableSessionWorker
UseBuildIDForVersioning bool

// Optional: Assign a Deployment Name to this worker, an identifier for Worker Versioning that
// groups task queues for the given BuildID.
// Optional: If set, configure Worker Versioning for this worker. See WorkerDeploymentOptions
// for more. This is incompatible with setting EnableSessionWorker.
// NOTE: Experimental
// NOTE: Both BuildID and UseBuildIDForVersioning need to also be set to enable the new Worker Versioning-3 feature.
DeploymentName string
DeploymentOptions WorkerDeploymentOptions

// Optional: If set, use a custom tuner for this worker. See WorkerTuner for more.
// Mutually exclusive with MaxConcurrentWorkflowTaskExecutionSize,
Expand Down
36 changes: 36 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,23 @@ const (
HandlerUnfinishedPolicyAbandon
)

// VersioningBehavior specifies when an existing workflow may change its Build ID.
// The zero value is VersioningBehaviorUnspecified.
//
// NOTE: Experimental
type VersioningBehavior int

const (
// VersioningBehaviorUnspecified means no versioning behavior has been chosen.
// When the worker's default is also VersioningBehaviorUnspecified, every
// workflow has to explicitly set a VersioningBehavior other than
// VersioningBehaviorUnspecified.
VersioningBehaviorUnspecified VersioningBehavior = iota
antlai-temporal marked this conversation as resolved.
Show resolved Hide resolved

// VersioningBehaviorPinned keeps the workflow pinned to its current Build ID
// until it is manually moved.
VersioningBehaviorPinned

// VersioningBehaviorAutoUpgrade moves the workflow automatically to the latest
// version (the default Build ID of the task queue) when the next task is
// dispatched.
VersioningBehaviorAutoUpgrade
)

var (
errWorkflowIDNotSet = errors.New("workflowId is not set")
errLocalActivityParamsBadRequest = errors.New("missing local activity parameters through context, check LocalActivityOptions")
Expand Down Expand Up @@ -1171,6 +1188,12 @@ type WorkflowInfo struct {
// which is currently or about to be executing. If no longer replaying will be set to the ID of
// this worker
currentTaskBuildID string
// currentVersioningBehavior, if not unspecified, sets the strategy to upgrade
// this workflow when the default Build ID
// has changed, and Worker Versioning-3 has been enabled. Otherwise, the current Worker
// option DefaultVersioningBehavior will be used instead.
// NOTE: Experimental
currentVersioningBehavior VersioningBehavior

continueAsNewSuggested bool
currentHistorySize int
Expand Down Expand Up @@ -2531,3 +2554,16 @@ func (wc *workflowEnvironmentInterceptor) ExecuteNexusOperation(ctx Context, inp
// RequestCancelNexusOperation hands the sequence number carried by input to
// the RequestCancelNexusOperation method of the interceptor's workflow
// environment. ctx is not used.
func (wc *workflowEnvironmentInterceptor) RequestCancelNexusOperation(ctx Context, input RequestCancelNexusOperationInput) {
	seq := input.seq
	wc.env.RequestCancelNexusOperation(seq)
}

// versioningBehaviorToProto translates the SDK-level VersioningBehavior into
// the corresponding enumspb.VersioningBehavior value. It panics when given a
// value that is not one of the declared VersioningBehavior constants.
func versioningBehaviorToProto(t VersioningBehavior) enumspb.VersioningBehavior {
	if t == VersioningBehaviorUnspecified {
		return enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED
	}
	if t == VersioningBehaviorPinned {
		return enumspb.VERSIONING_BEHAVIOR_PINNED
	}
	if t == VersioningBehaviorAutoUpgrade {
		return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE
	}
	// Anything else is not a declared constant.
	panic("unknown versioning behavior type")
}
Loading
Loading