Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added worker.NewV2 with validation on decision poller count #1370

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion evictiontest/workflow_cache_eviction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,13 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
// so if our worker puts *cacheSize* entries in the cache, it should evict exactly one
s.service.EXPECT().ResetStickyTaskList(gomock.Any(), gomock.Any(), callOptions()...).DoAndReturn(mockResetStickyTaskList).Times(1)

workflowWorker := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{
workflowWorker, err := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{
DisableActivityWorker: true,
Logger: zaptest.NewLogger(s.T()),
IsolationGroup: "zone-1",
})
s.Require().NoError(err)

// this is an arbitrary workflow we use for this test
// NOTE: a simple helloworld that doesn't execute an activity
// won't work because the workflow will simply just complete
Expand Down
3 changes: 2 additions & 1 deletion internal/internal_poller_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (
const (
defaultPollerAutoScalerCooldown = time.Minute
defaultPollerAutoScalerTargetUtilization = 0.6
defaultMinConcurrentPollerSize = 1
defaultMinConcurrentActivityPollerSize = 1
defaultMinConcurrentDecisionPollerSize = 2
)

var (
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1286,10 +1286,10 @@ func AugmentWorkerOptions(options WorkerOptions) WorkerOptions {
options.MaxConcurrentSessionExecutionSize = defaultMaxConcurrentSessionExecutionSize
}
if options.MinConcurrentActivityTaskPollers == 0 {
options.MinConcurrentActivityTaskPollers = defaultMinConcurrentPollerSize
options.MinConcurrentActivityTaskPollers = defaultMinConcurrentActivityPollerSize
}
if options.MinConcurrentDecisionTaskPollers == 0 {
options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentPollerSize
options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentDecisionPollerSize
}
if options.PollerAutoScalerCooldown == 0 {
options.PollerAutoScalerCooldown = defaultPollerAutoScalerCooldown
Expand Down
5 changes: 3 additions & 2 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,12 @@ func createWorkerWithThrottle(
workerOptions.EnableSessionWorker = true

// Start Worker.
worker := NewWorker(
worker, err := NewWorker(
service,
domain,
"testGroupName2",
workerOptions)
require.NoError(t, err)
return worker
}

Expand Down Expand Up @@ -1388,7 +1389,7 @@ func Test_augmentWorkerOptions(t *testing.T) {
MaxConcurrentDecisionTaskExecutionSize: 1000,
WorkerDecisionTasksPerSecond: 100000,
MaxConcurrentDecisionTaskPollers: 2,
MinConcurrentDecisionTaskPollers: 1,
MinConcurrentDecisionTaskPollers: 2,
PollerAutoScalerCooldown: time.Minute,
PollerAutoScalerTargetUtilization: 0.6,
PollerAutoScalerDryRun: false,
Expand Down
19 changes: 16 additions & 3 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package internal

import (
"context"
"fmt"
"time"

"go.uber.org/cadence/internal/common/debug"
Expand Down Expand Up @@ -108,7 +109,7 @@ type (
// optional: Sets the minimum number of goroutines that will concurrently poll the
// cadence-server to retrieve decision tasks. If FeatureFlags.PollerAutoScalerEnabled is set to true,
// changing this value will NOT affect the rate at which the worker is able to consume tasks from a task list.
// Default value is 1
// Default value is 2
MinConcurrentDecisionTaskPollers int

// optional: Sets the interval of poller autoscaling, between which poller autoscaler changes the poller count
Expand Down Expand Up @@ -333,8 +334,11 @@ func NewWorker(
domain string,
taskList string,
options WorkerOptions,
) *aggregatedWorker {
return newAggregatedWorker(service, domain, taskList, options)
) (*aggregatedWorker, error) {
if err := options.Validate(); err != nil {
return nil, fmt.Errorf("worker options validation error: %w", err)
}
return newAggregatedWorker(service, domain, taskList, options), nil
Groxx marked this conversation as resolved.
Show resolved Hide resolved
}

// ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it.
Expand Down Expand Up @@ -383,3 +387,12 @@ func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName s
r := NewWorkflowReplayer()
return r.ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, lastEventID)
}

// Validate sanity validation of WorkerOptions
func (o WorkerOptions) Validate() error {
// decision task pollers must be >= 2 or unset if sticky tasklist is enabled https://github.com/uber-go/cadence-client/issues/1369
if !o.DisableStickyExecution && (o.MaxConcurrentDecisionTaskPollers == 1 || o.MinConcurrentDecisionTaskPollers == 1) {
return fmt.Errorf("DecisionTaskPollers must be >= 2 or use default value")
}
return nil
}
68 changes: 68 additions & 0 deletions internal/worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_NewWorker(t *testing.T) {
tests := []struct {
name string
options WorkerOptions
expectErr string
}{
{
name: "happy with default value",
options: WorkerOptions{},
expectErr: "",
},
{
name: "happy with explicit decision task poller set to 1 if sticky task list is disabled",
options: WorkerOptions{
MaxConcurrentDecisionTaskPollers: 1,
DisableStickyExecution: true,
},
expectErr: "",
},
{
name: "invalid worker with explicit decision task poller set to 1",
options: WorkerOptions{
MaxConcurrentDecisionTaskPollers: 1,
},
expectErr: "DecisionTaskPollers must be >= 2 or use default value",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w, err := NewWorker(nil, "test-domain", "test-tasklist", tt.options)
if tt.expectErr != "" {
assert.ErrorContains(t, err, tt.expectErr)
assert.Nil(t, w)
} else {
assert.NoError(t, err)
assert.NotNil(t, w)
}
})
}
}
6 changes: 5 additions & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,11 @@ func New(
taskList string,
options Options,
) Worker {
return internal.NewWorker(service, domain, taskList, options)
w, err := internal.NewWorker(service, domain, taskList, options)
if err != nil {
panic(err)
Groxx marked this conversation as resolved.
Show resolved Hide resolved
}
return w
}

// NewWorkflowReplayer creates a WorkflowReplayer instance.
Expand Down
Loading