Skip to content

Commit

Permalink
Verify callbacks succeeded only after all workflows are complete
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Jan 17, 2024
1 parent 0d04510 commit cf64b2c
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 37 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/pkg/errors v0.8.1
github.com/pborman/uuid v1.2.1
github.com/prometheus/client_golang v1.16.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
Expand Down Expand Up @@ -41,7 +41,6 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/otiai10/copy v1.14.0 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/objx v0.5.1 // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks=
github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM=
github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw=
github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
54 changes: 38 additions & 16 deletions scenarios/completion_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/facebookgo/clock"
"github.com/pborman/uuid"
"github.com/pkg/errors"
"github.com/temporalio/omes/common"
"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/loadgen/kitchensink"
Expand Down Expand Up @@ -116,7 +115,7 @@ func init() {
// The `u` parameter should be a uniform random number in [0, 1).
func ExponentialSample(n int, lambda float64, u float64) (int, error) {
if u < 0 || u >= 1 {
return 0, errors.Errorf("u must be in [0, 1)")
return 0, fmt.Errorf("u must be in [0, 1)")
}
totalProbability := 1 - math.Exp(-lambda*float64(n))
for i := 1; i < n; i++ {
Expand All @@ -139,16 +138,42 @@ func RunCompletionCallbackScenario(
if err := validateOptions(opts); err != nil {
return err
}
results := make(chan *completionCallbackScenarioIterationResult)
l := &loadgen.GenericExecutor{
Execute: func(ctx context.Context, run *loadgen.Run) error {
res, err := runIteration(ctx, opts, run.DefaultStartWorkflowOptions())
res, err := runWorkflow(ctx, opts, run.DefaultStartWorkflowOptions())
if err != nil {
return err
}
return verifyCallbackSucceeded(ctx, opts, res.WorkflowID, res.RunID, res.URL)
opts.Logger.Debugw("Workflow finished", "url", res.URL.String())
results <- res
return nil
},
}
return l.Run(ctx, info)
numErrs := 0
verificationDone := make(chan struct{})
go func() {
for res := range results {
opts.Logger.Debugw("Verifying callback succeeded", "url", res.URL.String())
err := verifyCallbackSucceeded(ctx, opts, res.WorkflowID, res.RunID, res.URL)
if err != nil {
numErrs++
opts.Logger.Errorw("Callback verification failed", "url", res.URL.String(), "error", err)
} else {
opts.Logger.Debugw("Callback succeeded", "url", res.URL.String())
}
}
close(verificationDone)
}()
if err := l.Run(ctx, info); err != nil {
return fmt.Errorf("run workflows: %w", err)
}
close(results)
<-verificationDone
if numErrs > 0 {
return fmt.Errorf("%d callbacks failed", numErrs)
}
return nil
}

func (completionCallbackScenarioExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error {
Expand All @@ -160,7 +185,7 @@ func (completionCallbackScenarioExecutor) Run(ctx context.Context, info loadgen.
return RunCompletionCallbackScenario(ctx, opts, info)
}

func runIteration(
func runWorkflow(
ctx context.Context,
scenarioOptions *CompletionCallbackScenarioOptions,
startWorkflowOptions client.StartWorkflowOptions,
Expand All @@ -171,8 +196,6 @@ func runIteration(
return nil, err
}

scenarioOptions.Logger.Debugw("Using callback URL", "url", u.String())

if scenarioOptions.DryRun {
return nil, nil
}
Expand All @@ -193,11 +216,11 @@ func runIteration(
}
workflowRun, err := scenarioOptions.SdkClient.ExecuteWorkflow(ctx, startWorkflowOptions, common.WorkflowNameKitchenSink, input)
if err != nil {
return nil, errors.Wrap(err, "failed to execute workflow")
return nil, fmt.Errorf("iteration start workflow: %w", err)
}
err = workflowRun.Get(ctx, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to get workflow result")
return nil, fmt.Errorf("iteration wait for workflow: %w", err)
}

return &completionCallbackScenarioIterationResult{
Expand All @@ -212,29 +235,28 @@ func verifyCallbackSucceeded(ctx context.Context, options *CompletionCallbackSce
for {
execution, err := options.SdkClient.DescribeWorkflowExecution(ctx, workflowID, runID)
if err != nil {
return errors.Wrap(err, "failed to describe workflow")
return fmt.Errorf("verify callback succeeded describe workflow: %w", err)
}
callbacks := execution.Callbacks
if len(callbacks) != 1 {
callbacksString := ""
for i, callback := range callbacks {
callbacksString += fmt.Sprintf("%d: %t: %+v\n", i, callback == nil, callback)
}
return errors.Errorf("expected 1 callback, got %d: %s", len(callbacks), callbacksString)
return fmt.Errorf("expected 1 callback, got %d: %s", len(callbacks), callbacksString)
}
callback := callbacks[0]
if callback.State == enums.CALLBACK_STATE_SUCCEEDED {
if callback.Callback.GetNexus().Url != u.String() {
return errors.Errorf("expected callback URL %q, got %q", u.String(), callback.Callback.GetNexus().Url)
return fmt.Errorf("expected callback URL %q, got %q", u.String(), callback.Callback.GetNexus().Url)
}
return nil
}
if callback.State == enums.CALLBACK_STATE_BACKING_OFF {
options.Logger.Infow("Callback backing off", "failure", callback.LastAttemptFailure)
options.Logger.Debugw("Callback backing off", "failure", callback.LastAttemptFailure)
}
if callback.State == enums.CALLBACK_STATE_FAILED {
options.Logger.Errorw("Callback failed", "failure", callback.LastAttemptFailure)
return errors.New("one or more callbacks failed")
return fmt.Errorf("callback failed: %+v", callback.LastAttemptFailure)
}
timer := options.Clock.Timer(retryDelay)
select {
Expand Down
41 changes: 23 additions & 18 deletions scenarios/completion_callbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,16 @@ func TestCompletionCallbackScenario_Run(t *testing.T) {
}).Return(workflowRun, nil)

// First call to DescribeWorkflowExecution returns a backing off callback.
sdkClient.On("DescribeWorkflowExecution", mock.Anything, mock.Anything, mock.Anything).Return(&workflowservice.DescribeWorkflowExecutionResponse{
Callbacks: []*workflow.CallbackInfo{
{
State: enums.CALLBACK_STATE_BACKING_OFF,
sdkClient.On("DescribeWorkflowExecution", mock.Anything, mock.Anything, mock.Anything).Return(
&workflowservice.DescribeWorkflowExecutionResponse{
Callbacks: []*workflow.CallbackInfo{
{
State: enums.CALLBACK_STATE_BACKING_OFF,
},
},
},
}, nil).Times(1)
nil,
).Times(1)

// Advance the clock so that we retry after 1 timer.
clkDone := make(chan struct{})
Expand All @@ -188,24 +191,26 @@ func TestCompletionCallbackScenario_Run(t *testing.T) {
}()

// Second call to DescribeWorkflowExecution returns a succeeded callback.
sdkClient.On("DescribeWorkflowExecution", mock.Anything, mock.Anything, mock.Anything).Return(&workflowservice.DescribeWorkflowExecutionResponse{
Callbacks: []*workflow.CallbackInfo{
{
State: enums.CALLBACK_STATE_SUCCEEDED,
Callback: &common.Callback{
Variant: &common.Callback_Nexus_{
Nexus: &common.Callback_Nexus{
Url: "http://localhost:1024?delay=0s&failure-probability=0.000000",
sdkClient.On("DescribeWorkflowExecution", mock.Anything, mock.Anything, mock.Anything).Return(
&workflowservice.DescribeWorkflowExecutionResponse{
Callbacks: []*workflow.CallbackInfo{
{
State: enums.CALLBACK_STATE_SUCCEEDED,
Callback: &common.Callback{
Variant: &common.Callback_Nexus_{
Nexus: &common.Callback_Nexus{
Url: "http://localhost:1024?delay=0s&failure-probability=0.000000",
},
},
},
},
},
},
}, nil).Times(1)
nil,
).Times(1)

// Create the scenario.
logger, err := zap.NewDevelopment()
require.NoError(t, err)
logger := zap.NewNop()
opts := &scenarios.CompletionCallbackScenarioOptions{
Logger: logger.Sugar(),
SdkClient: sdkClient,
Expand All @@ -221,8 +226,8 @@ func TestCompletionCallbackScenario_Run(t *testing.T) {
AttachWorkflowID: false,
}

// Run an iteration.
err = scenarios.RunCompletionCallbackScenario(ctx, opts, loadgen.ScenarioInfo{
// Run the scenario.
err := scenarios.RunCompletionCallbackScenario(ctx, opts, loadgen.ScenarioInfo{
MetricsHandler: client.MetricsNopHandler,
Logger: logger.Sugar(),
Configuration: loadgen.RunConfiguration{
Expand Down

0 comments on commit cf64b2c

Please sign in to comment.