Merge pull request #109 from ThomasObenaus/f/108_instance_downscaling_incomplete

Fix: Instance downscaling incomplete (#108)
ThomasObenaus authored Oct 29, 2019
2 parents 312a6df + a7120fc commit e282403
Showing 16 changed files with 88 additions and 45 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,8 @@

- BugFixes:
  - [#106 Downscaling of AWS instances fails (Throttling: Rate exceeded)](https://github.com/ThomasObenaus/sokar/issues/106)
  - [#108 Instance Downscaling does not complete](https://github.com/ThomasObenaus/sokar/issues/108)
- Config: With [#108](https://github.com/ThomasObenaus/sokar/issues/108) the new configuration parameter `instance-termination-timeout` was introduced.

## v0.0.10 (2019-10-13)

11 changes: 11 additions & 0 deletions config/Config.md
@@ -145,6 +145,17 @@ The order they are applied is:
| flag | --sca.nomad.dc-aws.region |
| env | SK_SCA_NOMAD_DC_AWS_REGION |

##### Timeout for Instance Termination

| | |
| ------- | -------------------------------------------------------------------------------------------------------------------------------------------- |
| name | instance-termination-timeout |
| usage | The maximum time the instance termination will be monitored before assuming that this action (instance termination due to downscale) failed. |
| type | duration |
| default | 10m |
| flag | --sca.nomad.dc-aws.instance-termination-timeout |
| env | SK_SCA_NOMAD_DC_AWS_INSTANCE_TERMINATION_TIMEOUT |
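
For illustration only (editorial note, not part of the committed documentation): the value can be set with the flag `--sca.nomad.dc-aws.instance-termination-timeout=10m` or the environment variable `SK_SCA_NOMAD_DC_AWS_INSTANCE_TERMINATION_TIMEOUT=10m`, and the `duration` type follows Go's `time.ParseDuration` syntax:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "10m" is the documented default; "124s" is the override used in fillCfg_test.go below.
	for _, s := range []string{"10m", "124s"} {
		d, err := time.ParseDuration(s)
		if err != nil {
			panic(err)
		}
		fmt.Println(s, "->", d) // 10m -> 10m0s, 124s -> 2m4s
	}
}
```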

## ScaleObject

### Name
7 changes: 4 additions & 3 deletions config/config.go
@@ -58,9 +58,10 @@ type SCANomad struct {

// SCANomadDataCenterAWS represents the parameters needed for the nomad based scaler for mode data-center running on AWS.
type SCANomadDataCenterAWS struct {
Profile string `json:"profile,omitempty"`
Region string `json:"region,omitempty"`
ASGTagKey string `json:"asg_tag_key,omitempty"`
Profile string `json:"profile,omitempty"`
Region string `json:"region,omitempty"`
ASGTagKey string `json:"asg_tag_key,omitempty"`
InstanceTerminationTimeout time.Duration `json:"instance_termination_timeout,omitempty"`
}

// ScaleObject represents the definition for the object that should be scaled.
9 changes: 9 additions & 0 deletions config/entries.go
@@ -87,6 +87,14 @@ var scaNomadDataCenterAWSRegion = configEntry{
usage: "This is an optional parameter and is regarded only if the parameter AWSProfile is empty. The AWSRegion has to specify the region in which the data-center to be scaled resides in.",
}

var scaNomadDataCenterAWSInstanceTerminationTimeout = configEntry{
name: "sca.nomad.dc-aws.instance-termination-timeout",
bindEnv: true,
bindFlag: true,
defaultValue: time.Minute * 10,
usage: "The maximum time the instance termination will be monitored before assuming that this action (instance termination due to downscale) failed.",
}
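
Editorial sketch (not code from this commit): `fillCfg.go` below reads this entry with `cfg.viper.GetDuration`, so default, flag, and environment variable are resolved through viper. A minimal stand-alone approximation of that mechanism; the `SK` prefix and the `.`/`-` to `_` replacement are assumptions inferred from the documented env name `SK_SCA_NOMAD_DC_AWS_INSTANCE_TERMINATION_TIMEOUT`:

```go
package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()

	// Assumed wiring: env names like SK_SCA_NOMAD_DC_AWS_... correspond to an
	// "SK" prefix plus '.' and '-' replaced by '_'.
	v.SetEnvPrefix("sk")
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_"))
	v.AutomaticEnv()

	// Default taken from the config entry above.
	v.SetDefault("sca.nomad.dc-aws.instance-termination-timeout", 10*time.Minute)

	// Resolves to the default unless a flag binding or env var overrides it.
	timeout := v.GetDuration("sca.nomad.dc-aws.instance-termination-timeout")
	fmt.Println(timeout) // 10m0s
}
```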

var scaNomadModeServerAddress = configEntry{
name: "sca.nomad.server-address",
bindEnv: true,
@@ -285,6 +293,7 @@ var configEntries = []configEntry{
scaAWSEC2ASGTagKey,
scaNomadDataCenterAWSProfile,
scaNomadDataCenterAWSRegion,
scaNomadDataCenterAWSInstanceTerminationTimeout,
scaNomadModeServerAddress,
capConstantModeEnable,
capConstantModeOffset,
1 change: 1 addition & 0 deletions config/fillCfg.go
@@ -28,6 +28,7 @@ func (cfg *Config) fillScaler() error {
cfg.Scaler.Nomad.ServerAddr = cfg.viper.GetString(scaNomadModeServerAddress.name)
cfg.Scaler.Nomad.DataCenterAWS.Profile = cfg.viper.GetString(scaNomadDataCenterAWSProfile.name)
cfg.Scaler.Nomad.DataCenterAWS.Region = cfg.viper.GetString(scaNomadDataCenterAWSRegion.name)
cfg.Scaler.Nomad.DataCenterAWS.InstanceTerminationTimeout = cfg.viper.GetDuration(scaNomadDataCenterAWSInstanceTerminationTimeout.name)

return validateScaler(cfg.Scaler)
}
3 changes: 3 additions & 0 deletions config/fillCfg_test.go
@@ -37,6 +37,8 @@ func Test_FillCfg_Flags(t *testing.T) {
"--sca.aws-ec2.profile=profile-test",
"--sca.aws-ec2.region=region-test",
"--sca.aws-ec2.asg-tag-key=asg-tag-key",
"--sca.aws-ec2.asg-tag-key=asg-tag-key",
"--sca.nomad.dc-aws.instance-termination-timeout=124s",
"--sca.watcher-interval=50s",
"--cap.constant-mode.enable=false",
"--cap.constant-mode.offset=106",
@@ -49,6 +51,7 @@
assert.Equal(t, ScalerModeAwsEc2, cfg.Scaler.Mode)
assert.Equal(t, "profile-test", cfg.Scaler.Nomad.DataCenterAWS.Profile)
assert.Equal(t, "region-test", cfg.Scaler.Nomad.DataCenterAWS.Region)
assert.Equal(t, time.Duration(time.Second*124), cfg.Scaler.Nomad.DataCenterAWS.InstanceTerminationTimeout)
assert.Equal(t, "http://nomad", cfg.Scaler.Nomad.ServerAddr)
assert.Equal(t, "profile-test", cfg.Scaler.AwsEc2.Profile)
assert.Equal(t, "region-test", cfg.Scaler.AwsEc2.Region)
1 change: 1 addition & 0 deletions main.go
@@ -227,6 +227,7 @@ func setupScalingTarget(cfg config.Scaler, logF logging.LoggerFactory) (scaler.S
cfg.Nomad.DataCenterAWS.Profile,
nomadWorker.WithLogger(logF.NewNamedLogger("sokar.nomadWorker")),
nomadWorker.WithAwsRegion(cfg.Nomad.DataCenterAWS.Region),
nomadWorker.TimeoutForInstanceTermination(cfg.Nomad.DataCenterAWS.InstanceTerminationTimeout),
)
if err != nil {
return nil, fmt.Errorf("Failed setting up nomad worker connector: %s", err)
15 changes: 11 additions & 4 deletions nomadWorker/connector.go
@@ -46,8 +46,8 @@ type Connector struct {
// nodeDrainDeadline the maximum amount of time nomad will wait before the containers will be forced to be moved
nodeDrainDeadline time.Duration

// monitorInstanceTimeout is the timeout used to monitor the scale of an aws instance at maximum
monitorInstanceTimeout time.Duration
// instanceTerminationTimeout is the timeout used to monitor the scale of an aws instance at maximum
instanceTerminationTimeout time.Duration
}

// Option represents an option for the nomadWorker Connector
@@ -67,6 +67,13 @@ func WithAwsRegion(region string) Option {
}
}

// TimeoutForInstanceTermination sets the maximum time the instance termination will be monitored before assuming that this action failed.
func TimeoutForInstanceTermination(timeout time.Duration) Option {
return func(c *Connector) {
c.instanceTerminationTimeout = timeout
}
}
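
Taken together with the `main.go` change above, the new option is passed when the connector is constructed. A hedged usage sketch (server address, profile, and region are placeholder values, not taken from this commit):

```go
package main

import (
	"log"
	"time"

	nomadWorker "github.com/thomasobenaus/sokar/nomadWorker"
)

func main() {
	// Placeholder values for illustration only.
	connector, err := nomadWorker.New(
		"http://nomad.example.com:4646",
		"my-aws-profile",
		nomadWorker.WithAwsRegion("eu-central-1"),
		nomadWorker.TimeoutForInstanceTermination(10*time.Minute),
	)
	if err != nil {
		log.Fatalf("failed setting up nomad worker connector: %s", err)
	}
	_ = connector
}
```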

// New creates a new nomad worker connector.
// The profile represents the name of the aws profile that shall be used to access the resources to scale the aws AutoScalingGroup.
// This parameter is optional. If the profile is NOT set the instance where sokar runs on has to have enough permissions to access the
Expand All @@ -81,8 +88,8 @@ func New(nomadServerAddress, awsProfile string, options ...Option) (*Connector,
autoScalingFactory: &aws.AutoScalingFactoryImpl{},
fnCreateSession: aws.NewAWSSession,
fnCreateSessionFromProfile: aws.NewAWSSessionFromProfile,
nodeDrainDeadline: time.Second * 30,
monitorInstanceTimeout: time.Second * 180,
nodeDrainDeadline: time.Second * 60,
instanceTerminationTimeout: time.Minute * 10,
awsProfile: awsProfile,
}

26 changes: 17 additions & 9 deletions nomadWorker/downscale.go
@@ -2,6 +2,7 @@ package nomadWorker

import (
"fmt"
"time"

nomadApi "github.com/hashicorp/nomad/api"
"github.com/pkg/errors"
@@ -12,6 +13,8 @@
const MaxUint = ^uint(0)
const MaxInt = int(MaxUint >> 1)

var downscaleCounter uint

type candidate struct {
// nodeID is the nomad node ID
nodeID string
@@ -26,41 +29,46 @@
}

func (c *Connector) downscale(datacenter string, desiredCount uint) error {
downscaleCounter++

// 1. Select a candidate for downscaling -> returns [needs node id]
candidate, err := selectCandidate(c.nodesIF, datacenter, c.log)
if err != nil {
return err
}

c.log.Info().Str("NodeID", candidate.nodeID).Msgf("1. [Select] Selected node '%s' (%s, %s) as candidate for downscaling.", candidate.nodeID, candidate.ipAddress, candidate.instanceID)

c.log.Info().Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("1. [Select] Selected node '%s' (%s, %s) as candidate for downscaling.", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
// 2. Drain the node [needs node id]
c.log.Info().Str("NodeID", candidate.nodeID).Msgf("2. [Drain] Draining node '%s' (%s, %s) ... ", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
c.log.Info().Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("2. [Drain] Draining node '%s' (%s, %s) ... ", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
nodeModifyIndex, err := drainNode(c.nodesIF, candidate.nodeID, c.nodeDrainDeadline)
if err != nil {
return err
}
monitorDrainNode(c.nodesIF, candidate.nodeID, nodeModifyIndex, c.log)
c.log.Info().Str("NodeID", candidate.nodeID).Msgf("2. [Drain] Draining node '%s' (%s, %s) ... done", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
monitorDrainNode(c.nodesIF, candidate.nodeID, nodeModifyIndex, c.nodeDrainDeadline+time.Second*30, c.log)
c.log.Info().Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("2. [Drain] Draining node '%s' (%s, %s) ... done", candidate.nodeID, candidate.ipAddress, candidate.instanceID)

// 3. Terminate the node using the AWS ASG [needs instance id]
c.log.Info().Str("NodeID", candidate.nodeID).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... ", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
c.log.Info().Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... ", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
sess, err := c.createSession()
if err != nil {
err = errors.WithMessage(err, "Creating AWS session for instance termination failed.")
c.log.Error().Err(err).Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... failed", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
return err
}
autoScalingIF := c.autoScalingFactory.CreateAutoScaling(sess)
autoscalingGroupName, activityID, err := aws.TerminateInstanceInAsg(autoScalingIF, candidate.instanceID)
if err != nil {
c.log.Error().Err(err).Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... failed", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
return err
}

// wait until the instance is scaled down
if iter, err := aws.MonitorInstanceScaling(autoScalingIF, autoscalingGroupName, activityID, c.monitorInstanceTimeout); err != nil {
return errors.WithMessage(err, fmt.Sprintf("Monitor instance scaling failed after %d iterations.", iter))
if iter, err := aws.MonitorInstanceScaling(autoScalingIF, autoscalingGroupName, activityID, c.instanceTerminationTimeout); err != nil {
err = errors.WithMessage(err, fmt.Sprintf("Monitor instance scaling failed after %d iterations.", iter))
c.log.Error().Err(err).Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... failed", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
return err
}
c.log.Info().Str("NodeID", candidate.nodeID).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... done", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
c.log.Info().Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... done", candidate.nodeID, candidate.ipAddress, candidate.instanceID)
return nil
}

36 changes: 9 additions & 27 deletions nomadWorker/drain_node.go
@@ -1,6 +1,7 @@
package nomadWorker

import (
"context"
"time"

nomadApi "github.com/hashicorp/nomad/api"
@@ -18,42 +19,23 @@ func drainNode(nodesIF Nodes, nodeID string, deadline time.Duration) (nodeModify
return resp.NodeModifyIndex, err
}

func monitorDrainNode(nodesIF Nodes, nodeID string, nodeModifyIndex uint64, logger zerolog.Logger) uint {
func monitorDrainNode(nodesIF Nodes, nodeID string, nodeModifyIndex uint64, timeout time.Duration, logger zerolog.Logger) uint {

logger.Info().Str("NodeID", nodeID).Msgf("Monitoring node draining (node=%s) ... ", nodeID)

deadline := time.Now().Add(time.Second * 60)
ctx := monitoringCtx{
deadline: deadline,
doneChan: make(chan struct{}),
}
logger.Info().Str("NodeID", nodeID).Msgf("Monitoring node draining (node=%s, timeout=%s) ... ", nodeID, timeout.String())

var numEvents uint
ctx, cancel := context.WithTimeout(context.Background(), timeout)
// FIXME: Find out if and when we need to call the cancel function from the outside to close the context
_ = cancel

// create and obtain the monitoring channel and then wait until it is closed
events := nodesIF.MonitorDrain(ctx, nodeID, nodeModifyIndex, false)
for ev := range events {
if ev != nil {
logger.Info().Str("NodeID", nodeID).Msg(ev.String())
numEvents++
}
}
logger.Info().Str("NodeID", nodeID).Msgf("Monitoring node draining (node=%s) ... done", nodeID)
logger.Info().Str("NodeID", nodeID).Msgf("Monitoring node draining (node=%s, timeout=%s, #events=%d) ... done", nodeID, timeout.String(), numEvents)
return numEvents
}

type monitoringCtx struct {
doneChan <-chan struct{}
deadline time.Time
}

func (ctx monitoringCtx) Deadline() (deadline time.Time, ok bool) {
return ctx.deadline, false
}
func (ctx monitoringCtx) Done() <-chan struct{} {
return ctx.doneChan
}
func (ctx monitoringCtx) Err() error {
return nil
}
func (ctx monitoringCtx) Value(key interface{}) interface{} {
return nil
}
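
Editorial note on the FIXME above: the idiomatic pattern is to release the context by calling its cancel function once monitoring finishes, typically via `defer`. A minimal, generic sketch of `context.WithTimeout` with `defer cancel`, independent of the nomad API used in this commit:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watch consumes events until the channel closes or the timeout expires.
func watch(events <-chan string, timeout time.Duration) uint {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // releases the context's resources once monitoring is done

	var numEvents uint
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return numEvents // channel closed, monitoring finished
			}
			fmt.Println("event:", ev)
			numEvents++
		case <-ctx.Done():
			fmt.Println("gave up waiting:", ctx.Err())
			return numEvents
		}
	}
}

func main() {
	events := make(chan string, 1)
	events <- "node drain complete"
	close(events)
	fmt.Println("events seen:", watch(events, 10*time.Second))
}
```
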
5 changes: 3 additions & 2 deletions nomadWorker/drain_node_test.go
@@ -9,7 +9,7 @@ import (
nomadApi "github.com/hashicorp/nomad/api"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/thomasobenaus/sokar/test/nomadWorker"
mock_nomadWorker "github.com/thomasobenaus/sokar/test/nomadWorker"
)

func TestDrainNode(t *testing.T) {
@@ -34,6 +34,7 @@ func TestMonitorDrainNode(t *testing.T) {

logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

monitorTimeout := time.Second * 10
nodeID := "1234"
nodeModifyIndex := uint64(1234)
evChan := make(chan *nomadApi.MonitorMessage)
@@ -45,6 +46,6 @@
}()

nodesIF.EXPECT().MonitorDrain(gomock.Any(), nodeID, nodeModifyIndex, false).Return(evChan)
numEvents := monitorDrainNode(nodesIF, nodeID, nodeModifyIndex, logger)
numEvents := monitorDrainNode(nodesIF, nodeID, nodeModifyIndex, monitorTimeout, logger)
assert.Equal(t, uint(1), numEvents)
}
1 change: 1 addition & 0 deletions vendor/github.com/hashicorp/nomad/api/go.mod

2 changes: 2 additions & 0 deletions vendor/github.com/hashicorp/nomad/api/go.sum

2 changes: 2 additions & 0 deletions vendor/github.com/rs/zerolog/go.mod

10 changes: 10 additions & 0 deletions vendor/github.com/rs/zerolog/go.sum

2 changes: 2 additions & 0 deletions vendor/github.com/stretchr/testify/go.mod
