Skip to content

Commit

Permalink
Add Eager Workflow Start test (#374)
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai-temporal authored Nov 22, 2023
1 parent 6e7388c commit 46d78ca
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error {
ExtraArgs: []string{
"--dynamic-config-value", "system.forceSearchAttributesCacheRefreshOnRead=true",
"--dynamic-config-value", "system.enableActivityEagerExecution=true",
"--dynamic-config-value", "system.enableEagerWorkflowStart=true",
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true",
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true",
},
Expand Down
10 changes: 10 additions & 0 deletions features/eager_workflow/successful_start/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Successful start of a workflow using eager mode
Eager Workflow Start (EWS) is a latency optimization to reduce the time to start processing the first task of a workflow. The starter program provisions a slot in a suitable worker, requests that the server starts a workflow in eager mode, and then, when it receives the first WFT in the response, it directly schedules the task to the worker, eliminating a network roundtrip and a database transaction.

In each scenario, the starter program and the worker should share a client. The starter program will create a simple workflow in eager mode, and then verify that eager mode was actually used, and the first workflow task was processed correctly.

# Detailed spec
* The `EnableEagerStart` start workflow option should be `true`.
* The server response to start workflow should include a non-nil `eager_workflow_task` field.
* The task timeout for the workflow should be large enough to hang the program on a task retry. A server response with an `eager_workflow_task` alone does not guarantee eager execution because the worker could still refuse to process it. In that exceptional case the task would be retried through the non-eager path, and may succeed. A large timeout effectively disables retries, ensuring success always comes from the eager path.
* The simple workflow should return `"Hello World"` and exit without errors.
76 changes: 76 additions & 0 deletions features/eager_workflow/successful_start/feature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package successful_start

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/temporalio/features/harness/go/harness"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
"google.golang.org/grpc"
)

const expectedResult = "Hello World"

var numEagerlyStarted atomic.Uint64

var Feature = harness.Feature{
Workflows: Workflow,
StartWorkflowOptions: client.StartWorkflowOptions{EnableEagerStart: true, WorkflowTaskTimeout: 1 * time.Hour},
CheckResult: CheckResult,
ClientOptions: client.Options{
ConnectionOptions: client.ConnectionOptions{
DialOptions: []grpc.DialOption{grpc.WithUnaryInterceptor(EagerDetector(&numEagerlyStarted))},
},
},
}

// A "hello world" workflow
func Workflow(ctx workflow.Context) (string, error) {
return expectedResult, nil
}

func CheckResult(ctx context.Context, runner *harness.Runner, run client.WorkflowRun) error {
var result string
if err := run.Get(ctx, &result); err != nil {
return err
}
if result != expectedResult {
return fmt.Errorf("expected %s, got: %s", expectedResult, result)
}
if numEager := numEagerlyStarted.Load(); numEager != 1 {
// There is no way to check that this dynamic config is enabled in the namespace,
// unless we run this test...
// Instead of failing the test just skip it.
msg := fmt.Sprintf("Enable dynamic config system.enableEagerWorkflowStart=true: numEagerlyStarted=%d", numEager)
return runner.Skip(msg)
}
return nil
}

func EagerDetector(cntEager *atomic.Uint64) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, response interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
request_eager := false
switch o := req.(type) {
case *workflowservice.StartWorkflowExecutionRequest:
request_eager = o.RequestEagerExecution
}

err := invoker(ctx, method, req, response, cc, opts...)
if err != nil {
return err
}

switch o := response.(type) {
case *workflowservice.StartWorkflowExecutionResponse:
if request_eager && o.GetEagerWorkflowTask() != nil {
cntEager.Add(1)
}
}

return nil
}
}
2 changes: 2 additions & 0 deletions features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
data_converter_json "github.com/temporalio/features/features/data_converter/json"
data_converter_json_protobuf "github.com/temporalio/features/features/data_converter/json_protobuf"
eager_activity_non_remote_activities_worker "github.com/temporalio/features/features/eager_activity/non_remote_activities_worker"
eager_workflow_successful_start "github.com/temporalio/features/features/eager_workflow/successful_start"
query_successful_query "github.com/temporalio/features/features/query/successful_query"
query_timeout_due_to_no_active_workers "github.com/temporalio/features/features/query/timeout_due_to_no_active_workers"
query_unexpected_arguments "github.com/temporalio/features/features/query/unexpected_arguments"
Expand Down Expand Up @@ -72,6 +73,7 @@ func init() {
data_converter_json_protobuf.Feature,
data_converter_json.Feature,
eager_activity_non_remote_activities_worker.Feature,
eager_workflow_successful_start.Feature,
query_successful_query.Feature,
query_timeout_due_to_no_active_workers.Feature,
query_unexpected_arguments.Feature,
Expand Down

0 comments on commit 46d78ca

Please sign in to comment.