From cf64b2ce70dd5c2a87bf84480674a338133c1b8d Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Wed, 17 Jan 2024 12:19:05 -0800 Subject: [PATCH] Verify callbacks succeeded only after all workflows are complete --- go.mod | 3 +- go.sum | 1 - scenarios/completion_callbacks.go | 54 ++++++++++++++++++-------- scenarios/completion_callbacks_test.go | 41 ++++++++++--------- 4 files changed, 62 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index b647cb6..32b94b0 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.3 - github.com/pkg/errors v0.8.1 + github.com/pborman/uuid v1.2.1 github.com/prometheus/client_golang v1.16.0 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 @@ -41,7 +41,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/otiai10/copy v1.14.0 // indirect - github.com/pborman/uuid v1.2.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.1 // indirect diff --git a/go.sum b/go.sum index c779de4..c9db48a 100644 --- a/go.sum +++ b/go.sum @@ -65,7 +65,6 @@ github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/scenarios/completion_callbacks.go b/scenarios/completion_callbacks.go index a09a088..f5f4f9f 100644 --- a/scenarios/completion_callbacks.go +++ b/scenarios/completion_callbacks.go @@ -10,7 +10,6 @@ import ( "github.com/facebookgo/clock" "github.com/pborman/uuid" - "github.com/pkg/errors" "github.com/temporalio/omes/common" "github.com/temporalio/omes/loadgen" "github.com/temporalio/omes/loadgen/kitchensink" @@ -116,7 +115,7 @@ func init() { // The `u` parameter should be a uniform random number in [0, 1). func ExponentialSample(n int, lambda float64, u float64) (int, error) { if u < 0 || u >= 1 { - return 0, errors.Errorf("u must be in [0, 1)") + return 0, fmt.Errorf("u must be in [0, 1)") } totalProbability := 1 - math.Exp(-lambda*float64(n)) for i := 1; i < n; i++ { @@ -139,16 +138,42 @@ func RunCompletionCallbackScenario( if err := validateOptions(opts); err != nil { return err } + results := make(chan *completionCallbackScenarioIterationResult) l := &loadgen.GenericExecutor{ Execute: func(ctx context.Context, run *loadgen.Run) error { - res, err := runIteration(ctx, opts, run.DefaultStartWorkflowOptions()) + res, err := runWorkflow(ctx, opts, run.DefaultStartWorkflowOptions()) if err != nil { return err } - return verifyCallbackSucceeded(ctx, opts, res.WorkflowID, res.RunID, res.URL) + opts.Logger.Debugw("Workflow finished", "url", res.URL.String()) + results <- res + return nil }, } - return l.Run(ctx, info) + numErrs := 0 + verificationDone := make(chan struct{}) + go func() { + for res := range results { + opts.Logger.Debugw("Verifying callback succeeded", "url", res.URL.String()) + err := verifyCallbackSucceeded(ctx, opts, res.WorkflowID, res.RunID, res.URL) + if err != nil { + numErrs++ + opts.Logger.Errorw("Callback verification failed", "url", res.URL.String(), "error", err) + } else { + opts.Logger.Debugw("Callback succeeded", "url", res.URL.String()) + } + } + close(verificationDone) + }() + if err := l.Run(ctx, info); err != nil { + return fmt.Errorf("run workflows: %w", err) + } + close(results) + <-verificationDone + if numErrs > 0 { + return fmt.Errorf("%d callbacks failed", numErrs) + } + return nil } func (completionCallbackScenarioExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error { @@ -160,7 +185,7 @@ func (completionCallbackScenarioExecutor) Run(ctx context.Context, info loadgen. return RunCompletionCallbackScenario(ctx, opts, info) } -func runIteration( +func runWorkflow( ctx context.Context, scenarioOptions *CompletionCallbackScenarioOptions, startWorkflowOptions client.StartWorkflowOptions, @@ -171,8 +196,6 @@ func runIteration( return nil, err } - scenarioOptions.Logger.Debugw("Using callback URL", "url", u.String()) - if scenarioOptions.DryRun { return nil, nil } @@ -193,11 +216,11 @@ func runIteration( } workflowRun, err := scenarioOptions.SdkClient.ExecuteWorkflow(ctx, startWorkflowOptions, common.WorkflowNameKitchenSink, input) if err != nil { - return nil, errors.Wrap(err, "failed to execute workflow") + return nil, fmt.Errorf("iteration start workflow: %w", err) } err = workflowRun.Get(ctx, nil) if err != nil { - return nil, errors.Wrap(err, "failed to get workflow result") + return nil, fmt.Errorf("iteration wait for workflow: %w", err) } return &completionCallbackScenarioIterationResult{ @@ -212,7 +235,7 @@ func verifyCallbackSucceeded(ctx context.Context, options *CompletionCallbackSce for { execution, err := options.SdkClient.DescribeWorkflowExecution(ctx, workflowID, runID) if err != nil { - return errors.Wrap(err, "failed to describe workflow") + return fmt.Errorf("verify callback succeeded describe workflow: %w", err) } callbacks := execution.Callbacks if len(callbacks) != 1 { @@ -220,21 +243,20 @@ func verifyCallbackSucceeded(ctx context.Context, options *CompletionCallbackSce for i, callback := range callbacks { callbacksString += fmt.Sprintf("%d: %t: %+v\n", i, callback == nil, callback) } - return errors.Errorf("expected 1 callback, got %d: %s", len(callbacks), callbacksString) + return fmt.Errorf("expected 1 callback, got %d: %s", len(callbacks), callbacksString) } callback := callbacks[0] if callback.State == enums.CALLBACK_STATE_SUCCEEDED { if callback.Callback.GetNexus().Url != u.String() { - return errors.Errorf("expected callback URL %q, got %q", u.String(), callback.Callback.GetNexus().Url) + return fmt.Errorf("expected callback URL %q, got %q", u.String(), callback.Callback.GetNexus().Url) } return nil } if callback.State == enums.CALLBACK_STATE_BACKING_OFF { - options.Logger.Infow("Callback backing off", "failure", callback.LastAttemptFailure) + options.Logger.Debugw("Callback backing off", "failure", callback.LastAttemptFailure) } if callback.State == enums.CALLBACK_STATE_FAILED { - options.Logger.Errorw("Callback failed", "failure", callback.LastAttemptFailure) - return errors.New("one or more callbacks failed") + return fmt.Errorf("callback failed: %+v", callback.LastAttemptFailure) } timer := options.Clock.Timer(retryDelay) select { diff --git a/scenarios/completion_callbacks_test.go b/scenarios/completion_callbacks_test.go index 669b71e..6404d9f 100644 --- a/scenarios/completion_callbacks_test.go +++ b/scenarios/completion_callbacks_test.go @@ -161,13 +161,16 @@ func TestCompletionCallbackScenario_Run(t *testing.T) { }).Return(workflowRun, nil) // First call to DescribeWorkflowExecution returns a backing off callback. - sdkClient.On("DescribeWorkflowExecution", mock.Anything, mock.Anything, mock.Anything).Return(&workflowservice.DescribeWorkflowExecutionResponse{ - Callbacks: []*workflow.CallbackInfo{ - { - State: enums.CALLBACK_STATE_BACKING_OFF, + sdkClient.On("DescribeWorkflowExecution", mock.Anything, mock.Anything, mock.Anything).Return( + &workflowservice.DescribeWorkflowExecutionResponse{ + Callbacks: []*workflow.CallbackInfo{ + { + State: enums.CALLBACK_STATE_BACKING_OFF, + }, }, }, - }, nil).Times(1) + nil, + ).Times(1) // Advance the clock so that we retry after 1 timer. clkDone := make(chan struct{}) @@ -188,24 +191,26 @@ func TestCompletionCallbackScenario_Run(t *testing.T) { }() // Second call to DescribeWorkflowExecution returns a succeeded callback. - sdkClient.On("DescribeWorkflowExecution", mock.Anything, mock.Anything, mock.Anything).Return(&workflowservice.DescribeWorkflowExecutionResponse{ - Callbacks: []*workflow.CallbackInfo{ - { - State: enums.CALLBACK_STATE_SUCCEEDED, - Callback: &common.Callback{ - Variant: &common.Callback_Nexus_{ - Nexus: &common.Callback_Nexus{ - Url: "http://localhost:1024?delay=0s&failure-probability=0.000000", + sdkClient.On("DescribeWorkflowExecution", mock.Anything, mock.Anything, mock.Anything).Return( + &workflowservice.DescribeWorkflowExecutionResponse{ + Callbacks: []*workflow.CallbackInfo{ + { + State: enums.CALLBACK_STATE_SUCCEEDED, + Callback: &common.Callback{ + Variant: &common.Callback_Nexus_{ + Nexus: &common.Callback_Nexus{ + Url: "http://localhost:1024?delay=0s&failure-probability=0.000000", + }, }, }, }, }, }, - }, nil).Times(1) + nil, + ).Times(1) // Create the scenario. - logger, err := zap.NewDevelopment() - require.NoError(t, err) + logger := zap.NewNop() opts := &scenarios.CompletionCallbackScenarioOptions{ Logger: logger.Sugar(), SdkClient: sdkClient, @@ -221,8 +226,8 @@ func TestCompletionCallbackScenario_Run(t *testing.T) { AttachWorkflowID: false, } - // Run an iteration. - err = scenarios.RunCompletionCallbackScenario(ctx, opts, loadgen.ScenarioInfo{ + // Run the scenario. + err := scenarios.RunCompletionCallbackScenario(ctx, opts, loadgen.ScenarioInfo{ MetricsHandler: client.MetricsNopHandler, Logger: logger.Sugar(), Configuration: loadgen.RunConfiguration{