Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng committed Dec 20, 2024
1 parent f2db045 commit 636c433
Show file tree
Hide file tree
Showing 3 changed files with 429 additions and 72 deletions.
4 changes: 3 additions & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.uber.org/cadence/internal/common/debug"
"go.uber.org/cadence/internal/worker"

"github.com/jonboulle/clockwork"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -175,13 +176,14 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t

var concurrencyAS *worker.ConcurrencyAutoScaler
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
concurrencyAS = worker.NewPollerAutoScaler(worker.ConcurrencyAutoScalerInput{
concurrencyAS = worker.NewConcurrencyAutoScaler(worker.ConcurrencyAutoScalerInput{
Concurrency: concurrency,
Cooldown: pollerOptions.Cooldown,
PollerMaxCount: pollerOptions.MaxCount,
PollerMinCount: pollerOptions.MinCount,
Logger: logger,
Scope: metricsScope,
Clock: clockwork.NewRealClock(),
})
}

Expand Down
174 changes: 103 additions & 71 deletions internal/worker/concurrency_auto_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,111 +21,129 @@
package worker

import (
"context"
"math"
"sync"
"sync/atomic"
"time"

"github.com/jonboulle/clockwork"
"github.com/uber-go/tally"
"go.uber.org/zap"

"go.uber.org/cadence/.gen/go/shared"
)

const (
concurrencyAutoScalerUpdateTick = time.Second
concurrencyAutoScalerObservabilityTick = time.Millisecond * 500
targetPollerWaitTimeInMsLog2 = 4 // 16 ms
numberOfPollsInRollingAverage = 20
defaultAutoScalerUpdateTick = time.Second
// concurrencyAutoScalerObservabilityTick = time.Millisecond * 500
targetPollerWaitTimeInMsLog2 = 4 // 16 ms
numberOfPollsInRollingAverage = 20

autoScalerEventPollerUpdate autoScalerEvent = "update-poller-limit"
autoScalerEventPollerSkipUpdateCooldown = "skip-update-poller-limit-cooldown"
autoScalerEventPollerSkipUpdateNoChange = "skip-update-poller-limit-no-change"
autoScalerEventPollerSkipUpdateNotEnabled = "skip-update-poller-limit-not-enabled"
autoScalerEventMetrics = "metrics"
autoScalerEventEnable = "enable"
autoScalerEventDisable = "disable"
autoScalerEventStart = "start"
autoScalerEventStop = "stop"
autoScalerEventLogMsg string = "concurrency auto scaler event"
testTimeFormat string = "15:04:05"
)

type ConcurrencyAutoScaler struct {
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup
log *zap.Logger
scope tally.Scope
type (
ConcurrencyAutoScaler struct {
shutdownChan chan struct{}
wg sync.WaitGroup
log *zap.Logger
scope tally.Scope
clock clockwork.Clock

concurrency *ConcurrencyLimit
cooldown time.Duration
concurrency *ConcurrencyLimit
cooldown time.Duration
updateTick time.Duration

// enable auto scaler on concurrency or not
enable atomic.Bool
// enable auto scaler on concurrency or not
enable atomic.Bool

// poller
pollerMaxCount int
pollerMinCount int
pollerWaitTimeInMsLog2 *rollingAverage // log2(pollerWaitTimeInMs+1) for smoothing (ideal value is 0)
pollerPermitLastUpdate time.Time
}
// poller
pollerInitCount int
pollerMaxCount int
pollerMinCount int
pollerWaitTimeInMsLog2 *rollingAverage // log2(pollerWaitTimeInMs+1) for smoothing (ideal value is 0)
pollerPermitLastUpdate time.Time
}

type ConcurrencyAutoScalerInput struct {
Concurrency *ConcurrencyLimit
Cooldown time.Duration // cooldown time of update
PollerMaxCount int
PollerMinCount int
Logger *zap.Logger
Scope tally.Scope
}
ConcurrencyAutoScalerInput struct {
Concurrency *ConcurrencyLimit
Cooldown time.Duration // cooldown time of update
Tick time.Duration // frequency of update check
PollerMaxCount int
PollerMinCount int
Logger *zap.Logger
Scope tally.Scope
Clock clockwork.Clock
}

func NewPollerAutoScaler(input ConcurrencyAutoScalerInput) *ConcurrencyAutoScaler {
ctx, cancel := context.WithCancel(context.Background())
autoScalerEvent string
)

func NewConcurrencyAutoScaler(input ConcurrencyAutoScalerInput) *ConcurrencyAutoScaler {
tick := defaultAutoScalerUpdateTick
if input.Tick != 0 {
tick = input.Tick
}
return &ConcurrencyAutoScaler{
ctx: ctx,
cancel: cancel,
wg: &sync.WaitGroup{},
shutdownChan: make(chan struct{}),
concurrency: input.Concurrency,
cooldown: input.Cooldown,
log: input.Logger,
scope: input.Scope,
clock: input.Clock,
updateTick: tick,
enable: atomic.Bool{}, // initial value should be false and is only turned on from auto config hint
pollerInitCount: input.Concurrency.PollerPermit.Quota(),
pollerMaxCount: input.PollerMaxCount,
pollerMinCount: input.PollerMinCount,
pollerWaitTimeInMsLog2: newRollingAverage(numberOfPollsInRollingAverage),
pollerPermitLastUpdate: input.Clock.Now(),
}
}

func (c *ConcurrencyAutoScaler) Start() {
c.logEvent(autoScalerEventStart)

c.wg.Add(1)
go func() { // scaling daemon

go func() {
defer c.wg.Done()
ticker := time.NewTicker(concurrencyAutoScalerUpdateTick)
ticker := c.clock.NewTicker(c.updateTick)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
ticker.Stop()
case <-ticker.C:
case <-c.shutdownChan:
return
case <-ticker.Chan():
c.logEvent(autoScalerEventMetrics)
c.updatePollerPermit()
}
}
}()
c.wg.Add(1)
go func() { // observability daemon
defer c.wg.Done()
ticker := time.NewTicker(concurrencyAutoScalerUpdateTick)
for {
select {
case <-c.ctx.Done():
ticker.Stop()
case <-ticker.C:
c.emit()
}
}
}()
}

func (c *ConcurrencyAutoScaler) Stop() {
c.cancel()
close(c.shutdownChan)
c.wg.Wait()
c.logEvent(autoScalerEventStop)
}

// ProcessPollerHint reads the poller response hint and take actions
// 1. update poller wait time
// 2. enable/disable auto scaler
func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) {
if hint == nil {
c.log.Warn("auto config hint is nil, this results in no action")
return
}

Check warning on line 148 in internal/worker/concurrency_auto_scaler.go

View check run for this annotation

Codecov / codecov/patch

internal/worker/concurrency_auto_scaler.go#L146-L148

Added lines #L146 - L148 were not covered by tests
if hint.PollerWaitTimeInMs != nil {
Expand All @@ -134,41 +152,53 @@ func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) {
}

/*
Atomically compare and switch the auto scaler enable flag. If auto scaler is turned off, reset the concurrency limits.
Atomically compare and switch the auto scaler enable flag. If auto scaler is turned off, IMMEDIATELY reset the concurrency limits.
*/
var shouldEnable bool
if hint.EnableAutoConfig != nil && *hint.EnableAutoConfig {
shouldEnable = true
}
if switched := c.enable.CompareAndSwap(!shouldEnable, shouldEnable); switched {
if shouldEnable {
c.log.Sugar().Infof("auto scaler enabled")
c.logEvent(autoScalerEventEnable)
} else {
c.log.Sugar().Infof("auto scaler disabled")
c.ResetConcurrency()
c.resetConcurrency()
c.logEvent(autoScalerEventDisable)
}
}
}

// ResetConcurrency reset poller quota to the max value. This will be used for gracefully switching the auto scaler off to avoid workers stuck in the wrong state
func (c *ConcurrencyAutoScaler) ResetConcurrency() {
c.concurrency.PollerPermit.SetQuota(c.pollerMaxCount)
// resetConcurrency reset poller quota to the max value. This will be used for gracefully switching the auto scaler off to avoid workers stuck in the wrong state
func (c *ConcurrencyAutoScaler) resetConcurrency() {
c.concurrency.PollerPermit.SetQuota(c.pollerInitCount)
}

func (c *ConcurrencyAutoScaler) emit() {
func (c *ConcurrencyAutoScaler) logEvent(event autoScalerEvent) {
if c.enable.Load() {
c.scope.Counter("concurrency_auto_scaler.enabled").Inc(1)
} else {
c.scope.Counter("concurrency_auto_scaler.disabled").Inc(1)
}
c.scope.Gauge("poller_in_action").Update(float64(c.concurrency.PollerPermit.Quota() - c.concurrency.PollerPermit.Count()))
c.scope.Gauge("poller_in_action").Update(float64(c.concurrency.PollerPermit.Count()))
c.scope.Gauge("poller_quota").Update(float64(c.concurrency.PollerPermit.Quota()))
c.scope.Gauge("poller_wait_time").Update(math.Exp2(c.pollerWaitTimeInMsLog2.Average()))
c.log.Debug(autoScalerEventLogMsg,
zap.Time("time", c.clock.Now()),
zap.String("event", string(event)),
zap.Bool("enabled", c.enable.Load()),
zap.Int("poller_quota", c.concurrency.PollerPermit.Quota()),
zap.Int("poller_in_action", c.concurrency.PollerPermit.Count()),
)
}

func (c *ConcurrencyAutoScaler) updatePollerPermit() {
updateTime := time.Now()
if !c.enable.Load() { // skip update if auto scaler is disabled
c.logEvent(autoScalerEventPollerSkipUpdateNotEnabled)
return
}
updateTime := c.clock.Now()
if updateTime.Before(c.pollerPermitLastUpdate.Add(c.cooldown)) { // before cooldown
c.logEvent(autoScalerEventPollerSkipUpdateCooldown)
return
}
currentQuota := c.concurrency.PollerPermit.Quota()
Expand All @@ -180,19 +210,16 @@ func (c *ConcurrencyAutoScaler) updatePollerPermit() {
newQuota = c.pollerMaxCount
}
if newQuota == currentQuota {
return
}
enabled := c.enable.Load()
c.log.Sugar().With("applied", enabled).Infof("update poller permit: %v -> %v", currentQuota, newQuota)
if !c.enable.Load() {
c.logEvent(autoScalerEventPollerSkipUpdateNoChange)
return
}
c.concurrency.PollerPermit.SetQuota(newQuota)
c.pollerPermitLastUpdate = updateTime
c.logEvent(autoScalerEventPollerUpdate)
}

type rollingAverage struct {
mu sync.Mutex
mu sync.RWMutex
window []float64
index int
sum float64
Expand All @@ -210,6 +237,11 @@ func (r *rollingAverage) Add(value float64) {
r.mu.Lock()
defer r.mu.Unlock()

// no op on zero rolling window
if len(r.window) == 0 {
return
}

// replace the old value with the new value
r.index %= len(r.window)
r.sum += value - r.window[r.index]
Expand All @@ -222,8 +254,8 @@ func (r *rollingAverage) Add(value float64) {
}

func (r *rollingAverage) Average() float64 {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.RLock()
defer r.mu.RUnlock()
if r.count == 0 {
return 0
}
Expand Down
Loading

0 comments on commit 636c433

Please sign in to comment.