diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index adca4c3cf..39deb2282 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -35,6 +35,7 @@ import (
 	"go.uber.org/cadence/internal/common/debug"
 	"go.uber.org/cadence/internal/worker"
 
+	"github.com/jonboulle/clockwork"
 	"github.com/uber-go/tally"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -175,13 +176,14 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
 
 	var concurrencyAS *worker.ConcurrencyAutoScaler
 	if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
-		concurrencyAS = worker.NewPollerAutoScaler(worker.ConcurrencyAutoScalerInput{
+		concurrencyAS = worker.NewConcurrencyAutoScaler(worker.ConcurrencyAutoScalerInput{
 			Concurrency:    concurrency,
 			Cooldown:       pollerOptions.Cooldown,
 			PollerMaxCount: pollerOptions.MaxCount,
 			PollerMinCount: pollerOptions.MinCount,
 			Logger:         logger,
 			Scope:          metricsScope,
+			Clock:          clockwork.NewRealClock(),
 		})
 	}
 
diff --git a/internal/worker/concurrency_auto_scaler.go b/internal/worker/concurrency_auto_scaler.go
index 790d430b3..28699f2ed 100644
--- a/internal/worker/concurrency_auto_scaler.go
+++ b/internal/worker/concurrency_auto_scaler.go
@@ -21,12 +21,12 @@ package worker
 
 import (
-	"context"
 	"math"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/jonboulle/clockwork"
 	"github.com/uber-go/tally"
 	"go.uber.org/zap"
 
@@ -34,91 +34,107 @@ import (
 )
 
 const (
-	concurrencyAutoScalerUpdateTick        = time.Second
-	concurrencyAutoScalerObservabilityTick = time.Millisecond * 500
-	targetPollerWaitTimeInMsLog2           = 4 // 16 ms
-	numberOfPollsInRollingAverage          = 20
+	defaultAutoScalerUpdateTick   = time.Second
+	targetPollerWaitTimeInMsLog2  = 4 // 16 ms
+	numberOfPollsInRollingAverage = 20
+
+	autoScalerEventPollerUpdate               autoScalerEvent = "update-poller-limit"
+	autoScalerEventPollerSkipUpdateCooldown                   = "skip-update-poller-limit-cooldown"
+	autoScalerEventPollerSkipUpdateNoChange                   = "skip-update-poller-limit-no-change"
+	autoScalerEventPollerSkipUpdateNotEnabled                 = "skip-update-poller-limit-not-enabled"
+	autoScalerEventMetrics                                    = "metrics"
+	autoScalerEventEnable                                     = "enable"
+	autoScalerEventDisable                                    = "disable"
+	autoScalerEventStart                                      = "start"
+	autoScalerEventStop                                       = "stop"
+	autoScalerEventLogMsg                     string          = "concurrency auto scaler event"
+	testTimeFormat                            string          = "15:04:05"
 )
 
-type ConcurrencyAutoScaler struct {
-	ctx    context.Context
-	cancel context.CancelFunc
-	wg     *sync.WaitGroup
-	log    *zap.Logger
-	scope  tally.Scope
+type (
+	ConcurrencyAutoScaler struct {
+		shutdownChan chan struct{}
+		wg           sync.WaitGroup
+		log          *zap.Logger
+		scope        tally.Scope
+		clock        clockwork.Clock
 
-	concurrency *ConcurrencyLimit
-	cooldown    time.Duration
+		concurrency *ConcurrencyLimit
+		cooldown    time.Duration
+		updateTick  time.Duration
 
-	// enable auto scaler on concurrency or not
-	enable atomic.Bool
+		// enable auto scaler on concurrency or not
+		enable atomic.Bool
 
-	// poller
-	pollerMaxCount         int
-	pollerMinCount         int
-	pollerWaitTimeInMsLog2 *rollingAverage // log2(pollerWaitTimeInMs+1) for smoothing (ideal value is 0)
-	pollerPermitLastUpdate time.Time
-}
+		// poller
+		pollerInitCount        int
+		pollerMaxCount         int
+		pollerMinCount         int
+		pollerWaitTimeInMsLog2 *rollingAverage // log2(pollerWaitTimeInMs+1) for smoothing (ideal value is 0)
+		pollerPermitLastUpdate time.Time
+	}
 
-type ConcurrencyAutoScalerInput struct {
-	Concurrency *ConcurrencyLimit
-	Cooldown    time.Duration // cooldown time of update
-	PollerMaxCount int
-	PollerMinCount int
-	Logger         *zap.Logger
-	Scope          tally.Scope
-}
+	ConcurrencyAutoScalerInput struct {
+		Concurrency    *ConcurrencyLimit
+		Cooldown       time.Duration // cooldown time of update
+		Tick           time.Duration // frequency of update check
+		PollerMaxCount int
+		PollerMinCount int
+		Logger         *zap.Logger
+		Scope          tally.Scope
+		Clock          clockwork.Clock
+	}
 
-func NewPollerAutoScaler(input ConcurrencyAutoScalerInput) *ConcurrencyAutoScaler {
-	ctx, cancel := context.WithCancel(context.Background())
+	autoScalerEvent string
+)
 
+func NewConcurrencyAutoScaler(input ConcurrencyAutoScalerInput) *ConcurrencyAutoScaler {
+	tick := defaultAutoScalerUpdateTick
+	if input.Tick != 0 {
+		tick = input.Tick
+	}
 	return &ConcurrencyAutoScaler{
-		ctx:                    ctx,
-		cancel:                 cancel,
-		wg:                     &sync.WaitGroup{},
+		shutdownChan:           make(chan struct{}),
 		concurrency:            input.Concurrency,
 		cooldown:               input.Cooldown,
 		log:                    input.Logger,
 		scope:                  input.Scope,
+		clock:                  input.Clock,
+		updateTick:             tick,
 		enable:                 atomic.Bool{}, // initial value should be false and is only turned on from auto config hint
+		pollerInitCount:        input.Concurrency.PollerPermit.Quota(),
 		pollerMaxCount:         input.PollerMaxCount,
 		pollerMinCount:         input.PollerMinCount,
 		pollerWaitTimeInMsLog2: newRollingAverage(numberOfPollsInRollingAverage),
+		pollerPermitLastUpdate: input.Clock.Now(),
 	}
 }
 
 func (c *ConcurrencyAutoScaler) Start() {
+	c.logEvent(autoScalerEventStart)
+
 	c.wg.Add(1)
-	go func() { // scaling daemon
+
+	go func() {
 		defer c.wg.Done()
-		ticker := time.NewTicker(concurrencyAutoScalerUpdateTick)
+		ticker := c.clock.NewTicker(c.updateTick)
+		defer ticker.Stop()
 		for {
 			select {
-			case <-c.ctx.Done():
-				ticker.Stop()
-			case <-ticker.C:
+			case <-c.shutdownChan:
+				return
+			case <-ticker.Chan():
+				c.logEvent(autoScalerEventMetrics)
 				c.updatePollerPermit()
 			}
 		}
 	}()
-	c.wg.Add(1)
-	go func() { // observability daemon
-		defer c.wg.Done()
-		ticker := time.NewTicker(concurrencyAutoScalerUpdateTick)
-		for {
-			select {
-			case <-c.ctx.Done():
-				ticker.Stop()
-			case <-ticker.C:
-				c.emit()
-			}
-		}
-	}()
 }
 
 func (c *ConcurrencyAutoScaler) Stop() {
-	c.cancel()
+	close(c.shutdownChan)
 	c.wg.Wait()
+	c.logEvent(autoScalerEventStop)
 }
 
 // ProcessPollerHint reads the poller response hint and take actions
@@ -126,6 +143,7 @@ func (c *ConcurrencyAutoScaler) Stop() {
 // 2. enable/disable auto scaler
 func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) {
 	if hint == nil {
+		c.log.Warn("auto config hint is nil, this results in no action")
 		return
 	}
 	if hint.PollerWaitTimeInMs != nil {
@@ -134,7 +152,7 @@ func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) {
 	}
 
 	/*
-		Atomically compare and switch the auto scaler enable flag. If auto scaler is turned off, reset the concurrency limits.
+		Atomically compare and switch the auto scaler enable flag. If auto scaler is turned off, IMMEDIATELY reset the concurrency limits.
 	*/
 	var shouldEnable bool
 	if hint.EnableAutoConfig != nil && *hint.EnableAutoConfig {
@@ -142,33 +160,45 @@ func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) {
 	}
 	if switched := c.enable.CompareAndSwap(!shouldEnable, shouldEnable); switched {
 		if shouldEnable {
-			c.log.Sugar().Infof("auto scaler enabled")
+			c.logEvent(autoScalerEventEnable)
 		} else {
-			c.log.Sugar().Infof("auto scaler disabled")
-			c.ResetConcurrency()
+			c.resetConcurrency()
+			c.logEvent(autoScalerEventDisable)
 		}
 	}
 }
 
-// ResetConcurrency reset poller quota to the max value. This will be used for gracefully switching the auto scaler off to avoid workers stuck in the wrong state
-func (c *ConcurrencyAutoScaler) ResetConcurrency() {
-	c.concurrency.PollerPermit.SetQuota(c.pollerMaxCount)
+// resetConcurrency resets the poller quota to its initial value. This is used when gracefully switching the auto scaler off, to avoid leaving workers stuck in the wrong state
+func (c *ConcurrencyAutoScaler) resetConcurrency() {
+	c.concurrency.PollerPermit.SetQuota(c.pollerInitCount)
 }
 
-func (c *ConcurrencyAutoScaler) emit() {
+func (c *ConcurrencyAutoScaler) logEvent(event autoScalerEvent) {
 	if c.enable.Load() {
 		c.scope.Counter("concurrency_auto_scaler.enabled").Inc(1)
 	} else {
 		c.scope.Counter("concurrency_auto_scaler.disabled").Inc(1)
 	}
-	c.scope.Gauge("poller_in_action").Update(float64(c.concurrency.PollerPermit.Quota() - c.concurrency.PollerPermit.Count()))
+	c.scope.Gauge("poller_in_action").Update(float64(c.concurrency.PollerPermit.Count()))
 	c.scope.Gauge("poller_quota").Update(float64(c.concurrency.PollerPermit.Quota()))
 	c.scope.Gauge("poller_wait_time").Update(math.Exp2(c.pollerWaitTimeInMsLog2.Average()))
+	c.log.Debug(autoScalerEventLogMsg,
+		zap.Time("time", c.clock.Now()),
+		zap.String("event", string(event)),
+		zap.Bool("enabled", c.enable.Load()),
+		zap.Int("poller_quota", c.concurrency.PollerPermit.Quota()),
+		zap.Int("poller_in_action", c.concurrency.PollerPermit.Count()),
+	)
 }
 
 func (c *ConcurrencyAutoScaler) updatePollerPermit() {
-	updateTime := time.Now()
+	if !c.enable.Load() { // skip update if auto scaler is disabled
+		c.logEvent(autoScalerEventPollerSkipUpdateNotEnabled)
+		return
+	}
+	updateTime := c.clock.Now()
 	if updateTime.Before(c.pollerPermitLastUpdate.Add(c.cooldown)) { // before cooldown
+		c.logEvent(autoScalerEventPollerSkipUpdateCooldown)
 		return
 	}
 	currentQuota := c.concurrency.PollerPermit.Quota()
@@ -180,19 +210,16 @@ func (c *ConcurrencyAutoScaler) updatePollerPermit() {
 		newQuota = c.pollerMaxCount
 	}
 	if newQuota == currentQuota {
-		return
-	}
-	enabled := c.enable.Load()
-	c.log.Sugar().With("applied", enabled).Infof("update poller permit: %v -> %v", currentQuota, newQuota)
-	if !c.enable.Load() {
+		c.logEvent(autoScalerEventPollerSkipUpdateNoChange)
 		return
 	}
 	c.concurrency.PollerPermit.SetQuota(newQuota)
 	c.pollerPermitLastUpdate = updateTime
+	c.logEvent(autoScalerEventPollerUpdate)
 }
 
 type rollingAverage struct {
-	mu     sync.Mutex
+	mu     sync.RWMutex
 	window []float64
 	index  int
 	sum    float64
@@ -210,6 +237,11 @@ func (r *rollingAverage) Add(value float64) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
+	// no op on zero rolling window
+	if len(r.window) == 0 {
+		return
+	}
+
 	// replace the old value with the new value
 	r.index %= len(r.window)
 	r.sum += value - r.window[r.index]
@@ -222,8 +254,8 @@
 }
 
 func (r *rollingAverage) Average() float64 {
-	r.mu.Lock()
-	defer r.mu.Unlock()
+	r.mu.RLock()
+	defer r.mu.RUnlock()
 	if r.count == 0 {
 		return 0
 	}
diff --git a/internal/worker/concurrency_auto_scaler_test.go b/internal/worker/concurrency_auto_scaler_test.go
new file mode 100644
index 000000000..f5da2cafd
--- /dev/null
+++ b/internal/worker/concurrency_auto_scaler_test.go
@@ -0,0 +1,323 @@
+// Copyright (c) 2017-2021 Uber Technologies Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package worker
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/jonboulle/clockwork"
+	"github.com/stretchr/testify/assert"
+	"github.com/uber-go/tally"
+	"go.uber.org/goleak"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest/observer"
+
+	"go.uber.org/cadence/.gen/go/shared"
+	"go.uber.org/cadence/internal/common"
+)
+
+const (
+	testTickTime = 1 * time.Second
+)
+
+func createTestConcurrencyAutoScaler(t *testing.T, logger *zap.Logger, clock clockwork.Clock) *ConcurrencyAutoScaler {
+	return NewConcurrencyAutoScaler(ConcurrencyAutoScalerInput{
+		Concurrency: &ConcurrencyLimit{
+			PollerPermit: NewResizablePermit(100),
+			TaskPermit:   NewResizablePermit(1000),
+		},
+		Cooldown:       2 * testTickTime,
+		Tick:           testTickTime,
+		PollerMaxCount: 200,
+		PollerMinCount: 50,
+		Logger:         logger,
+		Scope:          tally.NoopScope,
+		Clock:          clock,
+	})
+}
+
+func TestConcurrencyAutoScaler(t *testing.T) {
+
+	type eventLog struct {
+		eventType   autoScalerEvent
+		enabled     bool
+		pollerQuota int64
+		time        string
+	}
+
+	for _, tt := range []struct {
+		name               string
+		pollAutoConfigHint []*shared.AutoConfigHint
+		expectedEvents     []eventLog
+	}{
+		{
+			"start and stop immediately",
+			[]*shared.AutoConfigHint{},
+			[]eventLog{
+				{autoScalerEventStart, false, 100, "00:00:00"},
+				{autoScalerEventStop, false, 100, "00:00:00"},
+			},
+		},
+		{
+			"just enough pollers",
+			[]*shared.AutoConfigHint{
+				{common.PtrOf(true), common.PtrOf(int64(15))}, // <- tick, in cool down
+				{common.PtrOf(true), common.PtrOf(int64(15))}, // <- tick, no update
+			},
+			[]eventLog{
+				{autoScalerEventStart, false, 100, "00:00:00"},
+				{autoScalerEventEnable, true, 100, "00:00:00"},
+				{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
+				{autoScalerEventPollerSkipUpdateNoChange, true, 100, "00:00:02"},
+				{autoScalerEventStop, true, 100, "00:00:02"},
+			},
+		},
+		{
+			"too many pollers",
+			[]*shared.AutoConfigHint{
+				{common.PtrOf(true), common.PtrOf(int64(10))}, // <- tick, in cool down
+				{common.PtrOf(true), common.PtrOf(int64(10))}, // <- tick, scale down
+			},
+			[]eventLog{
+				{autoScalerEventStart, false, 100, "00:00:00"},
+				{autoScalerEventEnable, true, 100, "00:00:00"},
+				{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
+				{autoScalerEventPollerUpdate, true, 86, "00:00:02"},
+				{autoScalerEventStop, true, 86, "00:00:02"},
+			},
+		},
+		{
+			"too many pollers, scale down to minimum",
+			[]*shared.AutoConfigHint{
+				{common.PtrOf(true), common.PtrOf(int64(0))}, // <- tick, in cool down
+				{common.PtrOf(true), common.PtrOf(int64(0))}, // <- tick, scale down to minimum
+			},
+			[]eventLog{
+				{autoScalerEventStart, false, 100, "00:00:00"},
+				{autoScalerEventEnable, true, 100, "00:00:00"},
+				{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
+				{autoScalerEventPollerUpdate, true, 50, "00:00:02"},
+				{autoScalerEventStop, true, 50, "00:00:02"},
+			},
+		},
+		{
+			"lack pollers",
+			[]*shared.AutoConfigHint{
+				{common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, in cool down
+				{common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, scale up
+			},
+			[]eventLog{
+				{autoScalerEventStart, false, 100, "00:00:00"},
+				{autoScalerEventEnable, true, 100, "00:00:00"},
+				{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
+				{autoScalerEventPollerUpdate, true, 166, "00:00:02"},
+				{autoScalerEventStop, true, 166, "00:00:02"},
+			},
+		},
+		{
+			"lack pollers, scale up to maximum",
+			[]*shared.AutoConfigHint{
+				{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, in cool down
+				{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, scale up
+			},
+			[]eventLog{
+				{autoScalerEventStart, false, 100, "00:00:00"},
+				{autoScalerEventEnable, true, 100, "00:00:00"},
+				{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
+				{autoScalerEventPollerUpdate, true, 200, "00:00:02"},
+				{autoScalerEventStop, true, 200, "00:00:02"},
+			},
+		},
+		{
+			"lack pollers but disabled",
+			[]*shared.AutoConfigHint{
+				{common.PtrOf(false), common.PtrOf(int64(100))}, // <- tick, not enabled
+				{common.PtrOf(false), common.PtrOf(int64(100))}, // <- tick, not enabled
+			},
+			[]eventLog{
+				{autoScalerEventStart, false, 100, "00:00:00"},
+				{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:01"},
+				{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:02"},
+				{autoScalerEventStop, false, 100, "00:00:02"},
+			},
+		},
+		{
+			"too many pollers but disabled at a later time",
+			[]*shared.AutoConfigHint{
+				{common.PtrOf(true), common.PtrOf(int64(10))},  // <- tick, in cool down
+				{common.PtrOf(true), common.PtrOf(int64(10))},  // <- tick, scale down
+				{common.PtrOf(false), common.PtrOf(int64(10))}, // <- disable
+			},
+			[]eventLog{
+				{autoScalerEventStart, false, 100, "00:00:00"},
+				{autoScalerEventEnable, true, 100, "00:00:00"},
+				{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
+				{autoScalerEventPollerUpdate, true, 86, "00:00:02"},
+				{autoScalerEventDisable, false, 100, "00:00:02"},
+				{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:03"},
+				{autoScalerEventStop, false, 100, "00:00:03"},
+			},
+		},
+		{
+			"lack pollers and enabled at a later time",
+			[]*shared.AutoConfigHint{
+				{common.PtrOf(false), common.PtrOf(int64(100))}, // <- tick, not enabled
+				{common.PtrOf(false), common.PtrOf(int64(100))}, // <- tick, not enabled
+				{common.PtrOf(true), common.PtrOf(int64(100))},  // <- tick, enable then scale up
+			},
+			[]eventLog{
+				{autoScalerEventStart, false, 100, "00:00:00"},
+				{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:01"},
+				{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:02"},
+				{autoScalerEventEnable, true, 100, "00:00:02"},
+				{autoScalerEventPollerUpdate, true, 166, "00:00:03"},
+				{autoScalerEventStop, true, 166, "00:00:03"},
+			},
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			defer goleak.VerifyNone(t)
+			core, obs := observer.New(zap.DebugLevel)
+			logger := zap.New(core)
+			clock := clockwork.NewFakeClockAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC))
+			scaler := createTestConcurrencyAutoScaler(t, logger, clock)
+
+			// mock poller every 1 tick time
+			var wg sync.WaitGroup
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				clock.Sleep(testTickTime / 2) // poll delay by 0.5 unit of time to avoid test flakiness
+				for _, hint := range tt.pollAutoConfigHint {
+					t.Log("hint process time: ", clock.Now().Format(testTimeFormat))
+					scaler.ProcessPollerHint(hint)
+					clock.Sleep(testTickTime)
+				}
+			}()
+
+			scaler.Start()
+			clock.BlockUntil(2)
+
+			// advance clock
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for i := 0; i < len(tt.pollAutoConfigHint)*2+1; i++ {
+					clock.Advance(testTickTime / 2)
+					time.Sleep(10 * time.Millisecond) // process non-time logic
+				}
+			}()
+
+			wg.Wait()
+			scaler.Stop()
+
+			var actualEvents []eventLog
+			for _, event := range obs.FilterMessage(autoScalerEventLogMsg).All() {
+				if event.ContextMap()["event"] != autoScalerEventMetrics {
+					t.Log("event: ", event.ContextMap())
+					actualEvents = append(actualEvents, eventLog{
+						eventType:   autoScalerEvent(event.ContextMap()["event"].(string)),
+						enabled:     event.ContextMap()["enabled"].(bool),
+						pollerQuota: event.ContextMap()["poller_quota"].(int64),
+						time:        event.ContextMap()["time"].(time.Time).Format(testTimeFormat),
+					})
+				}
+			}
+			assert.ElementsMatch(t, tt.expectedEvents, actualEvents)
+		})
+	}
+}
+
+func TestRollingAverage(t *testing.T) {
+	for _, tt := range []struct {
+		name         string
+		cap          int
+		addGoroutine int
+		input        []float64
+		expected     []float64
+	}{
+		{
+			"cap is 0",
+			0,
+			5,
+			[]float64{1, 2, 3, 4, 5, 6, 7},
+			[]float64{0, 0, 0, 0, 0, 0, 0},
+		},
+		{
+			"cap is 1",
+			1,
+			5,
+			[]float64{1, 2, 3, 4, 5, 6, 7},
+			[]float64{1, 2, 3, 4, 5, 6, 7},
+		},
+		{
+			"cap is 2",
+			2,
+			5,
+			[]float64{1, 2, 3, 4, 5, 6, 7},
+			[]float64{1, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5},
+		},
+		{
+			"cap is 3",
+			3,
+			5,
+			[]float64{1, 2, 3, 4, 5, 6, 7},
+			[]float64{1, 1.5, 2, 3, 4, 5, 6},
+		},
+		{
+			"cap is 4",
+			4,
+			5,
+			[]float64{1, 2, 3, 4, 5, 6, 7},
+			[]float64{1, 1.5, 2, 2.5, 3.5, 4.5, 5.5},
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			defer goleak.VerifyNone(t)
+			r := newRollingAverage(tt.cap)
+
+			doneC := make(chan struct{})
+
+			inputChan := make(chan float64)
+			var wg sync.WaitGroup
+			for i := 0; i < tt.addGoroutine; i++ {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					for v := range inputChan {
+						r.Add(v)
+						doneC <- struct{}{}
+					}
+				}()
+			}
+
+			for i := range tt.input {
+				inputChan <- tt.input[i]
+				<-doneC
+				assert.Equal(t, tt.expected[i], r.Average())
+			}
+			close(inputChan)
+			wg.Wait()
+		})
+	}
+}
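Note: the quota computation itself is unchanged context and therefore elided from the updatePollerPermit hunk above. The sketch below reconstructs the update rule from the test expectations (quota 100 at a 10 ms average wait shrinks to 86, at 100 ms grows to 166, with results clamped to [50, 200]); nextPollerQuota and its parameters are illustrative names, not the committed source.

package main

import (
	"fmt"
	"math"
)

// nextPollerQuota is a hypothetical reconstruction of the scaling step:
// scale the current quota by the smoothed log2 of poller wait time relative
// to the 16 ms target (targetPollerWaitTimeInMsLog2 = 4), then clamp to
// [minCount, maxCount]. avgWaitMs stands in for the rolling average of
// log2(wait+1) that the production scaler maintains per poll.
func nextPollerQuota(current, minCount, maxCount int, avgWaitMs float64) int {
	const targetLog2 = 4                 // log2 of the 16 ms target wait time
	smoothed := math.Log2(avgWaitMs + 1) // +1 keeps log2 defined at a 0 ms wait
	newQuota := int(math.Round(float64(current) * smoothed / targetLog2))
	if newQuota < minCount {
		newQuota = minCount
	}
	if newQuota > maxCount {
		newQuota = maxCount
	}
	return newQuota
}

func main() {
	// Mirrors the test table: 10 ms -> 86, 100 ms -> 166,
	// 10000 ms -> capped at 200, 0 ms -> floored at 50.
	for _, waitMs := range []float64{10, 100, 10000, 0} {
		fmt.Printf("%6.0f ms -> %d pollers\n", waitMs, nextPollerQuota(100, 50, 200, waitMs))
	}
}

Because the wait time enters through log2, each halving of the observed wait moves the quota by a constant step instead of a multiplicative one, which is why the production code smooths log2(pollerWaitTimeInMs+1) in its rolling average rather than the raw wait time.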