Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added worker.NewV2 with validation on decision poller count #1370

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Added worker.NewV2 with validation on decision poller count (#1370)

## [v1.2.10] - 2024-07-10
### Added
- Revert "Handle panics while polling for tasks (#1352)" (#1357)
Expand Down Expand Up @@ -83,16 +86,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed in TestEnv workflow interceptor is not propagated correctly for child workflows #1289

## [v1.0.2] - 2023-09-25
### Added
### Added
- Add a structured error for non-determinism failures

### Changed
- Do not log when automatic heart beating fails due to cancellations
### Changed
- Do not log when automatic heart beating fails due to cancellations

## [v1.0.1] - 2023-08-14
### Added
- Emit cadence worker's hardware utilization inside worker once per host by @timl3136 in #1260
### Changed
### Changed
- Updated supported Go version to 1.19
- Log when the automatic heartbeating fails
- Updated golang.org/x/net and github.com/prometheus/client_golang
Expand Down
4 changes: 3 additions & 1 deletion evictiontest/workflow_cache_eviction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,13 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
// so if our worker puts *cacheSize* entries in the cache, it should evict exactly one
s.service.EXPECT().ResetStickyTaskList(gomock.Any(), gomock.Any(), callOptions()...).DoAndReturn(mockResetStickyTaskList).Times(1)

workflowWorker := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{
workflowWorker, err := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{
DisableActivityWorker: true,
Logger: zaptest.NewLogger(s.T()),
IsolationGroup: "zone-1",
})
s.Require().NoError(err)

// this is an arbitrary workflow we use for this test
// NOTE: a simple helloworld that doesn't execute an activity
// won't work because the workflow will simply just complete
Expand Down
3 changes: 2 additions & 1 deletion internal/internal_poller_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (
const (
defaultPollerAutoScalerCooldown = time.Minute
defaultPollerAutoScalerTargetUtilization = 0.6
defaultMinConcurrentPollerSize = 1
defaultMinConcurrentActivityPollerSize = 1
defaultMinConcurrentDecisionPollerSize = 2
)

var (
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_poller_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Test_pollerAutoscaler(t *testing.T) {
taskPoll: 0,
unrelated: 0,
initialPollerCount: 10,
minPollerCount: 1,
minPollerCount: 2,
maxPollerCount: 10,
targetMilliUsage: 500,
cooldownTime: coolDownTime,
Expand Down
12 changes: 8 additions & 4 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,8 +1025,12 @@ func newAggregatedWorker(
domain string,
taskList string,
options WorkerOptions,
) (worker *aggregatedWorker) {
) (worker *aggregatedWorker, err error) {
wOptions := AugmentWorkerOptions(options)
if err := options.Validate(); err != nil {
return nil, fmt.Errorf("worker options validation error: %w", err)
}

ctx := wOptions.BackgroundActivityContext
if ctx == nil {
ctx = context.Background()
Expand Down Expand Up @@ -1156,7 +1160,7 @@ func newAggregatedWorker(
logger: logger,
registry: registry,
workerstats: workerParams.WorkerStats,
}
}, nil
}

// tagScope with one or multiple tags, like
Expand Down Expand Up @@ -1286,10 +1290,10 @@ func AugmentWorkerOptions(options WorkerOptions) WorkerOptions {
options.MaxConcurrentSessionExecutionSize = defaultMaxConcurrentSessionExecutionSize
}
if options.MinConcurrentActivityTaskPollers == 0 {
options.MinConcurrentActivityTaskPollers = defaultMinConcurrentPollerSize
options.MinConcurrentActivityTaskPollers = defaultMinConcurrentActivityPollerSize
}
if options.MinConcurrentDecisionTaskPollers == 0 {
options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentPollerSize
options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentDecisionPollerSize
}
if options.PollerAutoScalerCooldown == 0 {
options.PollerAutoScalerCooldown = defaultPollerAutoScalerCooldown
Expand Down
11 changes: 7 additions & 4 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,12 @@ func createWorkerWithThrottle(
workerOptions.EnableSessionWorker = true

// Start Worker.
worker := NewWorker(
worker, err := NewWorker(
service,
domain,
"testGroupName2",
workerOptions)
require.NoError(t, err)
return worker
}

Expand Down Expand Up @@ -1075,7 +1076,8 @@ func TestActivityNilArgs(t *testing.T) {
func TestWorkerOptionDefaults(t *testing.T) {
domain := "worker-options-test"
taskList := "worker-options-tl"
aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{})
aggWorker, err := newAggregatedWorker(nil, domain, taskList, WorkerOptions{})
require.NoError(t, err)
decisionWorker := aggWorker.workflowWorker
require.True(t, decisionWorker.executionParameters.Identity != "")
require.NotNil(t, decisionWorker.executionParameters.Logger)
Expand Down Expand Up @@ -1143,7 +1145,8 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
Tracer: opentracing.NoopTracer{},
}

aggWorker := newAggregatedWorker(nil, domain, taskList, options)
aggWorker, err := newAggregatedWorker(nil, domain, taskList, options)
require.NoError(t, err)
decisionWorker := aggWorker.workflowWorker
require.True(t, len(decisionWorker.executionParameters.ContextPropagators) > 0)

Expand Down Expand Up @@ -1388,7 +1391,7 @@ func Test_augmentWorkerOptions(t *testing.T) {
MaxConcurrentDecisionTaskExecutionSize: 1000,
WorkerDecisionTasksPerSecond: 100000,
MaxConcurrentDecisionTaskPollers: 2,
MinConcurrentDecisionTaskPollers: 1,
MinConcurrentDecisionTaskPollers: 2,
PollerAutoScalerCooldown: time.Minute,
PollerAutoScalerTargetUtilization: 0.6,
PollerAutoScalerDryRun: false,
Expand Down
17 changes: 11 additions & 6 deletions internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() {
DisableActivityWorker: true,
Identity: "test-worker-identity",
}
worker := newAggregatedWorker(s.service, domain, taskList, options)
worker, err := newAggregatedWorker(s.service, domain, taskList, options)
s.Require().NoError(err)
worker.RegisterWorkflowWithOptions(
longDecisionWorkflowFn,
RegisterWorkflowOptions{Name: "long-running-decision-workflow-type"},
Expand Down Expand Up @@ -515,7 +516,8 @@ func (s *WorkersTestSuite) TestQueryTask_WorkflowCacheEvicted() {
// See the mock function for the second PollForDecisionTask call above.
MaxConcurrentDecisionTaskExecutionSize: 1,
}
worker := newAggregatedWorker(s.service, domain, taskList, options)
worker, err := newAggregatedWorker(s.service, domain, taskList, options)
s.Require().NoError(err)
worker.RegisterWorkflowWithOptions(
queryWorkflowFn,
RegisterWorkflowOptions{Name: workflowType},
Expand Down Expand Up @@ -638,7 +640,8 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() {
DisableActivityWorker: true,
Identity: "test-worker-identity",
}
worker := newAggregatedWorker(s.service, domain, taskList, options)
worker, err := newAggregatedWorker(s.service, domain, taskList, options)
s.Require().NoError(err)
worker.RegisterWorkflowWithOptions(
longDecisionWorkflowFn,
RegisterWorkflowOptions{Name: "multiple-local-activities-workflow-type"},
Expand Down Expand Up @@ -748,7 +751,8 @@ func (s *WorkersTestSuite) TestLocallyDispatchedActivity() {
Logger: zaptest.NewLogger(s.T()),
Identity: "test-worker-identity",
}
worker := newAggregatedWorker(s.service, domain, taskList, options)
worker, err := newAggregatedWorker(s.service, domain, taskList, options)
s.Require().NoError(err)
worker.RegisterWorkflowWithOptions(
workflowFn,
RegisterWorkflowOptions{Name: workflowType},
Expand Down Expand Up @@ -859,14 +863,15 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() {
return nil
}).MinTimes(1)

worker := newAggregatedWorker(s.service, domain, taskList, options)
worker, err := newAggregatedWorker(s.service, domain, taskList, options)
s.Require().NoError(err)
worker.RegisterWorkflowWithOptions(
workflowFn,
RegisterWorkflowOptions{Name: workflowType},
)
worker.RegisterActivityWithOptions(activitySleep, RegisterActivityOptions{Name: "activitySleep"})
s.NotNil(worker.locallyDispatchedActivityWorker)
err := worker.Start()
err = worker.Start()
s.NoError(err, "worker failed to start")

// wait for test to complete
Expand Down
14 changes: 12 additions & 2 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package internal

import (
"context"
"fmt"
"time"

"go.uber.org/cadence/internal/common/debug"
Expand Down Expand Up @@ -108,7 +109,7 @@ type (
// optional: Sets the minimum number of goroutines that will concurrently poll the
// cadence-server to retrieve decision tasks. If FeatureFlags.PollerAutoScalerEnabled is set to true,
// changing this value will NOT affect the rate at which the worker is able to consume tasks from a task list.
// Default value is 1
// Default value is 2
MinConcurrentDecisionTaskPollers int

// optional: Sets the interval of poller autoscaling, between which poller autoscaler changes the poller count
Expand Down Expand Up @@ -333,7 +334,7 @@ func NewWorker(
domain string,
taskList string,
options WorkerOptions,
) *aggregatedWorker {
) (*aggregatedWorker, error) {
return newAggregatedWorker(service, domain, taskList, options)
}

Expand Down Expand Up @@ -383,3 +384,12 @@ func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName s
r := NewWorkflowReplayer()
return r.ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, lastEventID)
}

// Validate sanity validation of WorkerOptions
func (o WorkerOptions) Validate() error {
// decision task pollers must be >= 2 or unset if sticky tasklist is enabled https://github.com/uber-go/cadence-client/issues/1369
if !o.DisableStickyExecution && (o.MaxConcurrentDecisionTaskPollers == 1 || o.MinConcurrentDecisionTaskPollers == 1) {
return fmt.Errorf("DecisionTaskPollers must be >= 2 or use default value")
}
return nil
}
68 changes: 68 additions & 0 deletions internal/worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_NewWorker(t *testing.T) {
tests := []struct {
name string
options WorkerOptions
expectErr string
}{
{
name: "happy with default value",
options: WorkerOptions{},
expectErr: "",
},
{
name: "happy with explicit decision task poller set to 1 if sticky task list is disabled",
options: WorkerOptions{
MaxConcurrentDecisionTaskPollers: 1,
DisableStickyExecution: true,
},
expectErr: "",
},
{
name: "invalid worker with explicit decision task poller set to 1",
options: WorkerOptions{
MaxConcurrentDecisionTaskPollers: 1,
},
expectErr: "DecisionTaskPollers must be >= 2 or use default value",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w, err := NewWorker(nil, "test-domain", "test-tasklist", tt.options)
if tt.expectErr != "" {
assert.ErrorContains(t, err, tt.expectErr)
assert.Nil(t, w)
} else {
assert.NoError(t, err)
assert.NotNil(t, w)
}
})
}
}
20 changes: 17 additions & 3 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,20 +269,34 @@ const (
ShadowModeContinuous = internal.ShadowModeContinuous
)

// New creates an instance of worker for managing workflow and activity executions.
// Deprecated: use NewV2 instead since this implementation will panic on error
func New(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update CHANGELOG.md about this new validation and the new worker.NewV2 constructor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

service workflowserviceclient.Interface,
domain string,
taskList string,
options Options,
) Worker {
w, err := internal.NewWorker(service, domain, taskList, options)
if err != nil {
panic(err)
Groxx marked this conversation as resolved.
Show resolved Hide resolved
}
return w
}

// NewV2 returns an instance of worker for managing workflow and activity executions and an error.
//
// service - thrift connection to the cadence server
// domain - the name of the cadence domain
// taskList - is the task list name you use to identify your client worker, also
// identifies group of workflow and activity implementations that are
// hosted by a single worker process
// options - configure any worker specific options like logger, metrics, identity
func New(
func NewV2(
service workflowserviceclient.Interface,
domain string,
taskList string,
options Options,
) Worker {
) (Worker, error) {
return internal.NewWorker(service, domain, taskList, options)
}

Expand Down
Loading