From 1f0296cdf9d7655269beed439ad1769dd8040654 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Fri, 26 Jul 2024 09:16:08 -0700 Subject: [PATCH] Support for WorkflowIdConflictPolicy (#1563) --- internal/client.go | 9 +++- internal/internal_workflow.go | 1 + internal/internal_workflow_client.go | 3 ++ test/integration_test.go | 72 ++++++++++++++++++++++++++++ test/workflow_test.go | 9 ++++ 5 files changed, 92 insertions(+), 2 deletions(-) diff --git a/internal/client.go b/internal/client.go index 345e82a09..5d53dfaef 100644 --- a/internal/client.go +++ b/internal/client.go @@ -633,11 +633,16 @@ type ( // Optional: defaulted to 10 secs. WorkflowTaskTimeout time.Duration - // WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful - // for dedupe logic if set to RejectDuplicate. + // WorkflowIDReusePolicy - Specifies server behavior if a *completed* workflow with the same id exists. + // This can be useful for dedupe logic if set to RejectDuplicate // Optional: defaulted to AllowDuplicate. WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy + // WorkflowIDConflictPolicy - Specifies server behavior if a *running* workflow with the same id exists. + // This cannot be set if WorkflowIDReusePolicy is set to TerminateIfRunning. + // Optional: defaulted to Fail. + WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy + // When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the // workflow id has already been used and WorkflowIDReusePolicy would disallow a re-run. If it is set to false, // rather than erroring a WorkflowRun instance representing the current or last run will be returned. diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 106a58a31..ec7a2b6f9 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -211,6 +211,7 @@ type ( WorkflowID string WaitForCancellation bool WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy + WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy DataConverter converter.DataConverter RetryPolicy *commonpb.RetryPolicy CronSchedule string diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index cb68c0a79..90aa85dbf 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -52,6 +52,7 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" updatepb "go.temporal.io/api/update/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal/common/metrics" "go.temporal.io/sdk/internal/common/retry" @@ -1592,6 +1593,7 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( WorkflowTaskTimeout: durationpb.New(workflowTaskTimeout), Identity: w.client.identity, WorkflowIdReusePolicy: in.Options.WorkflowIDReusePolicy, + WorkflowIdConflictPolicy: in.Options.WorkflowIDConflictPolicy, RetryPolicy: convertToPBRetryPolicy(in.Options.RetryPolicy), CronSchedule: in.Options.CronSchedule, Memo: memo, @@ -1745,6 +1747,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow( Memo: memo, SearchAttributes: searchAttr, WorkflowIdReusePolicy: in.Options.WorkflowIDReusePolicy, + WorkflowIdConflictPolicy: in.Options.WorkflowIDConflictPolicy, Header: header, } diff --git a/test/integration_test.go b/test/integration_test.go index 73da72101..f8fce9011 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -949,6 +949,43 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseIgnoreDuplicateWhileRunning() ts.NotEqual(run1.GetRunID(), run3.GetRunID()) } +func (ts *IntegrationTestSuite) TestWorkflowIDConflictPolicy() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + opts := ts.startWorkflowOptions("test-workflowidconflict-" + uuid.New()) + opts.WorkflowExecutionErrorWhenAlreadyStarted = true + + var alreadyStartedErr *serviceerror.WorkflowExecutionAlreadyStarted + + // Start a workflow + run1, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy) + ts.NoError(err) + + // Confirm another fails by default + _, err = ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy) + ts.ErrorAs(err, &alreadyStartedErr) + + // Confirm fails if explicitly given that option + opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL + _, err = ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy) + ts.ErrorAs(err, &alreadyStartedErr) + + // Confirm gives back same WorkflowRun if requested + opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING + run2, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy) + ts.Equal(run1.GetRunID(), run2.GetRunID()) + + // Confirm terminates and starts new if requested + opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING + run3, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy) + ts.NotEqual(run1.GetRunID(), run3.GetRunID()) + + statusRun1, err := ts.client.DescribeWorkflowExecution(ctx, run1.GetID(), run1.GetRunID()) + ts.NoError(err) + ts.Equal(statusRun1.WorkflowExecutionInfo.Status, enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED) +} + func (ts *IntegrationTestSuite) TestChildWFWithRetryPolicy_ShortLived() { ts.testChildWFWithRetryPolicy(ts.workflows.ChildWorkflowWithRetryPolicy, 0) } @@ -1919,6 +1956,41 @@ func (ts *IntegrationTestSuite) TestStartDelaySignalWithStart() { ts.Equal(5*time.Second, event.GetWorkflowExecutionStartedEventAttributes().GetFirstWorkflowTaskBackoff().AsDuration()) } +func (ts *IntegrationTestSuite) TestSignalWithStartIdConflictPolicy() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var invalidArgErr *serviceerror.InvalidArgument + opts := ts.startWorkflowOptions("test-signalwithstart-workflowidconflict-" + uuid.New()) + + // Start a workflow + run1, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy) + ts.NoError(err) + + // Confirm gives back same WorkflowRun by default + run2, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy) + ts.Equal(run1.GetRunID(), run2.GetRunID()) + + // Confirm gives back same WorkflowRun if requested explicitly + opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING + run3, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy) + ts.Equal(run1.GetRunID(), run3.GetRunID()) + + // Confirm policy to fail is invalid + opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL + _, err = ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy) + ts.ErrorAs(err, &invalidArgErr) + + // Confirm terminates and starts new if requested + opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING + run4, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy) + ts.NotEqual(run1.GetRunID(), run4.GetRunID()) + + statusRun1, err := ts.client.DescribeWorkflowExecution(ctx, run1.GetID(), run1.GetRunID()) + ts.NoError(err) + ts.Equal(statusRun1.WorkflowExecutionInfo.Status, enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED) +} + func (ts *IntegrationTestSuite) TestResetWorkflowExecution() { var originalResult []string err := ts.executeWorkflow("basic-reset-workflow-execution", ts.workflows.Basic, &originalResult) diff --git a/test/workflow_test.go b/test/workflow_test.go index 9bc3007cf..46d4599ef 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -38,6 +38,7 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal" "go.temporal.io/sdk/temporal" @@ -644,6 +645,13 @@ func (w *Workflows) IDReusePolicy( return ans1 + ans2, nil } +func (w *Workflows) IDConflictPolicy( + ctx workflow.Context, +) error { + workflow.Await(ctx, func() bool { return false }) + return nil +} + func (w *Workflows) ChildWorkflowWithRetryPolicy(ctx workflow.Context, expectedMaximumAttempts int, iterations int) error { return w.childWorkflowWithRetryPolicy(ctx, w.childWithRetryPolicy, expectedMaximumAttempts, iterations) } @@ -3063,6 +3071,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ContinueAsNewWithRetryPolicy) worker.RegisterWorkflow(w.ContinueAsNewWithChildWF) worker.RegisterWorkflow(w.IDReusePolicy) + worker.RegisterWorkflow(w.IDConflictPolicy) worker.RegisterWorkflow(w.InspectActivityInfo) worker.RegisterWorkflow(w.InspectLocalActivityInfo) worker.RegisterWorkflow(w.LargeQueryResultWorkflow)