diff --git a/capacityPlanner/capacityPlannerImpl.go b/capacityPlanner/capacityPlannerImpl.go index 5f0859bf..60faad97 100644 --- a/capacityPlanner/capacityPlannerImpl.go +++ b/capacityPlanner/capacityPlannerImpl.go @@ -23,7 +23,7 @@ func (cp *CapacityPlanner) Plan(scaleFactor float32, currentScale uint) uint { // IsCoolingDown returns true if the CapacityPlanner thinks that currently a new scaling // would not be a good idea. -func (cp *CapacityPlanner) IsCoolingDown(timeOfLastScale time.Time, scaleDown bool) bool { +func (cp *CapacityPlanner) IsCoolingDown(timeOfLastScale time.Time, scaleDown bool) (cooldownActive bool, cooldownTimeLeft time.Duration) { now := time.Now() dur := cp.upScaleCooldownPeriod @@ -31,9 +31,11 @@ func (cp *CapacityPlanner) IsCoolingDown(timeOfLastScale time.Time, scaleDown bo dur = cp.downScaleCooldownPeriod } + // still cooling down if timeOfLastScale.Add(dur).After(now) { - return true + return true, timeOfLastScale.Add(dur).Sub(now) } - return false + // not cooling down any more + return false, time.Second * 0 } diff --git a/capacityPlanner/capacityPlanner_test.go b/capacityPlanner/capacityPlanner_test.go index e9fa6690..51b67e19 100644 --- a/capacityPlanner/capacityPlanner_test.go +++ b/capacityPlanner/capacityPlanner_test.go @@ -1,6 +1,7 @@ package capacityPlanner import ( + "fmt" "testing" "time" @@ -101,27 +102,33 @@ func Test_IsCoolingDown(t *testing.T) { require.NotNil(t, capa) lastScale := time.Now() - result := capa.IsCoolingDown(lastScale, false) + result, timeLeft := capa.IsCoolingDown(lastScale, false) assert.True(t, result) + assert.InEpsilon(t, time.Second*10, timeLeft, 0.1, fmt.Sprintf("left %s", timeLeft.String())) - result = capa.IsCoolingDown(lastScale, true) + result, timeLeft = capa.IsCoolingDown(lastScale, true) assert.True(t, result) + assert.InEpsilon(t, time.Second*20, timeLeft, 0.1, fmt.Sprintf("left %s", timeLeft.String())) // Upscaling lastScale = time.Now().Add(time.Second * -11) - result = capa.IsCoolingDown(lastScale, false) + result, timeLeft = capa.IsCoolingDown(lastScale, false) assert.False(t, result) + assert.Equal(t, time.Second*0, timeLeft, fmt.Sprintf("left %s", timeLeft.String())) lastScale = time.Now().Add(time.Second * -9) - result = capa.IsCoolingDown(lastScale, false) + result, timeLeft = capa.IsCoolingDown(lastScale, false) assert.True(t, result) + assert.InEpsilon(t, time.Second*1, timeLeft, 0.1, fmt.Sprintf("left %s", timeLeft.String())) // Downscaling lastScale = time.Now().Add(time.Second * -21) - result = capa.IsCoolingDown(lastScale, true) + result, timeLeft = capa.IsCoolingDown(lastScale, true) assert.False(t, result) + assert.Equal(t, time.Second*0, timeLeft, fmt.Sprintf("left %s", timeLeft.String())) lastScale = time.Now().Add(time.Second * -19) - result = capa.IsCoolingDown(lastScale, true) + result, timeLeft = capa.IsCoolingDown(lastScale, true) assert.True(t, result) + assert.InEpsilon(t, time.Second*1, timeLeft, 0.1, fmt.Sprintf("left %s", timeLeft.String())) } diff --git a/doc/DryRunMode.md b/doc/DryRunMode.md new file mode 100644 index 00000000..4d3894c8 --- /dev/null +++ b/doc/DryRunMode.md @@ -0,0 +1,10 @@ +# Dry Run Mode + +The following table shows how sokar behaves in case the dry run mode is activated. + +| Feature | Dry Run Mode Active | Dry Run Mode Deactivated | +| :-------------------------------------------------------------------------------------------------------------------------------------------------- | :----------------------------------------------------------------- | :----------------------- | +| Automatic Scaling | Deactivated | Active | +| Manual Scaling | Possible | Not Possible | +| ScaleObjectWatcher | Deactivated | Active | +| PlanedButSkippedScalingOpen
_(The metric `sokar_sca_planned_but_skipped_scaling_open`,
for more information see [Metrics.md](../Metrics.md))_ | Set to 1 if a scaling was skipped
Set to 0 after manual scaling | Stays 0 | diff --git a/main.go b/main.go index 7f721899..31a80a16 100644 --- a/main.go +++ b/main.go @@ -63,7 +63,7 @@ func main() { logger.Info().Msgf("Scaling Target: %s", scalingTarget.String()) logger.Info().Msg("5. Setup: Scaler") - scaler := helper.Must(setupScaler(cfg.ScaleObject.Name, cfg.ScaleObject.MinCount, cfg.ScaleObject.MaxCount, cfg.Scaler.WatcherInterval, scalingTarget, loggingFactory)).(*scaler.Scaler) + scaler := helper.Must(setupScaler(cfg.ScaleObject.Name, cfg.ScaleObject.MinCount, cfg.ScaleObject.MaxCount, cfg.Scaler.WatcherInterval, scalingTarget, loggingFactory, cfg.DryRunMode)).(*scaler.Scaler) logger.Info().Msg("6. Setup: CapacityPlanner") @@ -259,7 +259,7 @@ func setupScalingTarget(cfg config.Scaler, logF logging.LoggerFactory) (scaler.S } // setupScaler creates and configures the Scaler. Internally nomad is used as scaling target. -func setupScaler(scalingObjName string, min uint, max uint, watcherInterval time.Duration, scalingTarget scaler.ScalingTarget, logF logging.LoggerFactory) (*scaler.Scaler, error) { +func setupScaler(scalingObjName string, min uint, max uint, watcherInterval time.Duration, scalingTarget scaler.ScalingTarget, logF logging.LoggerFactory, dryRunMode bool) (*scaler.Scaler, error) { if logF == nil { return nil, fmt.Errorf("Logging factory is nil") @@ -276,6 +276,7 @@ func setupScaler(scalingObjName string, min uint, max uint, watcherInterval time scaler.NewMetrics(), scaler.WithLogger(logF.NewNamedLogger("sokar.scaler")), scaler.WatcherInterval(watcherInterval), + scaler.DryRunMode(dryRunMode), ) if err != nil { return nil, fmt.Errorf("Failed setting up scaler: %s", err) diff --git a/main_test.go b/main_test.go index 26838f52..cdf4722b 100644 --- a/main_test.go +++ b/main_test.go @@ -53,16 +53,16 @@ func Test_SetupScaler_Failures(t *testing.T) { logF := mock_logging.NewMockLoggerFactory(mockCtrl) // no logging factory - scaler, err := setupScaler("any", 0, 1, time.Second*1, nil, nil) + scaler, err := setupScaler("any", 0, 1, time.Second*1, nil, nil, false) assert.Error(t, err) assert.Nil(t, scaler) - scaler, err = setupScaler("any", 0, 1, time.Second*1, nil, logF) + scaler, err = setupScaler("any", 0, 1, time.Second*1, nil, logF, false) assert.Error(t, err) assert.Nil(t, scaler) // invalid watcher-interval - scaler, err = setupScaler("any", 0, 1, time.Second*0, nil, nil) + scaler, err = setupScaler("any", 0, 1, time.Second*0, nil, nil, false) assert.Error(t, err) assert.Nil(t, scaler) } @@ -75,7 +75,7 @@ func Test_SetupScaler(t *testing.T) { scalingTarget := mock_scaler.NewMockScalingTarget(mockCtrl) logF.EXPECT().NewNamedLogger(gomock.Any()).Times(1) - scaler, err := setupScaler("any", 0, 1, time.Second*1, scalingTarget, logF) + scaler, err := setupScaler("any", 0, 1, time.Second*1, scalingTarget, logF, false) assert.NoError(t, err) assert.NotNil(t, scaler) } diff --git a/scaler/scale.go b/scaler/scale.go index 185265ce..18593859 100644 --- a/scaler/scale.go +++ b/scaler/scale.go @@ -2,6 +2,7 @@ package scaler import ( "fmt" + "time" "github.com/thomasobenaus/sokar/helper" ) @@ -69,27 +70,14 @@ func checkScalingPolicy(desiredCount uint, min uint, max uint) policyCheckResult return result } -// trueIfNil returns a scaleResult filled in with an appropriate error message in case the given scaler is nil -func trueIfNil(s *Scaler) (result scaleResult, ok bool) { - ok = false - result = scaleResult{state: scaleUnknown} - - if s == nil { - ok = true - result = scaleResult{ - state: scaleFailed, - stateDescription: "Scaler is nil", - newCount: 0, - } - } - return result, ok -} - // scale scales the scalingObject from currentCount to desiredCount. // Internally it is checked if a scaling is needed and if the scaling policy is valid. -func (s *Scaler) scale(desiredCount uint, currentCount uint, dryRun bool) scaleResult { - if r, ok := trueIfNil(s); ok { - return r +// If the force flag is true then even in dry-run mode the scaling will be applied. +func (s *Scaler) scale(desiredCount uint, currentCount uint, force bool) scaleResult { + + // memorize the time the scaling started + if isScalePermitted(s.dryRunMode, force) { + s.lastScaleAction = time.Now() } sObjName := s.scalingObject.Name @@ -103,6 +91,7 @@ func (s *Scaler) scale(desiredCount uint, currentCount uint, dryRun bool) scaleR return scaleResult{ state: scaleFailed, stateDescription: fmt.Sprintf("Error obtaining if scalingObject is dead: %s.", err.Error()), + newCount: currentCount, } } @@ -110,6 +99,7 @@ func (s *Scaler) scale(desiredCount uint, currentCount uint, dryRun bool) scaleR return scaleResult{ state: scaleIgnored, stateDescription: fmt.Sprintf("ScalingObject '%s' is dead. Can't scale", sObjName), + newCount: currentCount, } } @@ -136,10 +126,26 @@ func (s *Scaler) scale(desiredCount uint, currentCount uint, dryRun bool) scaleR } } + return s.executeScale(currentCount, newCount, force) +} + +// executeScale just executes the wanted scale, even if it is not needed in case currentCount == newCount. +// The only thing that is checked is if the scaler is in dry run mode or not. +// In dry-run mode the scaling is not applied by actually scaling the scalingObject, only a metric is +// updated, that reflects the fact that a scaling was skipped/ ignored. +// Only the force flag can be used to override this behavior. If the force flag is true then even in +// dry-run mode the scaling will be applied. +func (s *Scaler) executeScale(currentCount, newCount uint, force bool) scaleResult { + sObjName := s.scalingObject.Name + min := s.scalingObject.MinCount + max := s.scalingObject.MaxCount + + diff := helper.SubUint(newCount, currentCount) scaleTypeStr := amountToScaleType(diff) - if dryRun { - s.logger.Info().Str("scalingObject", sObjName).Msgf("Skip scale %s by %d to %d (DryRun).", scaleTypeStr, diff, newCount) + // the force flag can overrule the dry run mode + if !isScalePermitted(s.dryRunMode, force) { + s.logger.Info().Str("scalingObject", sObjName).Msgf("Skip scale %s by %d to %d (DryRun, force=%v).", scaleTypeStr, diff, newCount, force) s.metrics.plannedButSkippedScalingOpen.WithLabelValues(scaleTypeStr).Set(1) return scaleResult{ @@ -149,16 +155,15 @@ func (s *Scaler) scale(desiredCount uint, currentCount uint, dryRun bool) scaleR } } - s.logger.Info().Str("scalingObject", sObjName).Msgf("Scale %s by %d to %d.", scaleTypeStr, diff, newCount) + s.logger.Info().Str("scalingObject", sObjName).Msgf("Scale %s by %d to %d (force=%v).", scaleTypeStr, diff, newCount, force) s.metrics.plannedButSkippedScalingOpen.WithLabelValues(scaleTypeStr).Set(0) - // Set the new scalingObject count - s.desiredScale.setValue(newCount) - err = s.scalingTarget.AdjustScalingObjectCount(s.scalingObject.Name, s.scalingObject.MinCount, s.scalingObject.MaxCount, currentCount, newCount) + err := s.scalingTarget.AdjustScalingObjectCount(sObjName, min, max, currentCount, newCount) if err != nil { return scaleResult{ state: scaleFailed, stateDescription: fmt.Sprintf("Error adjusting scalingObject count to %d: %s.", newCount, err.Error()), + newCount: currentCount, } } @@ -168,3 +173,7 @@ func (s *Scaler) scale(desiredCount uint, currentCount uint, dryRun bool) scaleR newCount: newCount, } } + +func isScalePermitted(dryRun, force bool) bool { + return !dryRun || force +} diff --git a/scaler/scale_test.go b/scaler/scale_test.go index cd1a3593..1d0dac4a 100644 --- a/scaler/scale_test.go +++ b/scaler/scale_test.go @@ -3,6 +3,7 @@ package scaler import ( "fmt" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -11,6 +12,81 @@ import ( mock_scaler "github.com/thomasobenaus/sokar/test/scaler" ) +func Test_ExecuteScale_NoDryRun(t *testing.T) { + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + sObjName := "any" + scaTgt := mock_scaler.NewMockScalingTarget(mockCtrl) + metrics, mocks := NewMockedMetrics(mockCtrl) + + plannedButSkippedGauge := mock_metrics.NewMockGauge(mockCtrl) + plannedButSkippedGauge.EXPECT().Set(float64(0)).Times(3) + mocks.plannedButSkippedScalingOpen.EXPECT().WithLabelValues("UP").Return(plannedButSkippedGauge).Times(3) + + sObj := ScalingObject{Name: sObjName, MinCount: 0, MaxCount: 2} + scaler, err := New(scaTgt, sObj, metrics, DryRunMode(false)) + require.NoError(t, err) + + // error + scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(0), uint(2), uint(1), uint(1)).Return(fmt.Errorf("err")) + result := scaler.executeScale(1, 1, false) + assert.Equal(t, scaleFailed, result.state) + assert.Equal(t, uint(1), result.newCount) + + // no scale, success + scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(0), uint(2), uint(1), uint(1)).Return(nil) + result = scaler.executeScale(1, 1, false) + assert.Equal(t, scaleDone, result.state) + assert.Equal(t, uint(1), result.newCount) + + // scale up, success + scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(0), uint(2), uint(1), uint(2)).Return(nil) + result = scaler.executeScale(1, 2, false) + assert.Equal(t, scaleDone, result.state) + assert.Equal(t, uint(2), result.newCount) + + // scale down, success + scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(0), uint(2), uint(2), uint(1)).Return(nil) + plannedButSkippedGauge.EXPECT().Set(float64(0)) + mocks.plannedButSkippedScalingOpen.EXPECT().WithLabelValues("DOWN").Return(plannedButSkippedGauge) + result = scaler.executeScale(2, 1, false) + assert.Equal(t, scaleDone, result.state) + assert.Equal(t, uint(1), result.newCount) +} + +func Test_ExecuteScale_DryRun(t *testing.T) { + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + sObjName := "any" + scaTgt := mock_scaler.NewMockScalingTarget(mockCtrl) + metrics, mocks := NewMockedMetrics(mockCtrl) + + plannedButSkippedGauge := mock_metrics.NewMockGauge(mockCtrl) + plannedButSkippedGauge.EXPECT().Set(float64(1)) + mocks.plannedButSkippedScalingOpen.EXPECT().WithLabelValues("UP").Return(plannedButSkippedGauge) + + sObj := ScalingObject{Name: sObjName, MinCount: 0, MaxCount: 2} + scaler, err := New(scaTgt, sObj, metrics, DryRunMode(true)) + require.NoError(t, err) + + // no scale, dry run + result := scaler.executeScale(1, 2, false) + assert.Equal(t, scaleIgnored, result.state) + assert.Equal(t, uint(1), result.newCount) + + // scale up, dry run but force + scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(0), uint(2), uint(1), uint(2)).Return(nil) + plannedButSkippedGauge.EXPECT().Set(float64(0)) + mocks.plannedButSkippedScalingOpen.EXPECT().WithLabelValues("UP").Return(plannedButSkippedGauge) + result = scaler.executeScale(1, 2, true) + assert.Equal(t, scaleDone, result.state) + assert.Equal(t, uint(2), result.newCount) +} + func TestScale_ScalingObjectDead(t *testing.T) { mockCtrl := gomock.NewController(t) @@ -25,13 +101,15 @@ func TestScale_ScalingObjectDead(t *testing.T) { // dead scalingObject - error scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, fmt.Errorf("internal error")) - result := scaler.scale(2, 0, false) + result := scaler.scale(2, 1, false) assert.Equal(t, scaleFailed, result.state) + assert.Equal(t, uint(1), result.newCount) // dead scalingObject scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(true, nil) - result = scaler.scale(2, 0, false) + result = scaler.scale(2, 1, false) assert.Equal(t, scaleIgnored, result.state) + assert.Equal(t, uint(1), result.newCount) } func TestScale_Up(t *testing.T) { @@ -55,8 +133,8 @@ func TestScale_Up(t *testing.T) { scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, nil) scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(1), uint(5), uint(0), uint(2)).Return(nil) result := scaler.scale(2, 0, false) - assert.Equal(t, uint(2), scaler.desiredScale.value) - assert.NotEqual(t, scaleFailed, result.state) + assert.Equal(t, scaleDone, result.state) + assert.Equal(t, uint(2), result.newCount) // scale up - max hit polViolatedCounter := mock_metrics.NewMockCounter(mockCtrl) @@ -65,8 +143,8 @@ func TestScale_Up(t *testing.T) { scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, nil) scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(1), uint(5), uint(0), uint(5)).Return(nil) result = scaler.scale(6, 0, false) - assert.Equal(t, uint(5), scaler.desiredScale.value) - assert.NotEqual(t, scaleFailed, result.state) + assert.Equal(t, scaleDone, result.state) + assert.Equal(t, uint(5), result.newCount) } func TestScale_Down(t *testing.T) { @@ -90,8 +168,8 @@ func TestScale_Down(t *testing.T) { scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, nil) scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(1), uint(5), uint(4), uint(1)).Return(nil) result := scaler.scale(1, 4, false) - assert.Equal(t, uint(1), scaler.desiredScale.value) - assert.NotEqual(t, scaleFailed, result.state) + assert.Equal(t, scaleDone, result.state) + assert.Equal(t, uint(1), result.newCount) // scale up - min hit polViolatedCounter := mock_metrics.NewMockCounter(mockCtrl) @@ -100,8 +178,8 @@ func TestScale_Down(t *testing.T) { scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, nil) scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(1), uint(5), uint(2), uint(1)).Return(nil) result = scaler.scale(0, 2, false) - assert.Equal(t, uint(1), scaler.desiredScale.value) - assert.NotEqual(t, scaleFailed, result.state) + assert.Equal(t, scaleDone, result.state) + assert.Equal(t, uint(1), result.newCount) } func TestScale_NoScale(t *testing.T) { @@ -117,11 +195,10 @@ func TestScale_NoScale(t *testing.T) { scaler, err := New(scaTgt, sObj, metrics) require.NoError(t, err) - // scale down scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, nil) result := scaler.scale(2, 2, false) - assert.False(t, scaler.desiredScale.isKnown) - assert.NotEqual(t, scaleFailed, result.state) + assert.Equal(t, scaleIgnored, result.state) + assert.Equal(t, uint(2), result.newCount) } func TestScaleBy_CheckScalingPolicy(t *testing.T) { @@ -163,16 +240,33 @@ func TestScaleBy_CheckScalingPolicy(t *testing.T) { assert.True(t, chk.maxPolicyViolated) } -func TestScaleBy_trueIfNil(t *testing.T) { - _, ok := trueIfNil(nil) - assert.True(t, ok) +func TestScale_UpDryRun(t *testing.T) { + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + metrics, mocks := NewMockedMetrics(mockCtrl) + + scaTgt := mock_scaler.NewMockScalingTarget(mockCtrl) + + sObjName := "any" + sObj := ScalingObject{Name: sObjName, MinCount: 1, MaxCount: 5} + scaler, err := New(scaTgt, sObj, metrics, DryRunMode(true)) + require.NoError(t, err) + + plannedButSkippedGauge := mock_metrics.NewMockGauge(mockCtrl) + plannedButSkippedGauge.EXPECT().Set(float64(1)) + mocks.plannedButSkippedScalingOpen.EXPECT().WithLabelValues("UP").Return(plannedButSkippedGauge) - scaler := &Scaler{} - _, ok = trueIfNil(scaler) - assert.False(t, ok) + // scale up + scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, nil) + result := scaler.scale(2, 1, false) + assert.Equal(t, scaleIgnored, result.state) + assert.Equal(t, uint(1), result.newCount) + oneDayAgo := time.Now().Add(time.Hour * -24) + assert.WithinDuration(t, oneDayAgo, scaler.lastScaleAction, time.Second*1) } -func TestScale_UpDryRun(t *testing.T) { +func TestScale_DryRunForce(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -182,17 +276,21 @@ func TestScale_UpDryRun(t *testing.T) { sObjName := "any" sObj := ScalingObject{Name: sObjName, MinCount: 1, MaxCount: 5} - scaler, err := New(scaTgt, sObj, metrics) + scaler, err := New(scaTgt, sObj, metrics, DryRunMode(true)) require.NoError(t, err) plannedButSkippedGauge := mock_metrics.NewMockGauge(mockCtrl) - plannedButSkippedGauge.EXPECT().Set(float64(1)) + plannedButSkippedGauge.EXPECT().Set(float64(0)) mocks.plannedButSkippedScalingOpen.EXPECT().WithLabelValues("UP").Return(plannedButSkippedGauge) + scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(1), uint(5), uint(1), uint(2)).Return(nil) // scale up scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, nil) - result := scaler.scale(2, 0, true) - assert.NotEqual(t, scaleFailed, result.state) + result := scaler.scale(2, 1, true) + assert.Equal(t, scaleDone, result.state) + assert.Equal(t, uint(2), result.newCount) + fiveMsAgo := time.Now().Add(time.Millisecond * -5) + assert.WithinDuration(t, fiveMsAgo, scaler.lastScaleAction, time.Second*1) } func TestScale_DownDryRun(t *testing.T) { @@ -205,7 +303,7 @@ func TestScale_DownDryRun(t *testing.T) { sObjName := "any" sObj := ScalingObject{Name: sObjName, MinCount: 1, MaxCount: 5} - scaler, err := New(scaTgt, sObj, metrics) + scaler, err := New(scaTgt, sObj, metrics, DryRunMode(true)) require.NoError(t, err) plannedButSkippedGauge := mock_metrics.NewMockGauge(mockCtrl) @@ -214,6 +312,16 @@ func TestScale_DownDryRun(t *testing.T) { // scale down scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, nil) - result := scaler.scale(1, 4, true) - assert.NotEqual(t, scaleFailed, result.state) + result := scaler.scale(1, 4, false) + assert.Equal(t, scaleIgnored, result.state) + assert.Equal(t, uint(4), result.newCount) + oneDayAgo := time.Now().Add(time.Hour * -24) + assert.WithinDuration(t, oneDayAgo, scaler.lastScaleAction, time.Second*1) +} + +func Test_IsScalePermitted(t *testing.T) { + assert.True(t, isScalePermitted(true, true)) + assert.True(t, isScalePermitted(false, true)) + assert.True(t, isScalePermitted(false, false)) + assert.False(t, isScalePermitted(true, false)) } diff --git a/scaler/scaler.go b/scaler/scaler.go index 82a007a5..c0df60d9 100644 --- a/scaler/scaler.go +++ b/scaler/scaler.go @@ -8,6 +8,8 @@ import ( "github.com/rs/zerolog" ) +var oneDayAgo = time.Now().Add(time.Hour * -24) + // Scaler is a component responsible for scaling a scalingObject type Scaler struct { logger zerolog.Logger @@ -19,6 +21,15 @@ type Scaler struct { // ScalingObject represents the ScalingObject and relevant meta data scalingObject ScalingObject + // dryRunMode active/ not active. In dry run mode no automatic scaling will + // executed. For more information see ../doc/DryRunMode.md + dryRunMode bool + + // LastScaleAction represents that point in time + // when the scaler was triggered to execute a scaling + // action the last time + lastScaleAction time.Time + // watcherInterval the interval the Scaler will check if // the scalingObject count still matches the desired state. watcherInterval time.Duration @@ -48,16 +59,6 @@ type Scaler struct { scalingObjectWatcherPaused bool } -// Config is the configuration for the Scaler -type Config struct { - Name string - MinCount uint - MaxCount uint - Logger zerolog.Logger - MaxOpenScalingTickets uint - WatcherInterval time.Duration -} - // Option represents an option for the Scaler type Option func(c *Scaler) @@ -83,6 +84,15 @@ func WatcherInterval(interval time.Duration) Option { } } +// DryRunMode can be used to activate/ deactivate the dry run mode. +// In dry run mode no automatic scaling will executed. +// For more information see ../doc/DryRunMode.md +func DryRunMode(enable bool) Option { + return func(s *Scaler) { + s.dryRunMode = enable + } +} + // New creates a new instance of a scaler using the given // ScalingTarget to send scaling events to. func New(scalingTarget ScalingTarget, scalingObject ScalingObject, metrics Metrics, options ...Option) (*Scaler, error) { @@ -97,6 +107,8 @@ func New(scalingTarget ScalingTarget, scalingObject ScalingObject, metrics Metri maxOpenScalingTickets: maxOpenScalingTickets, metrics: metrics, desiredScale: optionalValue{isKnown: false}, + dryRunMode: false, + lastScaleAction: oneDayAgo, } // apply the options @@ -126,9 +138,9 @@ func (s *Scaler) GetCount() (uint, error) { } // ScaleTo will scale the scalingObject to the desired count. -func (s *Scaler) ScaleTo(desiredCount uint, dryRun bool) error { - s.logger.Info().Msgf("Scale to %d requested (dryRun=%t).", desiredCount, dryRun) - return s.openScalingTicket(desiredCount, dryRun) +func (s *Scaler) ScaleTo(desiredCount uint, force bool) error { + s.logger.Info().Msgf("Scale to %d requested (force=%t).", desiredCount, force) + return s.openScalingTicket(desiredCount, force) } // GetName returns the name of this component @@ -140,8 +152,14 @@ func (s *Scaler) GetName() string { func (s *Scaler) Run() { // handler that processes incoming scaling tickets go s.scaleTicketProcessor(s.scaleTicketChan) - // handler that checks periodically if the desired count is still valid - go s.scalingObjectWatcher(s.watcherInterval) + + if s.dryRunMode { + s.logger.Info().Msg("Don't start the ScalingObjectWatcher in dry-run mode.") + } else { + // handler that checks periodically if the desired count is still valid + go s.scalingObjectWatcher(s.watcherInterval) + s.logger.Info().Msg("ScalingObjectWatcher started.") + } } // Stop tears down scaler @@ -158,3 +176,9 @@ func (s *Scaler) Stop() error { func (s *Scaler) Join() { s.wg.Wait() } + +// GetTimeOfLastScaleAction returns that point in time where the most recent +// scaling STARTED. +func (s *Scaler) GetTimeOfLastScaleAction() time.Time { + return s.lastScaleAction +} diff --git a/scaler/scalerImpl.go b/scaler/scalerImpl.go index 524cdbf5..c3ca89e5 100644 --- a/scaler/scalerImpl.go +++ b/scaler/scalerImpl.go @@ -46,20 +46,17 @@ func (s *Scaler) scaleTicketProcessor(ticketChan <-chan ScalingTicket) { s.logger.Info().Msg("ScaleTicketProcessor closed.") } -// applyScaleTicket applies the given ScalingTicket by issuing and tracking the scaling action. -func (s *Scaler) applyScaleTicket(ticket ScalingTicket) { - ticket.start() - result := s.scaleTo(ticket.desiredCount, ticket.dryRun) - ticket.complete(result.state) - s.numOpenScalingTickets-- - - s.metrics.scalingTicketCount.WithLabelValues("applied").Inc() +func updateDesiredScale(sResult scaleResult, desiredScale *optionalValue) error { + if desiredScale == nil { + return fmt.Errorf("desiredScale parameter is nil") + } - dur, _ := ticket.processingDuration() - s.metrics.scalingDurationSeconds.Observe(float64(dur.Seconds())) - updateScaleResultMetric(result, s.metrics.scaleResultCounter) + if sResult.state != scaleDone { + return nil + } - s.logger.Info().Msgf("Ticket applied. Scaling was %s (%s). New count is %d. Scaling in %f .", result.state, result.stateDescription, result.newCount, dur.Seconds()) + desiredScale.setValue(sResult.newCount) + return nil } func updateScaleResultMetric(result scaleResult, scaleResultCounter m.CounterVec) { @@ -81,7 +78,7 @@ func updateScaleResultMetric(result scaleResult, scaleResultCounter m.CounterVec } // openScalingTicket opens based on the desired count a ScalingTicket -func (s *Scaler) openScalingTicket(desiredCount uint, dryRun bool) error { +func (s *Scaler) openScalingTicket(desiredCount uint, force bool) error { if s.numOpenScalingTickets > s.maxOpenScalingTickets { s.metrics.scalingTicketCount.WithLabelValues("rejected").Inc() @@ -93,11 +90,31 @@ func (s *Scaler) openScalingTicket(desiredCount uint, dryRun bool) error { s.metrics.scalingTicketCount.WithLabelValues("added").Inc() // TODO: Add metric "open scaling tickets" s.numOpenScalingTickets++ - s.scaleTicketChan <- NewScalingTicket(desiredCount, dryRun) + s.scaleTicketChan <- NewScalingTicket(desiredCount, force) return nil } -func (s *Scaler) scaleTo(desiredCount uint, dryRun bool) scaleResult { +// applyScaleTicket applies the given ScalingTicket by issuing and tracking the scaling action. +func (s *Scaler) applyScaleTicket(ticket ScalingTicket) { + ticket.start() + result := s.scaleTo(ticket.desiredCount, ticket.force) + if err := updateDesiredScale(result, &s.desiredScale); err != nil { + s.logger.Error().Err(err).Msg("Failed updating desired scale.") + } + + ticket.complete(result.state) + s.numOpenScalingTickets-- + + s.metrics.scalingTicketCount.WithLabelValues("applied").Inc() + + dur, _ := ticket.processingDuration() + s.metrics.scalingDurationSeconds.Observe(float64(dur.Seconds())) + updateScaleResultMetric(result, s.metrics.scaleResultCounter) + + s.logger.Info().Msgf("Ticket applied. Scaling was %s (%s). New count is %d. Scaling in %f .", result.state, result.stateDescription, result.newCount, dur.Seconds()) +} + +func (s *Scaler) scaleTo(desiredCount uint, force bool) scaleResult { scalingObjectName := s.scalingObject.Name currentCount, err := s.scalingTarget.GetScalingObjectCount(scalingObjectName) if err != nil { @@ -107,5 +124,5 @@ func (s *Scaler) scaleTo(desiredCount uint, dryRun bool) scaleResult { } } - return s.scale(desiredCount, currentCount, dryRun) + return s.scale(desiredCount, currentCount, force) } diff --git a/scaler/scaler_test.go b/scaler/scaler_test.go index 58b3111e..3a385552 100644 --- a/scaler/scaler_test.go +++ b/scaler/scaler_test.go @@ -30,6 +30,9 @@ func Test_New(t *testing.T) { assert.NotNil(t, scaler.stopChan) assert.NotNil(t, scaler.scaleTicketChan) assert.NotNil(t, scaler.scalingTarget) + + oneDayAgo := time.Now().Add(time.Hour * -24) + assert.WithinDuration(t, oneDayAgo, scaler.lastScaleAction, time.Second*1) } func Test_GetCount(t *testing.T) { @@ -121,7 +124,7 @@ func Test_OpenScalingTicket(t *testing.T) { scalingTicketCounter := mock_metrics.NewMockCounter(mockCtrl) scalingTicketCounter.EXPECT().Inc().Times(2) mocks.scalingTicketCount.EXPECT().WithLabelValues("added").Return(scalingTicketCounter) - err = scaler.openScalingTicket(0, false) + err = scaler.openScalingTicket(1, false) assert.NoError(t, err) assert.Equal(t, uint(1), scaler.numOpenScalingTickets) assert.Len(t, scaler.scaleTicketChan, 1) @@ -131,10 +134,11 @@ func Test_OpenScalingTicket(t *testing.T) { assert.Error(t, err) ticket := <-scaler.scaleTicketChan - assert.Equal(t, uint(0), ticket.desiredCount) + assert.Equal(t, uint(1), ticket.desiredCount) + assert.False(t, ticket.force) } -func Test_ApplyScalingTicket(t *testing.T) { +func Test_ApplyScalingTicket_NoScale_DeadJob(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() metrics, mocks := NewMockedMetrics(mockCtrl) @@ -158,8 +162,104 @@ func Test_ApplyScalingTicket(t *testing.T) { mocks.scaleResultCounter.EXPECT().WithLabelValues("ignored").Return(ignoredCounter) mocks.scalingDurationSeconds.EXPECT().Observe(gomock.Any()) - ticket := NewScalingTicket(0, false) + ticket := NewScalingTicket(10, false) + scaler.applyScaleTicket(ticket) + assert.False(t, scaler.desiredScale.isKnown) + assert.Equal(t, uint(0), scaler.desiredScale.value) +} + +func Test_ApplyScalingTicket_NoScale_DryRun(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + metrics, mocks := NewMockedMetrics(mockCtrl) + + scaTgt := mock_scaler.NewMockScalingTarget(mockCtrl) + plannedButSkippedGauge := mock_metrics.NewMockGauge(mockCtrl) + plannedButSkippedGauge.EXPECT().Set(float64(1)) + scalingTicketCounter := mock_metrics.NewMockCounter(mockCtrl) + scalingTicketCounter.EXPECT().Inc() + doneCounter := mock_metrics.NewMockCounter(mockCtrl) + doneCounter.EXPECT().Inc() + sObjName := "any" + + gomock.InOrder( + scaTgt.EXPECT().GetScalingObjectCount(sObjName).Return(uint(1), nil), + scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, nil), + mocks.plannedButSkippedScalingOpen.EXPECT().WithLabelValues("UP").Return(plannedButSkippedGauge), + mocks.scalingTicketCount.EXPECT().WithLabelValues("applied").Return(scalingTicketCounter), + mocks.scalingDurationSeconds.EXPECT().Observe(gomock.Any()), + mocks.scaleResultCounter.EXPECT().WithLabelValues("ignored").Return(doneCounter), + ) + sObj := ScalingObject{Name: sObjName, MinCount: 1, MaxCount: 10} + scaler, err := New(scaTgt, sObj, metrics, DryRunMode(true)) + require.NoError(t, err) + require.NotNil(t, scaler) + + ticket := NewScalingTicket(5, false) + scaler.applyScaleTicket(ticket) + assert.False(t, scaler.desiredScale.isKnown) + assert.Equal(t, uint(0), scaler.desiredScale.value) +} + +func Test_ApplyScalingTicket_NoScaleObjectWatcherInDryRunMode(t *testing.T) { + // This test was added to ensure that the ScaleObjectWatcher does not + // run in dry-run mode. Why, see: https://github.com/ThomasObenaus/sokar/issues/98. + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + metrics, _ := NewMockedMetrics(mockCtrl) + + scaTgt := mock_scaler.NewMockScalingTarget(mockCtrl) + sObj := ScalingObject{Name: "any", MinCount: 1, MaxCount: 10} + scaler, err := New(scaTgt, sObj, metrics, DryRunMode(true), WatcherInterval(time.Millisecond*100)) + require.NoError(t, err) + require.NotNil(t, scaler) + + scaler.Run() + defer func() { + scaler.Stop() + scaler.Join() + }() + + // give the (potential) watcher some time + time.Sleep(time.Millisecond * 200) + + // hint: This test would fail in case a running ScaleObjectWatcher would + // call a method of the mocked ScalingTarget (e.g. GetCount or Scale) +} + +func Test_ApplyScalingTicket_Scale(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + metrics, mocks := NewMockedMetrics(mockCtrl) + + scaTgt := mock_scaler.NewMockScalingTarget(mockCtrl) + plannedButSkippedGauge := mock_metrics.NewMockGauge(mockCtrl) + plannedButSkippedGauge.EXPECT().Set(float64(0)) + scalingTicketCounter := mock_metrics.NewMockCounter(mockCtrl) + scalingTicketCounter.EXPECT().Inc() + doneCounter := mock_metrics.NewMockCounter(mockCtrl) + doneCounter.EXPECT().Inc() + sObjName := "any" + + gomock.InOrder( + scaTgt.EXPECT().GetScalingObjectCount(sObjName).Return(uint(0), nil), + scaTgt.EXPECT().IsScalingObjectDead(sObjName).Return(false, nil), + mocks.plannedButSkippedScalingOpen.EXPECT().WithLabelValues("UP").Return(plannedButSkippedGauge), + scaTgt.EXPECT().AdjustScalingObjectCount(sObjName, uint(1), uint(10), uint(0), uint(5)).Return(nil), + mocks.scalingTicketCount.EXPECT().WithLabelValues("applied").Return(scalingTicketCounter), + mocks.scalingDurationSeconds.EXPECT().Observe(gomock.Any()), + mocks.scaleResultCounter.EXPECT().WithLabelValues("done").Return(doneCounter), + ) + sObj := ScalingObject{Name: sObjName, MinCount: 1, MaxCount: 10} + scaler, err := New(scaTgt, sObj, metrics) + require.NoError(t, err) + require.NotNil(t, scaler) + + ticket := NewScalingTicket(5, false) scaler.applyScaleTicket(ticket) + assert.True(t, scaler.desiredScale.isKnown) + assert.Equal(t, uint(5), scaler.desiredScale.value) } func Test_OpenAndApplyScalingTicket(t *testing.T) { @@ -211,3 +311,27 @@ func Test_OpenAndApplyScalingTicket(t *testing.T) { err = scaler.openScalingTicket(0, false) assert.NoError(t, err) } + +func Test_UpdateDesiredScale(t *testing.T) { + err := updateDesiredScale(scaleResult{}, nil) + assert.Error(t, err) + + desiredScale := optionalValue{} + err = updateDesiredScale(scaleResult{}, &desiredScale) + assert.NoError(t, err) + assert.False(t, desiredScale.isKnown) + assert.Equal(t, uint(0), desiredScale.value) + + desiredScale = optionalValue{} + desiredScale.setValue(10) + err = updateDesiredScale(scaleResult{}, &desiredScale) + assert.NoError(t, err) + assert.True(t, desiredScale.isKnown) + assert.Equal(t, uint(10), desiredScale.value) + + desiredScale = optionalValue{} + err = updateDesiredScale(scaleResult{state: scaleDone, newCount: 10}, &desiredScale) + assert.NoError(t, err) + assert.True(t, desiredScale.isKnown) + assert.Equal(t, uint(10), desiredScale.value) +} diff --git a/scaler/scalingTicket.go b/scaler/scalingTicket.go index 35feb500..7eea4061 100644 --- a/scaler/scalingTicket.go +++ b/scaler/scalingTicket.go @@ -18,23 +18,25 @@ type ScalingTicket struct { // (failed or successful) completedAt *time.Time - // If this flag is true, then no scaling is executed in the end. - // The scaler just checks against the scaling policy if a scaling would be needed. - dryRun bool + // In case the scaler is in dry-run mode usually the scaling is not applied by actually scaling the scalingObject. + // Only a metric is updated, that reflects the fact that a scaling was skipped/ ignored. + // With this force flag this behavior overridden. If the force flag is true then even in + // dry-run mode the scaling will be applied. + force bool desiredCount uint state scaleState } // NewScalingTicket creates and opens/ issues a new ScalingTicket -func NewScalingTicket(desiredCount uint, dryRun bool) ScalingTicket { +func NewScalingTicket(desiredCount uint, force bool) ScalingTicket { return ScalingTicket{ issuedAt: time.Now(), startedAt: nil, completedAt: nil, desiredCount: desiredCount, state: scaleNotStarted, - dryRun: dryRun, + force: force, } } diff --git a/scaler/scalingTicket_test.go b/scaler/scalingTicket_test.go index ea739399..cdd3438b 100644 --- a/scaler/scalingTicket_test.go +++ b/scaler/scalingTicket_test.go @@ -15,9 +15,9 @@ func TestNewScalingTicket(t *testing.T) { assert.Equal(t, uint(0), sj.desiredCount) assert.Nil(t, sj.startedAt) assert.Nil(t, sj.completedAt) - assert.False(t, sj.dryRun) + assert.False(t, sj.force) sj = NewScalingTicket(0, true) - assert.True(t, sj.dryRun) + assert.True(t, sj.force) } func Test_Start(t *testing.T) { diff --git a/sokar/iface/capacity_planner_IF.go b/sokar/iface/capacity_planner_IF.go index 4219eb8a..903ef3b9 100644 --- a/sokar/iface/capacity_planner_IF.go +++ b/sokar/iface/capacity_planner_IF.go @@ -11,5 +11,5 @@ type CapacityPlanner interface { // IsCoolingDown returns true if the CapacityPlanner thinks that // its currently not a good idea to apply the wanted scaling event. - IsCoolingDown(timeOfLastScale time.Time, scaleDown bool) bool + IsCoolingDown(timeOfLastScale time.Time, scaleDown bool) (cooldownActive bool, cooldownTimeLeft time.Duration) } diff --git a/sokar/iface/scaler_IF.go b/sokar/iface/scaler_IF.go index 764de553..77c5cefc 100644 --- a/sokar/iface/scaler_IF.go +++ b/sokar/iface/scaler_IF.go @@ -1,7 +1,10 @@ package sokar +import "time" + // Scaler is a component that is able to scale a scaling-object type Scaler interface { - ScaleTo(count uint, dryRun bool) error + ScaleTo(count uint, force bool) error GetCount() (uint, error) + GetTimeOfLastScaleAction() time.Time } diff --git a/sokar/scaleBy.go b/sokar/scaleBy.go index 81a8e51a..716b791a 100644 --- a/sokar/scaleBy.go +++ b/sokar/scaleBy.go @@ -30,7 +30,8 @@ func (sk *Sokar) ScaleByPercentage(w http.ResponseWriter, r *http.Request, ps ht } percentageFract := float32(percentage) / 100.00 - err = sk.triggerScale(false, percentageFract, planScaleByPercentage) + // this is used in manual (override) mode --> force has to be true + err = sk.triggerScale(true, percentageFract, planScaleByPercentage) if err != nil { sk.logger.Error().Err(err).Msg("Unable to trigger scale") http.Error(w, err.Error(), http.StatusInternalServerError) @@ -60,7 +61,8 @@ func (sk *Sokar) ScaleByValue(w http.ResponseWriter, r *http.Request, ps httprou return } - err = sk.triggerScale(false, float32(value), planScaleByValue) + // this is used in manual (override) mode --> force has to be true + err = sk.triggerScale(true, float32(value), planScaleByValue) if err != nil { sk.logger.Error().Err(err).Msg("Unable to trigger scale") http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/sokar/scaleBy_test.go b/sokar/scaleBy_test.go index 1b51ce96..305b682b 100644 --- a/sokar/scaleBy_test.go +++ b/sokar/scaleBy_test.go @@ -5,6 +5,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/golang/mock/gomock" "github.com/julienschmidt/httprouter" @@ -94,8 +95,9 @@ func Test_ScaleByPercentage_HTTPHandler_OK(t *testing.T) { params := []httprouter.Param{httprouter.Param{Key: PathPartValue, Value: "10"}} gomock.InOrder( scalerIF.EXPECT().GetCount().Return(currentScale, nil), - capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false), - scalerIF.EXPECT().ScaleTo(scaleTo, false).Return(nil), + scalerIF.EXPECT().GetTimeOfLastScaleAction().Return(time.Now()), + capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false, time.Second*0), + scalerIF.EXPECT().ScaleTo(scaleTo, true).Return(nil), ) metricMocks.preScaleJobCount.EXPECT().Set(float64(currentScale)) metricMocks.plannedJobCount.EXPECT().Set(float64(scaleTo)) @@ -129,8 +131,9 @@ func Test_ScaleByPercentage_HTTPHandler_IntError(t *testing.T) { params := []httprouter.Param{httprouter.Param{Key: PathPartValue, Value: "10"}} gomock.InOrder( scalerIF.EXPECT().GetCount().Return(currentScale, nil), - capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false), - scalerIF.EXPECT().ScaleTo(scaleTo, false).Return(fmt.Errorf("Failed to scale")), + scalerIF.EXPECT().GetTimeOfLastScaleAction().Return(time.Now()), + capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false, time.Second*0), + scalerIF.EXPECT().ScaleTo(scaleTo, true).Return(fmt.Errorf("Failed to scale")), metricMocks.failedScalingTotal.EXPECT().Inc(), ) metricMocks.preScaleJobCount.EXPECT().Set(float64(currentScale)) @@ -196,8 +199,9 @@ func Test_ScaleByValue_HTTPHandler_OK(t *testing.T) { params := []httprouter.Param{httprouter.Param{Key: PathPartValue, Value: "10"}} gomock.InOrder( scalerIF.EXPECT().GetCount().Return(currentScale, nil), - capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false), - scalerIF.EXPECT().ScaleTo(scaleTo, false).Return(nil), + scalerIF.EXPECT().GetTimeOfLastScaleAction().Return(time.Now()), + capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false, time.Second*0), + scalerIF.EXPECT().ScaleTo(scaleTo, true).Return(nil), ) metricMocks.preScaleJobCount.EXPECT().Set(float64(currentScale)) metricMocks.plannedJobCount.EXPECT().Set(float64(scaleTo)) @@ -231,8 +235,9 @@ func Test_ScaleByValue_HTTPHandler_IntError(t *testing.T) { params := []httprouter.Param{httprouter.Param{Key: PathPartValue, Value: "10"}} gomock.InOrder( scalerIF.EXPECT().GetCount().Return(currentScale, nil), - capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false), - scalerIF.EXPECT().ScaleTo(scaleTo, false).Return(fmt.Errorf("Failed to scale")), + scalerIF.EXPECT().GetTimeOfLastScaleAction().Return(time.Now()), + capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false, time.Second*0), + scalerIF.EXPECT().ScaleTo(scaleTo, true).Return(fmt.Errorf("Failed to scale")), metricMocks.failedScalingTotal.EXPECT().Inc(), ) metricMocks.preScaleJobCount.EXPECT().Set(float64(currentScale)) diff --git a/sokar/sokar.go b/sokar/sokar.go index cdc48279..94fe96bb 100644 --- a/sokar/sokar.go +++ b/sokar/sokar.go @@ -3,14 +3,11 @@ package sokar import ( "fmt" "sync" - "time" "github.com/rs/zerolog" sokarIF "github.com/thomasobenaus/sokar/sokar/iface" ) -var oneDayAgo = time.Now().Add(time.Hour * -24) - // Sokar component that can be used to scale scaling-objects (jobs /instances). type Sokar struct { logger zerolog.Logger @@ -25,11 +22,6 @@ type Sokar struct { // the needed commands to the scaling target (i.e. nomad) scaler sokarIF.Scaler - // LastScaleAction represents that point in time - // when the scaler was triggered to execute a scaling - // action the last time - lastScaleAction time.Time - // metrics is a collection of metrics used by the sokar metrics Metrics @@ -71,7 +63,6 @@ func (cfg *Config) New(scaleEventEmitter sokarIF.ScaleEventEmitter, capacityPlan stopChan: make(chan struct{}, 1), metrics: metrics, logger: cfg.Logger, - lastScaleAction: oneDayAgo, dryRunMode: cfg.DryRunMode, }, nil } diff --git a/sokar/sokarImpl.go b/sokar/sokarImpl.go index 7d9adf86..14c43536 100644 --- a/sokar/sokarImpl.go +++ b/sokar/sokarImpl.go @@ -2,7 +2,6 @@ package sokar import ( "fmt" - "time" sokarIF "github.com/thomasobenaus/sokar/sokar/iface" ) @@ -39,13 +38,14 @@ func (sk *Sokar) handleScaleEvent(scaleEvent sokarIF.ScaleEvent) { sk.metrics.scaleEventsTotal.Inc() sk.metrics.scaleFactor.Set(float64(scaleFactor)) - err := sk.triggerScale(sk.dryRunMode, scaleFactor, sk.capacityPlanner.Plan) + // this method is used for automatic mode only --> force has to be false + err := sk.triggerScale(false, scaleFactor, sk.capacityPlanner.Plan) if err != nil { sk.logger.Error().Err(err).Msg("Failed to scale.") } } -func (sk *Sokar) triggerScale(dryRunOnly bool, scaleValue float32, planFun func(scaleValue float32, currentScale uint) uint) error { +func (sk *Sokar) triggerScale(force bool, scaleValue float32, planFun func(scaleValue float32, currentScale uint) uint) error { scaleDown := scaleValueToScaleDir(scaleValue) @@ -57,9 +57,9 @@ func (sk *Sokar) triggerScale(dryRunOnly bool, scaleValue float32, planFun func( sk.metrics.preScaleJobCount.Set(float64(preScaleJobCount)) // Don't scale if sokar is in cool down mode - if sk.capacityPlanner.IsCoolingDown(sk.lastScaleAction, scaleDown) { + if cooldown, timeleft := sk.capacityPlanner.IsCoolingDown(sk.scaler.GetTimeOfLastScaleAction(), scaleDown); cooldown { sk.metrics.skippedScalingDuringCooldownTotal.Inc() - sk.logger.Info().Msg("Skip scale event. Sokar is cooling down.") + sk.logger.Info().Msgf("Skip scale event. Sokar is cooling down (time left=%s).", timeleft.String()) return nil } @@ -67,10 +67,7 @@ func (sk *Sokar) triggerScale(dryRunOnly bool, scaleValue float32, planFun func( plannedJobCount := planFun(scaleValue, preScaleJobCount) sk.metrics.plannedJobCount.Set(float64(plannedJobCount)) - if !dryRunOnly { - sk.lastScaleAction = time.Now() - } - err = sk.scaler.ScaleTo(plannedJobCount, dryRunOnly) + err = sk.scaler.ScaleTo(plannedJobCount, force) // HACK: For now we ignore all rejected scaling tickets if err != nil { diff --git a/sokar/sokar_test.go b/sokar/sokar_test.go index 8a4eb9e2..31b87312 100644 --- a/sokar/sokar_test.go +++ b/sokar/sokar_test.go @@ -33,9 +33,6 @@ func Test_New(t *testing.T) { assert.NotNil(t, sokar.scaler) assert.NotNil(t, sokar.stopChan) assert.NotNil(t, sokar.metrics) - - oneDayAgo := time.Now().Add(time.Hour * -24) - assert.WithinDuration(t, oneDayAgo, sokar.lastScaleAction, time.Second*1) } func Test_HandleScaleEvent(t *testing.T) { @@ -59,7 +56,8 @@ func Test_HandleScaleEvent(t *testing.T) { event := sokarIF.ScaleEvent{ScaleFactor: scaleFactor} gomock.InOrder( scalerIF.EXPECT().GetCount().Return(currentScale, nil), - capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false), + scalerIF.EXPECT().GetTimeOfLastScaleAction().Return(time.Now()), + capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false, time.Second*0), capaPlannerIF.EXPECT().Plan(scaleFactor, uint(0)).Return(scaleTo), scalerIF.EXPECT().ScaleTo(scaleTo, false), ) @@ -117,7 +115,8 @@ func Test_TriggerScale_Scale(t *testing.T) { gomock.InOrder( scalerIF.EXPECT().GetCount().Return(currentScale, nil), metricMocks.preScaleJobCount.EXPECT().Set(float64(currentScale)), - capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false), + scalerIF.EXPECT().GetTimeOfLastScaleAction().Return(time.Now()), + capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false, time.Second*0), metricMocks.plannedJobCount.EXPECT().Set(float64(scaleTo)), scalerIF.EXPECT().ScaleTo(scaleTo, false).Return(nil), ) @@ -150,7 +149,8 @@ func Test_TriggerScale_Cooldown(t *testing.T) { gomock.InOrder( scalerIF.EXPECT().GetCount().Return(currentScale, nil), metricMocks.preScaleJobCount.EXPECT().Set(float64(currentScale)), - capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(true), + scalerIF.EXPECT().GetTimeOfLastScaleAction().Return(time.Now()), + capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(true, time.Second*0), metricMocks.skippedScalingDuringCooldownTotal.EXPECT().Inc(), ) @@ -161,6 +161,40 @@ func Test_TriggerScale_Cooldown(t *testing.T) { sokar.triggerScale(false, scaleFactor, planFunc) } +func Test_TriggerScale_NoCooldown(t *testing.T) { + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + evEmitterIF := mock_sokar.NewMockScaleEventEmitter(mockCtrl) + scalerIF := mock_sokar.NewMockScaler(mockCtrl) + capaPlannerIF := mock_sokar.NewMockCapacityPlanner(mockCtrl) + metrics, metricMocks := NewMockedMetrics(mockCtrl) + + cfg := Config{} + sokar, err := cfg.New(evEmitterIF, capaPlannerIF, scalerIF, metrics) + require.NotNil(t, sokar) + require.NoError(t, err) + + currentScale := uint(0) + scaleFactor := float32(1) + scaleTo := uint(1) + gomock.InOrder( + scalerIF.EXPECT().GetCount().Return(currentScale, nil), + metricMocks.preScaleJobCount.EXPECT().Set(float64(currentScale)), + scalerIF.EXPECT().GetTimeOfLastScaleAction().Return(time.Now().Add(time.Hour*-1)), + capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false, time.Second*0), + metricMocks.plannedJobCount.EXPECT().Set(float64(1)), + scalerIF.EXPECT().ScaleTo(scaleTo, false), + ) + + planFunc := func(scaleValue float32, currentScale uint) uint { + return scaleTo + } + + sokar.triggerScale(false, scaleFactor, planFunc) +} + func Test_TriggerScale_ErrGettingJobCount(t *testing.T) { mockCtrl := gomock.NewController(t) @@ -212,7 +246,8 @@ func Test_TriggerScale_ErrScaleTo(t *testing.T) { gomock.InOrder( scalerIF.EXPECT().GetCount().Return(currentScale, nil), metricMocks.preScaleJobCount.EXPECT().Set(float64(currentScale)), - capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false), + scalerIF.EXPECT().GetTimeOfLastScaleAction().Return(time.Now()), + capaPlannerIF.EXPECT().IsCoolingDown(gomock.Any(), false).Return(false, time.Second*0), metricMocks.plannedJobCount.EXPECT().Set(float64(scaleTo)), scalerIF.EXPECT().ScaleTo(scaleTo, false).Return(fmt.Errorf("Unable to scale")), metricMocks.failedScalingTotal.EXPECT().Inc(), diff --git a/test/sokar/mock_capacity_planner_IF.go b/test/sokar/mock_capacity_planner_IF.go index 11e87c93..3d4f37b2 100644 --- a/test/sokar/mock_capacity_planner_IF.go +++ b/test/sokar/mock_capacity_planner_IF.go @@ -48,11 +48,12 @@ func (mr *MockCapacityPlannerMockRecorder) Plan(scaleFactor, currentScale interf } // IsCoolingDown mocks base method -func (m *MockCapacityPlanner) IsCoolingDown(timeOfLastScale time.Time, scaleDown bool) bool { +func (m *MockCapacityPlanner) IsCoolingDown(timeOfLastScale time.Time, scaleDown bool) (bool, time.Duration) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IsCoolingDown", timeOfLastScale, scaleDown) ret0, _ := ret[0].(bool) - return ret0 + ret1, _ := ret[1].(time.Duration) + return ret0, ret1 } // IsCoolingDown indicates an expected call of IsCoolingDown diff --git a/test/sokar/mock_scaler_IF.go b/test/sokar/mock_scaler_IF.go index 287a9adb..3245c62f 100644 --- a/test/sokar/mock_scaler_IF.go +++ b/test/sokar/mock_scaler_IF.go @@ -7,6 +7,7 @@ package mock_sokar import ( gomock "github.com/golang/mock/gomock" reflect "reflect" + time "time" ) // MockScaler is a mock of Scaler interface @@ -33,17 +34,17 @@ func (m *MockScaler) EXPECT() *MockScalerMockRecorder { } // ScaleTo mocks base method -func (m *MockScaler) ScaleTo(count uint, dryRun bool) error { +func (m *MockScaler) ScaleTo(count uint, force bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ScaleTo", count, dryRun) + ret := m.ctrl.Call(m, "ScaleTo", count, force) ret0, _ := ret[0].(error) return ret0 } // ScaleTo indicates an expected call of ScaleTo -func (mr *MockScalerMockRecorder) ScaleTo(count, dryRun interface{}) *gomock.Call { +func (mr *MockScalerMockRecorder) ScaleTo(count, force interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScaleTo", reflect.TypeOf((*MockScaler)(nil).ScaleTo), count, dryRun) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScaleTo", reflect.TypeOf((*MockScaler)(nil).ScaleTo), count, force) } // GetCount mocks base method @@ -60,3 +61,17 @@ func (mr *MockScalerMockRecorder) GetCount() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCount", reflect.TypeOf((*MockScaler)(nil).GetCount)) } + +// GetTimeOfLastScaleAction mocks base method +func (m *MockScaler) GetTimeOfLastScaleAction() time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTimeOfLastScaleAction") + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// GetTimeOfLastScaleAction indicates an expected call of GetTimeOfLastScaleAction +func (mr *MockScalerMockRecorder) GetTimeOfLastScaleAction() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTimeOfLastScaleAction", reflect.TypeOf((*MockScaler)(nil).GetTimeOfLastScaleAction)) +}