diff --git a/CHANGELOG.md b/CHANGELOG.md index 2dcd2bbe8..caa2b9e29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Added +- Added worker.NewV2 with validation on decision poller count (#1370) + ## [v1.2.10] - 2024-07-10 ### Added - Revert "Handle panics while polling for tasks (#1352)" (#1357) @@ -83,16 +86,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fixed in TestEnv workflow interceptor is not propagated correctly for child workflows #1289 ## [v1.0.2] - 2023-09-25 -### Added +### Added - Add a structured error for non-determinism failures -### Changed -- Do not log when automatic heart beating fails due to cancellations +### Changed +- Do not log when automatic heart beating fails due to cancellations ## [v1.0.1] - 2023-08-14 ### Added - Emit cadence worker's hardware utilization inside worker once per host by @timl3136 in #1260 -### Changed +### Changed - Updated supported Go version to 1.19 - Log when the automatic heartbeating fails - Updated golang.org/x/net and github.com/prometheus/client_golang diff --git a/evictiontest/workflow_cache_eviction_test.go b/evictiontest/workflow_cache_eviction_test.go index fe0f66ffa..6c92762b9 100644 --- a/evictiontest/workflow_cache_eviction_test.go +++ b/evictiontest/workflow_cache_eviction_test.go @@ -174,11 +174,13 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() { // so if our worker puts *cacheSize* entries in the cache, it should evict exactly one s.service.EXPECT().ResetStickyTaskList(gomock.Any(), gomock.Any(), callOptions()...).DoAndReturn(mockResetStickyTaskList).Times(1) - workflowWorker := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{ + workflowWorker, err := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{ DisableActivityWorker: true, Logger: zaptest.NewLogger(s.T()), IsolationGroup: "zone-1", }) + s.Require().NoError(err) + // this is an arbitrary workflow we use for this test // NOTE: a simple helloworld that doesn't execute an activity // won't work because the workflow will simply just complete diff --git a/internal/internal_poller_autoscaler.go b/internal/internal_poller_autoscaler.go index 333f03933..6fbd4ccbc 100644 --- a/internal/internal_poller_autoscaler.go +++ b/internal/internal_poller_autoscaler.go @@ -37,7 +37,8 @@ import ( const ( defaultPollerAutoScalerCooldown = time.Minute defaultPollerAutoScalerTargetUtilization = 0.6 - defaultMinConcurrentPollerSize = 1 + defaultMinConcurrentActivityPollerSize = 1 + defaultMinConcurrentDecisionPollerSize = 2 ) var ( diff --git a/internal/internal_poller_autoscaler_test.go b/internal/internal_poller_autoscaler_test.go index 48337284b..3e9757979 100644 --- a/internal/internal_poller_autoscaler_test.go +++ b/internal/internal_poller_autoscaler_test.go @@ -61,7 +61,7 @@ func Test_pollerAutoscaler(t *testing.T) { taskPoll: 0, unrelated: 0, initialPollerCount: 10, - minPollerCount: 1, + minPollerCount: 2, maxPollerCount: 10, targetMilliUsage: 500, cooldownTime: coolDownTime, diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 78911d0a4..e765e3515 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1025,8 +1025,12 @@ func newAggregatedWorker( domain string, taskList string, options WorkerOptions, -) (worker *aggregatedWorker) { +) (worker *aggregatedWorker, err error) { wOptions := AugmentWorkerOptions(options) + if err := options.Validate(); err != nil { + return nil, fmt.Errorf("worker options validation error: %w", err) + } + ctx := wOptions.BackgroundActivityContext if ctx == nil { ctx = context.Background() @@ -1156,7 +1160,7 @@ func newAggregatedWorker( logger: logger, registry: registry, workerstats: workerParams.WorkerStats, - } + }, nil } // tagScope with one or multiple tags, like @@ -1286,10 +1290,10 @@ func AugmentWorkerOptions(options WorkerOptions) WorkerOptions { options.MaxConcurrentSessionExecutionSize = defaultMaxConcurrentSessionExecutionSize } if options.MinConcurrentActivityTaskPollers == 0 { - options.MinConcurrentActivityTaskPollers = defaultMinConcurrentPollerSize + options.MinConcurrentActivityTaskPollers = defaultMinConcurrentActivityPollerSize } if options.MinConcurrentDecisionTaskPollers == 0 { - options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentPollerSize + options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentDecisionPollerSize } if options.PollerAutoScalerCooldown == 0 { options.PollerAutoScalerCooldown = defaultPollerAutoScalerCooldown diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 8b02cf329..65792bb45 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -414,11 +414,12 @@ func createWorkerWithThrottle( workerOptions.EnableSessionWorker = true // Start Worker. - worker := NewWorker( + worker, err := NewWorker( service, domain, "testGroupName2", workerOptions) + require.NoError(t, err) return worker } @@ -1075,7 +1076,8 @@ func TestActivityNilArgs(t *testing.T) { func TestWorkerOptionDefaults(t *testing.T) { domain := "worker-options-test" taskList := "worker-options-tl" - aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{}) + aggWorker, err := newAggregatedWorker(nil, domain, taskList, WorkerOptions{}) + require.NoError(t, err) decisionWorker := aggWorker.workflowWorker require.True(t, decisionWorker.executionParameters.Identity != "") require.NotNil(t, decisionWorker.executionParameters.Logger) @@ -1143,7 +1145,8 @@ func TestWorkerOptionNonDefaults(t *testing.T) { Tracer: opentracing.NoopTracer{}, } - aggWorker := newAggregatedWorker(nil, domain, taskList, options) + aggWorker, err := newAggregatedWorker(nil, domain, taskList, options) + require.NoError(t, err) decisionWorker := aggWorker.workflowWorker require.True(t, len(decisionWorker.executionParameters.ContextPropagators) > 0) @@ -1388,7 +1391,7 @@ func Test_augmentWorkerOptions(t *testing.T) { MaxConcurrentDecisionTaskExecutionSize: 1000, WorkerDecisionTasksPerSecond: 100000, MaxConcurrentDecisionTaskPollers: 2, - MinConcurrentDecisionTaskPollers: 1, + MinConcurrentDecisionTaskPollers: 2, PollerAutoScalerCooldown: time.Minute, PollerAutoScalerTargetUtilization: 0.6, PollerAutoScalerDryRun: false, diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 49d4041b2..8b6e08049 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -340,7 +340,8 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() { DisableActivityWorker: true, Identity: "test-worker-identity", } - worker := newAggregatedWorker(s.service, domain, taskList, options) + worker, err := newAggregatedWorker(s.service, domain, taskList, options) + s.Require().NoError(err) worker.RegisterWorkflowWithOptions( longDecisionWorkflowFn, RegisterWorkflowOptions{Name: "long-running-decision-workflow-type"}, @@ -515,7 +516,8 @@ func (s *WorkersTestSuite) TestQueryTask_WorkflowCacheEvicted() { // See the mock function for the second PollForDecisionTask call above. MaxConcurrentDecisionTaskExecutionSize: 1, } - worker := newAggregatedWorker(s.service, domain, taskList, options) + worker, err := newAggregatedWorker(s.service, domain, taskList, options) + s.Require().NoError(err) worker.RegisterWorkflowWithOptions( queryWorkflowFn, RegisterWorkflowOptions{Name: workflowType}, @@ -638,7 +640,8 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() { DisableActivityWorker: true, Identity: "test-worker-identity", } - worker := newAggregatedWorker(s.service, domain, taskList, options) + worker, err := newAggregatedWorker(s.service, domain, taskList, options) + s.Require().NoError(err) worker.RegisterWorkflowWithOptions( longDecisionWorkflowFn, RegisterWorkflowOptions{Name: "multiple-local-activities-workflow-type"}, @@ -748,7 +751,8 @@ func (s *WorkersTestSuite) TestLocallyDispatchedActivity() { Logger: zaptest.NewLogger(s.T()), Identity: "test-worker-identity", } - worker := newAggregatedWorker(s.service, domain, taskList, options) + worker, err := newAggregatedWorker(s.service, domain, taskList, options) + s.Require().NoError(err) worker.RegisterWorkflowWithOptions( workflowFn, RegisterWorkflowOptions{Name: workflowType}, @@ -859,14 +863,15 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() { return nil }).MinTimes(1) - worker := newAggregatedWorker(s.service, domain, taskList, options) + worker, err := newAggregatedWorker(s.service, domain, taskList, options) + s.Require().NoError(err) worker.RegisterWorkflowWithOptions( workflowFn, RegisterWorkflowOptions{Name: workflowType}, ) worker.RegisterActivityWithOptions(activitySleep, RegisterActivityOptions{Name: "activitySleep"}) s.NotNil(worker.locallyDispatchedActivityWorker) - err := worker.Start() + err = worker.Start() s.NoError(err, "worker failed to start") // wait for test to complete diff --git a/internal/worker.go b/internal/worker.go index 1d2fbe4af..e246ab080 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -23,6 +23,7 @@ package internal import ( "context" + "fmt" "time" "go.uber.org/cadence/internal/common/debug" @@ -108,7 +109,7 @@ type ( // optional: Sets the minimum number of goroutines that will concurrently poll the // cadence-server to retrieve decision tasks. If FeatureFlags.PollerAutoScalerEnabled is set to true, // changing this value will NOT affect the rate at which the worker is able to consume tasks from a task list. - // Default value is 1 + // Default value is 2 MinConcurrentDecisionTaskPollers int // optional: Sets the interval of poller autoscaling, between which poller autoscaler changes the poller count @@ -333,7 +334,7 @@ func NewWorker( domain string, taskList string, options WorkerOptions, -) *aggregatedWorker { +) (*aggregatedWorker, error) { return newAggregatedWorker(service, domain, taskList, options) } @@ -383,3 +384,12 @@ func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName s r := NewWorkflowReplayer() return r.ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, lastEventID) } + +// Validate sanity validation of WorkerOptions +func (o WorkerOptions) Validate() error { + // decision task pollers must be >= 2 or unset if sticky tasklist is enabled https://github.com/uber-go/cadence-client/issues/1369 + if !o.DisableStickyExecution && (o.MaxConcurrentDecisionTaskPollers == 1 || o.MinConcurrentDecisionTaskPollers == 1) { + return fmt.Errorf("DecisionTaskPollers must be >= 2 or use default value") + } + return nil +} diff --git a/internal/worker_test.go b/internal/worker_test.go new file mode 100644 index 000000000..950955efa --- /dev/null +++ b/internal/worker_test.go @@ -0,0 +1,68 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_NewWorker(t *testing.T) { + tests := []struct { + name string + options WorkerOptions + expectErr string + }{ + { + name: "happy with default value", + options: WorkerOptions{}, + expectErr: "", + }, + { + name: "happy with explicit decision task poller set to 1 if sticky task list is disabled", + options: WorkerOptions{ + MaxConcurrentDecisionTaskPollers: 1, + DisableStickyExecution: true, + }, + expectErr: "", + }, + { + name: "invalid worker with explicit decision task poller set to 1", + options: WorkerOptions{ + MaxConcurrentDecisionTaskPollers: 1, + }, + expectErr: "DecisionTaskPollers must be >= 2 or use default value", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w, err := NewWorker(nil, "test-domain", "test-tasklist", tt.options) + if tt.expectErr != "" { + assert.ErrorContains(t, err, tt.expectErr) + assert.Nil(t, w) + } else { + assert.NoError(t, err) + assert.NotNil(t, w) + } + }) + } +} diff --git a/worker/worker.go b/worker/worker.go index 13cdf3164..3dd3cee85 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -269,7 +269,21 @@ const ( ShadowModeContinuous = internal.ShadowModeContinuous ) -// New creates an instance of worker for managing workflow and activity executions. +// Deprecated: use NewV2 instead since this implementation will panic on error +func New( + service workflowserviceclient.Interface, + domain string, + taskList string, + options Options, +) Worker { + w, err := internal.NewWorker(service, domain, taskList, options) + if err != nil { + panic(err) + } + return w +} + +// NewV2 returns an instance of worker for managing workflow and activity executions and an error. // // service - thrift connection to the cadence server // domain - the name of the cadence domain @@ -277,12 +291,12 @@ const ( // identifies group of workflow and activity implementations that are // hosted by a single worker process // options - configure any worker specific options like logger, metrics, identity -func New( +func NewV2( service workflowserviceclient.Interface, domain string, taskList string, options Options, -) Worker { +) (Worker, error) { return internal.NewWorker(service, domain, taskList, options) }