From f2db04589454ecfd627709ee4eb4d476c5d3be87 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Tue, 10 Dec 2024 09:57:40 -0800 Subject: [PATCH] rework on poller auto scaler --- internal/common/autoscaler/autoscaler.go | 32 -- internal/common/autoscaler/estimator.go | 30 -- internal/common/autoscaler/recommender.go | 66 ---- .../common/autoscaler/recommender_test.go | 144 --------- internal/common/autoscaler/types.go | 50 --- internal/internal_poller_autoscaler.go | 154 --------- internal/internal_poller_autoscaler_test.go | 300 ------------------ internal/internal_worker_base.go | 82 +++-- internal/worker/concurrency_auto_scaler.go | 231 ++++++++++++++ 9 files changed, 280 insertions(+), 809 deletions(-) delete mode 100644 internal/common/autoscaler/autoscaler.go delete mode 100644 internal/common/autoscaler/estimator.go delete mode 100644 internal/common/autoscaler/recommender.go delete mode 100644 internal/common/autoscaler/recommender_test.go delete mode 100644 internal/common/autoscaler/types.go delete mode 100644 internal/internal_poller_autoscaler_test.go create mode 100644 internal/worker/concurrency_auto_scaler.go diff --git a/internal/common/autoscaler/autoscaler.go b/internal/common/autoscaler/autoscaler.go deleted file mode 100644 index c080917ce..000000000 --- a/internal/common/autoscaler/autoscaler.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package autoscaler - -// AutoScaler collects data and estimate usage -type ( - AutoScaler interface { - Estimator - // Start starts the autoscaler go routine that scales the ResourceUnit according to Estimator - Start() - // Stop stops the autoscaler if started or do nothing if not yet started - Stop() - } -) diff --git a/internal/common/autoscaler/estimator.go b/internal/common/autoscaler/estimator.go deleted file mode 100644 index 3814b2d21..000000000 --- a/internal/common/autoscaler/estimator.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package autoscaler - -type ( - // Estimator collects data and estimate usage - Estimator interface { - CollectUsage(data interface{}) error - Estimate() (Usages, error) - Reset() - } -) diff --git a/internal/common/autoscaler/recommender.go b/internal/common/autoscaler/recommender.go deleted file mode 100644 index 4338886a7..000000000 --- a/internal/common/autoscaler/recommender.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package autoscaler - -import "math" - -// Recommender a recommendation generator for ResourceUnit -type Recommender interface { - Recommend(currentResource ResourceUnit, currentUsages Usages) ResourceUnit -} - -type linearRecommender struct { - lower, upper ResourceUnit - targetUsages Usages -} - -// NewLinearRecommender create a linear Recommender -func NewLinearRecommender(lower, upper ResourceUnit, targetUsages Usages) Recommender { - return &linearRecommender{ - lower: lower, - upper: upper, - targetUsages: targetUsages, - } -} - -// Recommend recommends the new value -func (l *linearRecommender) Recommend(currentResource ResourceUnit, currentUsages Usages) ResourceUnit { - var recommend float64 - - // average recommendation over all UsageType - for usageType := range currentUsages { - var r float64 - if l.targetUsages[usageType] == 0 { // avoid division by zero - r = math.MaxFloat64 - } else { - if currentUsages[usageType].Value() == float64(1000) { - r = l.upper.Value() - } else { - r = currentResource.Value() * currentUsages[usageType].Value() / l.targetUsages[usageType].Value() - } - } - // boundary check - r = math.Min(l.upper.Value(), math.Max(l.lower.Value(), r)) - recommend += r - } - recommend /= float64(len(currentUsages)) - return ResourceUnit(recommend) -} diff --git a/internal/common/autoscaler/recommender_test.go b/internal/common/autoscaler/recommender_test.go deleted file mode 100644 index 8b716e4cb..000000000 --- a/internal/common/autoscaler/recommender_test.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package autoscaler - -import "testing" - -func Test_linearRecommender_Recommend(t *testing.T) { - type fields struct { - lower ResourceUnit - upper ResourceUnit - targetUsages Usages - } - type args struct { - currentResource ResourceUnit - currentUsages Usages - } - - defaultFields := fields{ - lower: 5, - upper: 15, - targetUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 500, - }, - } - - highUpperValue := fields{ - lower: 5, - upper: 100, - targetUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 500, - }, - } - - tests := []struct { - name string - fields fields - args args - want ResourceUnit - }{ - { - name: "on target usage", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 500, - }, - }, - want: ResourceUnit(10), - }, - { - name: "under utilized, scale down", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 400, - }, - }, - want: ResourceUnit(8), - }, - { - name: "under utilized, scale down but bounded", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 200, - }, - }, - want: ResourceUnit(5), - }, - { - name: "zero utilization, scale down to min", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 0, - }, - }, - want: ResourceUnit(5), - }, - { - name: "over utilized, scale up", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 600, - }, - }, - want: ResourceUnit(12), - }, - { - name: "over utilized, scale up but bounded", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 1000, - }, - }, - want: ResourceUnit(15), - }, - { - name: "over utilized, since we do not how many tasks are in the queue (because poller usage at 100%), scale up to max", - fields: highUpperValue, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 1000, - }, - }, - want: ResourceUnit(100), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - l := NewLinearRecommender(tt.fields.lower, tt.fields.upper, tt.fields.targetUsages) - if got := l.Recommend(tt.args.currentResource, tt.args.currentUsages); got != tt.want { - t.Errorf("Recommend() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/internal/common/autoscaler/types.go b/internal/common/autoscaler/types.go deleted file mode 100644 index e7a8c699a..000000000 --- a/internal/common/autoscaler/types.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package autoscaler - -type ( - // ResourceUnit is the unit of scalable resources - ResourceUnit uint - - // MilliUsage is the custom defined usage of ResourceUnit times 1000 - MilliUsage uint64 - - // Usages are different measurements used by a Recommender to provide a recommended ResourceUnit - Usages map[UsageType]MilliUsage - - // UsageType type of usage - UsageType string -) - -const ( - // PollerUtilizationRate is a scale from 0 to 1 to indicate poller usages - PollerUtilizationRate UsageType = "pollerUtilizationRate" -) - -// Value helper method for type conversion -func (r ResourceUnit) Value() float64 { - return float64(r) -} - -// Value helper method for type conversion -func (u MilliUsage) Value() float64 { - return float64(u) -} diff --git a/internal/internal_poller_autoscaler.go b/internal/internal_poller_autoscaler.go index 2dc81e7ba..ca96a2f46 100644 --- a/internal/internal_poller_autoscaler.go +++ b/internal/internal_poller_autoscaler.go @@ -21,16 +21,7 @@ package internal import ( - "context" - "errors" - "sync" "time" - - "go.uber.org/atomic" - "go.uber.org/zap" - - "go.uber.org/cadence/internal/common/autoscaler" - "go.uber.org/cadence/internal/worker" ) // defaultPollerScalerCooldownInSeconds @@ -41,33 +32,7 @@ const ( defaultMinConcurrentDecisionPollerSize = 2 ) -var ( - _ autoscaler.AutoScaler = (*pollerAutoScaler)(nil) - _ autoscaler.Estimator = (*pollerUsageEstimator)(nil) -) - type ( - pollerAutoScaler struct { - pollerUsageEstimator - - isDryRun bool - cooldownTime time.Duration - logger *zap.Logger - permit worker.Permit - ctx context.Context - cancel context.CancelFunc - wg *sync.WaitGroup // graceful stop - recommender autoscaler.Recommender - onAutoScale []func() // hook functions that run post autoscale - } - - pollerUsageEstimator struct { - // This single atomic variable stores two variables: - // left 32 bits is noTaskCounts, right 32 bits is taskCounts. - // This avoids unnecessary usage of CompareAndSwap - atomicBits *atomic.Uint64 - } - pollerAutoScalerOptions struct { Enabled bool InitCount int @@ -78,122 +43,3 @@ type ( TargetUtilization float64 } ) - -func newPollerScaler( - options pollerAutoScalerOptions, - logger *zap.Logger, - permit worker.Permit, - hooks ...func()) *pollerAutoScaler { - if !options.Enabled { - return nil - } - ctx, cancel := context.WithCancel(context.Background()) - return &pollerAutoScaler{ - isDryRun: options.DryRun, - cooldownTime: options.Cooldown, - logger: logger, - permit: permit, - wg: &sync.WaitGroup{}, - ctx: ctx, - cancel: cancel, - pollerUsageEstimator: pollerUsageEstimator{atomicBits: atomic.NewUint64(0)}, - recommender: autoscaler.NewLinearRecommender( - autoscaler.ResourceUnit(options.MinCount), - autoscaler.ResourceUnit(options.MaxCount), - autoscaler.Usages{ - autoscaler.PollerUtilizationRate: autoscaler.MilliUsage(options.TargetUtilization * 1000), - }, - ), - onAutoScale: hooks, - } -} - -// Start an auto-scaler go routine and returns a done to stop it -func (p *pollerAutoScaler) Start() { - logger := p.logger.Sugar() - p.wg.Add(1) - go func() { - defer p.wg.Done() - for { - select { - case <-p.ctx.Done(): - return - case <-time.After(p.cooldownTime): - currentResource := autoscaler.ResourceUnit(p.permit.Quota()) - currentUsages, err := p.pollerUsageEstimator.Estimate() - if err != nil { - logger.Warnw("poller autoscaler skip due to estimator error", "error", err) - continue - } - proposedResource := p.recommender.Recommend(currentResource, currentUsages) - logger.Debugw("poller autoscaler recommendation", - "currentUsage", currentUsages, - "current", uint64(currentResource), - "recommend", uint64(proposedResource), - "isDryRun", p.isDryRun) - if !p.isDryRun { - p.permit.SetQuota(int(proposedResource)) - } - p.pollerUsageEstimator.Reset() - - // hooks - for i := range p.onAutoScale { - p.onAutoScale[i]() - } - } - } - }() - return -} - -// Stop stops the poller autoscaler -func (p *pollerAutoScaler) Stop() { - p.cancel() - p.wg.Wait() -} - -// Reset metrics from the start -func (m *pollerUsageEstimator) Reset() { - m.atomicBits.Store(0) -} - -// CollectUsage counts past poll results to estimate autoscaler.Usages -func (m *pollerUsageEstimator) CollectUsage(data interface{}) error { - isEmpty, err := isTaskEmpty(data) - if err != nil { - return err - } - if isEmpty { // no-task poll - m.atomicBits.Add(1 << 32) - } else { - m.atomicBits.Add(1) - } - return nil -} - -func isTaskEmpty(task interface{}) (bool, error) { - switch t := task.(type) { - case *workflowTask: - return t == nil || t.task == nil, nil - case *activityTask: - return t == nil || t.task == nil, nil - case *localActivityTask: - return t == nil || t.workflowTask == nil, nil - default: - return false, errors.New("unknown task type") - } -} - -// Estimate is based on past poll counts -func (m *pollerUsageEstimator) Estimate() (autoscaler.Usages, error) { - bits := m.atomicBits.Load() - noTaskCounts := bits >> 32 // left 32 bits - taskCounts := bits & ((1 << 32) - 1) // right 32 bits - if noTaskCounts+taskCounts == 0 { - return nil, errors.New("autoscaler.Estimator::Estimate error: not enough data") - } - - return autoscaler.Usages{ - autoscaler.PollerUtilizationRate: autoscaler.MilliUsage(taskCounts * 1000 / (noTaskCounts + taskCounts)), - }, nil -} diff --git a/internal/internal_poller_autoscaler_test.go b/internal/internal_poller_autoscaler_test.go deleted file mode 100644 index 4a441b642..000000000 --- a/internal/internal_poller_autoscaler_test.go +++ /dev/null @@ -1,300 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package internal - -import ( - "context" - "math/rand" - "sync" - "testing" - "time" - - "go.uber.org/cadence/internal/common/testlogger" - "go.uber.org/cadence/internal/worker" - - "github.com/stretchr/testify/assert" - "go.uber.org/atomic" - - s "go.uber.org/cadence/.gen/go/shared" - "go.uber.org/cadence/internal/common/autoscaler" -) - -func Test_pollerAutoscaler(t *testing.T) { - type args struct { - disabled bool - noTaskPoll, taskPoll, unrelated int - initialPollerCount int - minPollerCount int - maxPollerCount int - targetMilliUsage uint64 - cooldownTime time.Duration - autoScalerEpoch int - isDryRun bool - } - - coolDownTime := time.Millisecond * 50 - - tests := []struct { - name string - args args - want int - }{ - { - name: "dry run doesn't change anything", - args: args{ - noTaskPoll: 100, - taskPoll: 0, - unrelated: 0, - initialPollerCount: 10, - minPollerCount: 2, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 1, - isDryRun: true, - }, - want: 10, - }, - { - name: "no utilization, scale to min", - args: args{ - noTaskPoll: 100, - taskPoll: 0, - unrelated: 0, - initialPollerCount: 10, - minPollerCount: 1, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 1, - isDryRun: false, - }, - want: 1, - }, - { - name: "low utilization, scale down", - args: args{ - noTaskPoll: 75, - taskPoll: 25, - unrelated: 0, - initialPollerCount: 10, - minPollerCount: 1, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 1, - isDryRun: false, - }, - want: 5, - }, - { - name: "over utilized, scale up", - args: args{ - noTaskPoll: 0, - taskPoll: 100, - unrelated: 0, - initialPollerCount: 2, - minPollerCount: 1, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 1, - isDryRun: false, - }, - want: 10, - }, - { - name: "over utilized, scale up to max", - args: args{ - noTaskPoll: 0, - taskPoll: 100, - unrelated: 0, - initialPollerCount: 6, - minPollerCount: 1, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 1, - isDryRun: false, - }, - want: 10, - }, - { - name: "over utilized, but wait time less than cooldown time", - args: args{ - noTaskPoll: 0, - taskPoll: 100, - unrelated: 0, - initialPollerCount: 6, - minPollerCount: 1, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 0, - isDryRun: false, - }, - want: 6, - }, - { - name: "disabled", - args: args{disabled: true}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - autoscalerEpoch := atomic.NewUint64(0) - pollerScaler := newPollerScaler( - pollerAutoScalerOptions{ - Enabled: !tt.args.disabled, - InitCount: tt.args.initialPollerCount, - MinCount: tt.args.minPollerCount, - MaxCount: tt.args.maxPollerCount, - Cooldown: tt.args.cooldownTime, - DryRun: tt.args.isDryRun, - TargetUtilization: float64(tt.args.targetMilliUsage) / 1000, - }, - testlogger.NewZap(t), - worker.NewResizablePermit(tt.args.initialPollerCount), - // hook function that collects number of iterations - func() { - autoscalerEpoch.Add(1) - }) - if tt.args.disabled { - assert.Nil(t, pollerScaler) - return - } - - pollerScaler.Start() - - // simulate concurrent polling - pollChan := generateRandomPollResults(tt.args.noTaskPoll, tt.args.taskPoll, tt.args.unrelated) - wg := &sync.WaitGroup{} - for i := 0; i < tt.args.maxPollerCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for pollResult := range pollChan { - err := pollerScaler.permit.Acquire(context.Background()) - assert.NoError(t, err) - pollerScaler.CollectUsage(pollResult) - pollerScaler.permit.Release() - } - }() - } - - assert.Eventually(t, func() bool { - return autoscalerEpoch.Load() == uint64(tt.args.autoScalerEpoch) - }, tt.args.cooldownTime+100*time.Millisecond, 10*time.Millisecond) - pollerScaler.Stop() - res := pollerScaler.permit.Quota() - pollerScaler.permit.Count() - assert.Equal(t, tt.want, int(res)) - }) - } -} - -func Test_pollerUsageEstimator(t *testing.T) { - type args struct { - noTaskPoll, taskPoll, unrelated int - pollerCount int - } - tests := []struct { - name string - args args - want autoscaler.Usages - wantErr bool - }{ - { - name: "400 no-task, 100 task, 100 unrelated", - args: args{ - noTaskPoll: 400, - taskPoll: 100, - unrelated: 100, - pollerCount: 5, - }, - want: autoscaler.Usages{ - autoscaler.PollerUtilizationRate: 200, - }, - }, - { - name: "0 no-task, 0 task, 100 unrelated", - args: args{ - noTaskPoll: 0, - taskPoll: 0, - unrelated: 100, - pollerCount: 5, - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - estimator := &pollerUsageEstimator{atomicBits: atomic.NewUint64(0)} - pollChan := generateRandomPollResults(tt.args.noTaskPoll, tt.args.taskPoll, tt.args.unrelated) - wg := &sync.WaitGroup{} - for i := 0; i < tt.args.pollerCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for pollResult := range pollChan { - estimator.CollectUsage(pollResult) - } - }() - } - wg.Wait() - - res, err := estimator.Estimate() - if tt.wantErr { - assert.Error(t, err) - assert.Nil(t, res) - } else { - assert.NoError(t, err) - assert.Equal(t, tt.want, res) - } - }) - } -} - -type unrelatedPolledTask struct{} - -func generateRandomPollResults(noTaskPoll, taskPoll, unrelated int) <-chan interface{} { - var result []interface{} - for i := 0; i < noTaskPoll; i++ { - result = append(result, &activityTask{}) - } - for i := 0; i < taskPoll; i++ { - result = append(result, &activityTask{task: &s.PollForActivityTaskResponse{}}) - } - for i := 0; i < unrelated; i++ { - result = append(result, &unrelatedPolledTask{}) - } - rand.Seed(time.Now().UnixNano()) - rand.Shuffle(len(result), func(i, j int) { - result[i], result[j] = result[j], result[i] - }) - - pollChan := make(chan interface{}, len(result)) - defer close(pollChan) - for i := range result { - pollChan <- result[i] - } - return pollChan -} diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index b4bfb0ad6..adca4c3cf 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -142,10 +142,10 @@ type ( logger *zap.Logger metricsScope tally.Scope - concurrency *worker.ConcurrencyLimit - pollerAutoScaler *pollerAutoScaler - taskQueueCh chan interface{} - sessionTokenBucket *sessionTokenBucket + concurrency *worker.ConcurrencyLimit + concurrencyAutoScaler *worker.ConcurrencyAutoScaler + taskQueueCh chan interface{} + sessionTokenBucket *sessionTokenBucket } polledTask struct { @@ -173,28 +173,31 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t TaskPermit: worker.NewResizablePermit(options.maxConcurrentTask), } - var pollerAS *pollerAutoScaler + var concurrencyAS *worker.ConcurrencyAutoScaler if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled { - pollerAS = newPollerScaler( - pollerOptions, - logger, - concurrency.PollerPermit, - ) + concurrencyAS = worker.NewPollerAutoScaler(worker.ConcurrencyAutoScalerInput{ + Concurrency: concurrency, + Cooldown: pollerOptions.Cooldown, + PollerMaxCount: pollerOptions.MaxCount, + PollerMinCount: pollerOptions.MinCount, + Logger: logger, + Scope: metricsScope, + }) } bw := &baseWorker{ - options: options, - shutdownCh: make(chan struct{}), - taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1), - retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), - logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}), - metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType), - concurrency: concurrency, - pollerAutoScaler: pollerAS, - taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched. - limiterContext: ctx, - limiterContextCancel: cancel, - sessionTokenBucket: sessionTokenBucket, + options: options, + shutdownCh: make(chan struct{}), + taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1), + retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), + logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}), + metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType), + concurrency: concurrency, + concurrencyAutoScaler: concurrencyAS, + taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched. + limiterContext: ctx, + limiterContextCancel: cancel, + sessionTokenBucket: sessionTokenBucket, } if options.pollerRate > 0 { bw.pollLimiter = rate.NewLimiter(rate.Limit(options.pollerRate), 1) @@ -210,8 +213,8 @@ func (bw *baseWorker) Start() { bw.metricsScope.Counter(metrics.WorkerStartCounter).Inc(1) - if bw.pollerAutoScaler != nil { - bw.pollerAutoScaler.Start() + if bw.concurrencyAutoScaler != nil { + bw.concurrencyAutoScaler.Start() } for i := 0; i < bw.options.pollerCount; i++ { @@ -301,7 +304,7 @@ func (bw *baseWorker) pollTask() { var err error var task interface{} - if bw.pollerAutoScaler != nil { + if bw.concurrencyAutoScaler != nil { if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil { defer bw.concurrency.PollerPermit.Release() } else { @@ -324,12 +327,8 @@ func (bw *baseWorker) pollTask() { } bw.retrier.Failed() } else { - if bw.pollerAutoScaler != nil { - if pErr := bw.pollerAutoScaler.CollectUsage(task); pErr != nil { - bw.logger.Sugar().Warnw("poller auto scaler collect usage error", - "error", pErr, - "task", task) - } + if bw.concurrencyAutoScaler != nil { + bw.concurrencyAutoScaler.ProcessPollerHint(getAutoConfigHint(task)) } bw.retrier.Succeeded() } @@ -405,8 +404,8 @@ func (bw *baseWorker) Stop() { } close(bw.shutdownCh) bw.limiterContextCancel() - if bw.pollerAutoScaler != nil { - bw.pollerAutoScaler.Stop() + if bw.concurrencyAutoScaler != nil { + bw.concurrencyAutoScaler.Stop() } if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success { @@ -421,3 +420,20 @@ func (bw *baseWorker) Stop() { } return } + +func getAutoConfigHint(task interface{}) *shared.AutoConfigHint { + switch t := task.(type) { + case workflowTask: + if t.task == nil { + return nil + } + return t.task.AutoConfigHint + case activityTask: + if t.task == nil { + return nil + } + return t.task.AutoConfigHint + default: + return nil + } +} diff --git a/internal/worker/concurrency_auto_scaler.go b/internal/worker/concurrency_auto_scaler.go new file mode 100644 index 000000000..790d430b3 --- /dev/null +++ b/internal/worker/concurrency_auto_scaler.go @@ -0,0 +1,231 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package worker + +import ( + "context" + "math" + "sync" + "sync/atomic" + "time" + + "github.com/uber-go/tally" + "go.uber.org/zap" + + "go.uber.org/cadence/.gen/go/shared" +) + +const ( + concurrencyAutoScalerUpdateTick = time.Second + concurrencyAutoScalerObservabilityTick = time.Millisecond * 500 + targetPollerWaitTimeInMsLog2 = 4 // 16 ms + numberOfPollsInRollingAverage = 20 +) + +type ConcurrencyAutoScaler struct { + ctx context.Context + cancel context.CancelFunc + wg *sync.WaitGroup + log *zap.Logger + scope tally.Scope + + concurrency *ConcurrencyLimit + cooldown time.Duration + + // enable auto scaler on concurrency or not + enable atomic.Bool + + // poller + pollerMaxCount int + pollerMinCount int + pollerWaitTimeInMsLog2 *rollingAverage // log2(pollerWaitTimeInMs+1) for smoothing (ideal value is 0) + pollerPermitLastUpdate time.Time +} + +type ConcurrencyAutoScalerInput struct { + Concurrency *ConcurrencyLimit + Cooldown time.Duration // cooldown time of update + PollerMaxCount int + PollerMinCount int + Logger *zap.Logger + Scope tally.Scope +} + +func NewPollerAutoScaler(input ConcurrencyAutoScalerInput) *ConcurrencyAutoScaler { + ctx, cancel := context.WithCancel(context.Background()) + + return &ConcurrencyAutoScaler{ + ctx: ctx, + cancel: cancel, + wg: &sync.WaitGroup{}, + concurrency: input.Concurrency, + cooldown: input.Cooldown, + log: input.Logger, + scope: input.Scope, + enable: atomic.Bool{}, // initial value should be false and is only turned on from auto config hint + pollerMaxCount: input.PollerMaxCount, + pollerMinCount: input.PollerMinCount, + pollerWaitTimeInMsLog2: newRollingAverage(numberOfPollsInRollingAverage), + } +} + +func (c *ConcurrencyAutoScaler) Start() { + c.wg.Add(1) + go func() { // scaling daemon + defer c.wg.Done() + ticker := time.NewTicker(concurrencyAutoScalerUpdateTick) + for { + select { + case <-c.ctx.Done(): + ticker.Stop() + case <-ticker.C: + c.updatePollerPermit() + } + } + }() + c.wg.Add(1) + go func() { // observability daemon + defer c.wg.Done() + ticker := time.NewTicker(concurrencyAutoScalerUpdateTick) + for { + select { + case <-c.ctx.Done(): + ticker.Stop() + case <-ticker.C: + c.emit() + } + } + }() +} + +func (c *ConcurrencyAutoScaler) Stop() { + c.cancel() + c.wg.Wait() +} + +// ProcessPollerHint reads the poller response hint and take actions +// 1. update poller wait time +// 2. enable/disable auto scaler +func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) { + if hint == nil { + return + } + if hint.PollerWaitTimeInMs != nil { + waitTimeInMs := *hint.PollerWaitTimeInMs + c.pollerWaitTimeInMsLog2.Add(math.Log2(float64(waitTimeInMs + 1))) + } + + /* + Atomically compare and switch the auto scaler enable flag. If auto scaler is turned off, reset the concurrency limits. + */ + var shouldEnable bool + if hint.EnableAutoConfig != nil && *hint.EnableAutoConfig { + shouldEnable = true + } + if switched := c.enable.CompareAndSwap(!shouldEnable, shouldEnable); switched { + if shouldEnable { + c.log.Sugar().Infof("auto scaler enabled") + } else { + c.log.Sugar().Infof("auto scaler disabled") + c.ResetConcurrency() + } + } +} + +// ResetConcurrency reset poller quota to the max value. This will be used for gracefully switching the auto scaler off to avoid workers stuck in the wrong state +func (c *ConcurrencyAutoScaler) ResetConcurrency() { + c.concurrency.PollerPermit.SetQuota(c.pollerMaxCount) +} + +func (c *ConcurrencyAutoScaler) emit() { + if c.enable.Load() { + c.scope.Counter("concurrency_auto_scaler.enabled").Inc(1) + } else { + c.scope.Counter("concurrency_auto_scaler.disabled").Inc(1) + } + c.scope.Gauge("poller_in_action").Update(float64(c.concurrency.PollerPermit.Quota() - c.concurrency.PollerPermit.Count())) + c.scope.Gauge("poller_quota").Update(float64(c.concurrency.PollerPermit.Quota())) + c.scope.Gauge("poller_wait_time").Update(math.Exp2(c.pollerWaitTimeInMsLog2.Average())) +} + +func (c *ConcurrencyAutoScaler) updatePollerPermit() { + updateTime := time.Now() + if updateTime.Before(c.pollerPermitLastUpdate.Add(c.cooldown)) { // before cooldown + return + } + currentQuota := c.concurrency.PollerPermit.Quota() + newQuota := int(math.Round(float64(currentQuota) * c.pollerWaitTimeInMsLog2.Average() / targetPollerWaitTimeInMsLog2)) + if newQuota < c.pollerMinCount { + newQuota = c.pollerMinCount + } + if newQuota > c.pollerMaxCount { + newQuota = c.pollerMaxCount + } + if newQuota == currentQuota { + return + } + enabled := c.enable.Load() + c.log.Sugar().With("applied", enabled).Infof("update poller permit: %v -> %v", currentQuota, newQuota) + if !c.enable.Load() { + return + } + c.concurrency.PollerPermit.SetQuota(newQuota) + c.pollerPermitLastUpdate = updateTime +} + +type rollingAverage struct { + mu sync.Mutex + window []float64 + index int + sum float64 + count int +} + +func newRollingAverage(capacity int) *rollingAverage { + return &rollingAverage{ + window: make([]float64, capacity), + } +} + +// Add always add positive numbers +func (r *rollingAverage) Add(value float64) { + r.mu.Lock() + defer r.mu.Unlock() + + // replace the old value with the new value + r.index %= len(r.window) + r.sum += value - r.window[r.index] + r.window[r.index] = value + r.index++ + + if r.count < len(r.window) { + r.count++ + } +} + +func (r *rollingAverage) Average() float64 { + r.mu.Lock() + defer r.mu.Unlock() + if r.count == 0 { + return 0 + } + return r.sum / float64(r.count) +}