diff --git a/CHANGELOG.md b/CHANGELOG.md
index 80e1e163..ec618533 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,8 @@
 
 - BugFixes:
   - [#106 Downscaling of AWS instances fails (Throttling: Rate exceeded)](https://github.com/ThomasObenaus/sokar/issues/106)
+  - [#108 Instance Downscaling does not complete](https://github.com/ThomasObenaus/sokar/issues/108)
+- Config: [#108](https://github.com/ThomasObenaus/sokar/issues/108) introduced the new configuration parameter `instance-termination-timeout`.
 
 ## v0.0.10 (2019-10-13)
 
diff --git a/config/Config.md b/config/Config.md
index 41bbf322..bfbcfd47 100644
--- a/config/Config.md
+++ b/config/Config.md
@@ -145,6 +145,17 @@ The order they are applied is:
 | flag    | --sca.nomad.dc-aws.region  |
 | env     | SK_SCA_NOMAD_DC_AWS_REGION |
 
+##### Timeout for Instance Termination
+
+|         |     |
+| ------- | --- |
+| name    | instance-termination-timeout |
+| usage   | The maximum amount of time an instance termination (triggered by a downscale) is monitored before the action is regarded as failed. |
+| type    | duration |
+| default | 10m |
+| flag    | --sca.nomad.dc-aws.instance-termination-timeout |
+| env     | SK_SCA_NOMAD_DC_AWS_INSTANCE_TERMINATION_TIMEOUT |
+
 ## ScaleObject
 
 ### Name
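Note on the `duration` type documented above: values use Go's `time.ParseDuration` syntax (viper, which reads the configuration, parses string values with it). A minimal illustration, with made-up values:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "10m" is the documented default; "124s" is the value used in the test below
	for _, s := range []string{"10m", "124s", "1h30m"} {
		d, err := time.ParseDuration(s)
		if err != nil {
			panic(err)
		}
		fmt.Println(s, "->", d) // e.g. "10m -> 10m0s"
	}
}
```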
diff --git a/config/config.go b/config/config.go
index 87670b62..618a15cd 100644
--- a/config/config.go
+++ b/config/config.go
@@ -58,9 +58,10 @@ type SCANomad struct {
 
 // SCANomadDataCenterAWS represents the parameters needed for the nomad based scaler for mode data-center running on AWS.
 type SCANomadDataCenterAWS struct {
-	Profile   string `json:"profile,omitempty"`
-	Region    string `json:"region,omitempty"`
-	ASGTagKey string `json:"asg_tag_key,omitempty"`
+	Profile                    string        `json:"profile,omitempty"`
+	Region                     string        `json:"region,omitempty"`
+	ASGTagKey                  string        `json:"asg_tag_key,omitempty"`
+	InstanceTerminationTimeout time.Duration `json:"instance_termination_timeout,omitempty"`
 }
 
 // ScaleObject represents the definition for the object that should be scaled.
diff --git a/config/entries.go b/config/entries.go
index 9a99ff7b..b8c02b3c 100644
--- a/config/entries.go
+++ b/config/entries.go
@@ -87,6 +87,14 @@ var scaNomadDataCenterAWSRegion = configEntry{
 	usage: "This is an optional parameter and is regarded only if the parameter AWSProfile is empty. The AWSRegion has to specify the region in which the data-center to be scaled resides.",
 }
 
+var scaNomadDataCenterAWSInstanceTerminationTimeout = configEntry{
+	name:         "sca.nomad.dc-aws.instance-termination-timeout",
+	bindEnv:      true,
+	bindFlag:     true,
+	defaultValue: time.Minute * 10,
+	usage:        "The maximum amount of time an instance termination (triggered by a downscale) is monitored before the action is regarded as failed.",
+}
+
 var scaNomadModeServerAddress = configEntry{
 	name:    "sca.nomad.server-address",
 	bindEnv: true,
@@ -285,6 +293,7 @@ var configEntries = []configEntry{
 	scaAWSEC2ASGTagKey,
 	scaNomadDataCenterAWSProfile,
 	scaNomadDataCenterAWSRegion,
+	scaNomadDataCenterAWSInstanceTerminationTimeout,
 	scaNomadModeServerAddress,
 	capConstantModeEnable,
 	capConstantModeOffset,
diff --git a/config/fillCfg.go b/config/fillCfg.go
index 6892487b..2e06fba3 100644
--- a/config/fillCfg.go
+++ b/config/fillCfg.go
@@ -28,6 +28,7 @@ func (cfg *Config) fillScaler() error {
 	cfg.Scaler.Nomad.ServerAddr = cfg.viper.GetString(scaNomadModeServerAddress.name)
 	cfg.Scaler.Nomad.DataCenterAWS.Profile = cfg.viper.GetString(scaNomadDataCenterAWSProfile.name)
 	cfg.Scaler.Nomad.DataCenterAWS.Region = cfg.viper.GetString(scaNomadDataCenterAWSRegion.name)
+	cfg.Scaler.Nomad.DataCenterAWS.InstanceTerminationTimeout = cfg.viper.GetDuration(scaNomadDataCenterAWSInstanceTerminationTimeout.name)
 
 	return validateScaler(cfg.Scaler)
 }
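For illustration, a self-contained sketch of the viper mechanics behind `fillScaler`: the entry name and default are the real ones from this change, everything else is boiled down and not part of the patch:

```go
package main

import (
	"fmt"
	"time"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	// corresponds to the defaultValue of the new configEntry
	v.SetDefault("sca.nomad.dc-aws.instance-termination-timeout", time.Minute*10)
	// GetDuration is the call fillScaler uses; it yields the default unless a flag or env var overrides it
	timeout := v.GetDuration("sca.nomad.dc-aws.instance-termination-timeout")
	fmt.Println(timeout) // 10m0s
}
```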
diff --git a/config/fillCfg_test.go b/config/fillCfg_test.go
index d67384d5..b84f1544 100644
--- a/config/fillCfg_test.go
+++ b/config/fillCfg_test.go
@@ -37,6 +37,7 @@ func Test_FillCfg_Flags(t *testing.T) {
 		"--sca.aws-ec2.profile=profile-test",
 		"--sca.aws-ec2.region=region-test",
 		"--sca.aws-ec2.asg-tag-key=asg-tag-key",
+		"--sca.nomad.dc-aws.instance-termination-timeout=124s",
 		"--sca.watcher-interval=50s",
 		"--cap.constant-mode.enable=false",
 		"--cap.constant-mode.offset=106",
@@ -49,6 +50,7 @@ func Test_FillCfg_Flags(t *testing.T) {
 	assert.Equal(t, ScalerModeAwsEc2, cfg.Scaler.Mode)
 	assert.Equal(t, "profile-test", cfg.Scaler.Nomad.DataCenterAWS.Profile)
 	assert.Equal(t, "region-test", cfg.Scaler.Nomad.DataCenterAWS.Region)
+	assert.Equal(t, time.Second*124, cfg.Scaler.Nomad.DataCenterAWS.InstanceTerminationTimeout)
 	assert.Equal(t, "http://nomad", cfg.Scaler.Nomad.ServerAddr)
 	assert.Equal(t, "profile-test", cfg.Scaler.AwsEc2.Profile)
 	assert.Equal(t, "region-test", cfg.Scaler.AwsEc2.Region)
diff --git a/main.go b/main.go
index 31a80a16..e84a0be9 100644
--- a/main.go
+++ b/main.go
@@ -227,6 +227,7 @@ func setupScalingTarget(cfg config.Scaler, logF logging.LoggerFactory) (scaler.S
 		cfg.Nomad.DataCenterAWS.Profile,
 		nomadWorker.WithLogger(logF.NewNamedLogger("sokar.nomadWorker")),
 		nomadWorker.WithAwsRegion(cfg.Nomad.DataCenterAWS.Region),
+		nomadWorker.TimeoutForInstanceTermination(cfg.Nomad.DataCenterAWS.InstanceTerminationTimeout),
 	)
 	if err != nil {
 		return nil, fmt.Errorf("Failed setting up nomad worker connector: %s", err)
diff --git a/nomadWorker/connector.go b/nomadWorker/connector.go
index bd394e28..339f40b8 100644
--- a/nomadWorker/connector.go
+++ b/nomadWorker/connector.go
@@ -46,8 +46,8 @@ type Connector struct {
 	// nodeDrainDeadline the maximum amount of time nomad will wait before the containers will be forced to be moved
 	nodeDrainDeadline time.Duration
 
-	// monitorInstanceTimeout is the timeout used to monitor the scale of an aws instance at maximum
-	monitorInstanceTimeout time.Duration
+	// instanceTerminationTimeout is the maximum amount of time the termination of an aws instance is monitored before it is regarded as failed
+	instanceTerminationTimeout time.Duration
 }
 
 // Option represents an option for the nomadWorker Connector
@@ -67,6 +67,13 @@ func WithAwsRegion(region string) Option {
 	}
 }
 
+// TimeoutForInstanceTermination sets the maximum amount of time an instance termination is monitored before the action is regarded as failed.
+func TimeoutForInstanceTermination(timeout time.Duration) Option {
+	return func(c *Connector) {
+		c.instanceTerminationTimeout = timeout
+	}
+}
+
 // New creates a new nomad worker connector.
 // The profile represents the name of the aws profile that shall be used to access the resources to scale the aws AutoScalingGroup.
 // This parameter is optional. If the profile is NOT set the instance where sokar runs on has to have enough permissions to access the
@@ -81,8 +88,8 @@ func New(nomadServerAddress, awsProfile string, options ...Option) (*Connector,
 		autoScalingFactory:         &aws.AutoScalingFactoryImpl{},
 		fnCreateSession:            aws.NewAWSSession,
 		fnCreateSessionFromProfile: aws.NewAWSSessionFromProfile,
-		nodeDrainDeadline:          time.Second * 30,
-		monitorInstanceTimeout:     time.Second * 180,
+		nodeDrainDeadline:          time.Second * 60,
+		instanceTerminationTimeout: time.Minute * 10,
 		awsProfile:                 awsProfile,
 	}
 
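`TimeoutForInstanceTermination` is a functional option applied on top of the defaults set in `New`, so the new 10m default holds unless the option is passed, as `main.go` above does. A hedged usage sketch (server address and timeout value are illustrative, not from this change):

```go
// assumes the sokar nomadWorker package and "time" are imported
connector, err := nomadWorker.New(
	"http://nomad.example.com:4646", // illustrative nomad server address
	"",                              // empty AWS profile: the instance's own permissions are used
	nomadWorker.TimeoutForInstanceTermination(time.Minute*20),
)
if err != nil {
	// handle setup error
}
_ = connector
```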
done", candidate.nodeID, candidate.ipAddress, candidate.instanceID) // 3. Terminate the node using the AWS ASG [needs instance id] - c.log.Info().Str("NodeID", candidate.nodeID).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... ", candidate.nodeID, candidate.ipAddress, candidate.instanceID) + c.log.Info().Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... ", candidate.nodeID, candidate.ipAddress, candidate.instanceID) sess, err := c.createSession() if err != nil { + err = errors.WithMessage(err, "Creating AWS session for instance termination failed.") + c.log.Error().Err(err).Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... failed", candidate.nodeID, candidate.ipAddress, candidate.instanceID) return err } autoScalingIF := c.autoScalingFactory.CreateAutoScaling(sess) autoscalingGroupName, activityID, err := aws.TerminateInstanceInAsg(autoScalingIF, candidate.instanceID) if err != nil { + c.log.Error().Err(err).Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... failed", candidate.nodeID, candidate.ipAddress, candidate.instanceID) return err } // wait until the instance is scaled down - if iter, err := aws.MonitorInstanceScaling(autoScalingIF, autoscalingGroupName, activityID, c.monitorInstanceTimeout); err != nil { - return errors.WithMessage(err, fmt.Sprintf("Monitor instance scaling failed after %d iterations.", iter)) + if iter, err := aws.MonitorInstanceScaling(autoScalingIF, autoscalingGroupName, activityID, c.instanceTerminationTimeout); err != nil { + err = errors.WithMessage(err, fmt.Sprintf("Monitor instance scaling failed after %d iterations.", iter)) + c.log.Error().Err(err).Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... failed", candidate.nodeID, candidate.ipAddress, candidate.instanceID) + return err } - c.log.Info().Str("NodeID", candidate.nodeID).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... done", candidate.nodeID, candidate.ipAddress, candidate.instanceID) + c.log.Info().Str("NodeID", candidate.nodeID).Uint("count", downscaleCounter).Msgf("3. [Terminate] Terminate node '%s' (%s, %s) ... done", candidate.nodeID, candidate.ipAddress, candidate.instanceID) return nil } diff --git a/nomadWorker/drain_node.go b/nomadWorker/drain_node.go index fb85b999..82ddbd82 100644 --- a/nomadWorker/drain_node.go +++ b/nomadWorker/drain_node.go @@ -1,6 +1,7 @@ package nomadWorker import ( + "context" "time" nomadApi "github.com/hashicorp/nomad/api" @@ -18,17 +19,16 @@ func drainNode(nodesIF Nodes, nodeID string, deadline time.Duration) (nodeModify return resp.NodeModifyIndex, err } -func monitorDrainNode(nodesIF Nodes, nodeID string, nodeModifyIndex uint64, logger zerolog.Logger) uint { +func monitorDrainNode(nodesIF Nodes, nodeID string, nodeModifyIndex uint64, timeout time.Duration, logger zerolog.Logger) uint { - logger.Info().Str("NodeID", nodeID).Msgf("Monitoring node draining (node=%s) ... ", nodeID) - - deadline := time.Now().Add(time.Second * 60) - ctx := monitoringCtx{ - deadline: deadline, - doneChan: make(chan struct{}), - } + logger.Info().Str("NodeID", nodeID).Msgf("Monitoring node draining (node=%s, timeout=%s) ... 
", nodeID, timeout.String()) var numEvents uint + ctx, cancel := context.WithTimeout(context.Background(), timeout) + // FIXME: Find out if and when we need to call the cancel function from the outside to close the context + _ = cancel + + // create and obtain the monitoring channel and then wait until it is closed events := nodesIF.MonitorDrain(ctx, nodeID, nodeModifyIndex, false) for ev := range events { if ev != nil { @@ -36,24 +36,6 @@ func monitorDrainNode(nodesIF Nodes, nodeID string, nodeModifyIndex uint64, logg numEvents++ } } - logger.Info().Str("NodeID", nodeID).Msgf("Monitoring node draining (node=%s) ... done", nodeID) + logger.Info().Str("NodeID", nodeID).Msgf("Monitoring node draining (node=%s, timeout=%s, #events=%d) ... done", nodeID, timeout.String(), numEvents) return numEvents } - -type monitoringCtx struct { - doneChan <-chan struct{} - deadline time.Time -} - -func (ctx monitoringCtx) Deadline() (deadline time.Time, ok bool) { - return ctx.deadline, false -} -func (ctx monitoringCtx) Done() <-chan struct{} { - return ctx.doneChan -} -func (ctx monitoringCtx) Err() error { - return nil -} -func (ctx monitoringCtx) Value(key interface{}) interface{} { - return nil -} diff --git a/nomadWorker/drain_node_test.go b/nomadWorker/drain_node_test.go index d181d139..5dc0ba45 100644 --- a/nomadWorker/drain_node_test.go +++ b/nomadWorker/drain_node_test.go @@ -9,7 +9,7 @@ import ( nomadApi "github.com/hashicorp/nomad/api" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" - "github.com/thomasobenaus/sokar/test/nomadWorker" + mock_nomadWorker "github.com/thomasobenaus/sokar/test/nomadWorker" ) func TestDrainNode(t *testing.T) { @@ -34,6 +34,7 @@ func TestMonitorDrainNode(t *testing.T) { logger := zerolog.New(os.Stdout).With().Timestamp().Logger() + monitorTimeout := time.Second * 10 nodeID := "1234" nodeModifyIndex := uint64(1234) evChan := make(chan *nomadApi.MonitorMessage) @@ -45,6 +46,6 @@ func TestMonitorDrainNode(t *testing.T) { }() nodesIF.EXPECT().MonitorDrain(gomock.Any(), nodeID, nodeModifyIndex, false).Return(evChan) - numEvents := monitorDrainNode(nodesIF, nodeID, nodeModifyIndex, logger) + numEvents := monitorDrainNode(nodesIF, nodeID, nodeModifyIndex, monitorTimeout, logger) assert.Equal(t, uint(1), numEvents) } diff --git a/vendor/github.com/hashicorp/nomad/api/go.mod b/vendor/github.com/hashicorp/nomad/api/go.mod index dd8d5f43..6068ea2f 100644 --- a/vendor/github.com/hashicorp/nomad/api/go.mod +++ b/vendor/github.com/hashicorp/nomad/api/go.mod @@ -5,6 +5,7 @@ go 1.12 require ( github.com/docker/go-units v0.3.3 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 + github.com/gorilla/websocket v1.4.1 github.com/hashicorp/go-cleanhttp v0.5.1 github.com/hashicorp/go-rootcerts v1.0.0 github.com/hashicorp/go-uuid v1.0.1 diff --git a/vendor/github.com/hashicorp/nomad/api/go.sum b/vendor/github.com/hashicorp/nomad/api/go.sum index c7297afe..c6b86de6 100644 --- a/vendor/github.com/hashicorp/nomad/api/go.sum +++ b/vendor/github.com/hashicorp/nomad/api/go.sum @@ -4,6 +4,8 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY= github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA= +github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= 
diff --git a/nomadWorker/drain_node_test.go b/nomadWorker/drain_node_test.go
index d181d139..5dc0ba45 100644
--- a/nomadWorker/drain_node_test.go
+++ b/nomadWorker/drain_node_test.go
@@ -9,7 +9,7 @@ import (
 	nomadApi "github.com/hashicorp/nomad/api"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
-	"github.com/thomasobenaus/sokar/test/nomadWorker"
+	mock_nomadWorker "github.com/thomasobenaus/sokar/test/nomadWorker"
 )
 
 func TestDrainNode(t *testing.T) {
@@ -34,6 +34,7 @@ func TestMonitorDrainNode(t *testing.T) {
 
 	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
 
+	monitorTimeout := time.Second * 10
 	nodeID := "1234"
 	nodeModifyIndex := uint64(1234)
 	evChan := make(chan *nomadApi.MonitorMessage)
@@ -45,6 +46,6 @@ func TestMonitorDrainNode(t *testing.T) {
 	}()
 
 	nodesIF.EXPECT().MonitorDrain(gomock.Any(), nodeID, nodeModifyIndex, false).Return(evChan)
-	numEvents := monitorDrainNode(nodesIF, nodeID, nodeModifyIndex, logger)
+	numEvents := monitorDrainNode(nodesIF, nodeID, nodeModifyIndex, monitorTimeout, logger)
 	assert.Equal(t, uint(1), numEvents)
 }
diff --git a/vendor/github.com/hashicorp/nomad/api/go.mod b/vendor/github.com/hashicorp/nomad/api/go.mod
index dd8d5f43..6068ea2f 100644
--- a/vendor/github.com/hashicorp/nomad/api/go.mod
+++ b/vendor/github.com/hashicorp/nomad/api/go.mod
@@ -5,6 +5,7 @@ go 1.12
 require (
 	github.com/docker/go-units v0.3.3
 	github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
+	github.com/gorilla/websocket v1.4.1
 	github.com/hashicorp/go-cleanhttp v0.5.1
 	github.com/hashicorp/go-rootcerts v1.0.0
 	github.com/hashicorp/go-uuid v1.0.1
diff --git a/vendor/github.com/hashicorp/nomad/api/go.sum b/vendor/github.com/hashicorp/nomad/api/go.sum
index c7297afe..c6b86de6 100644
--- a/vendor/github.com/hashicorp/nomad/api/go.sum
+++ b/vendor/github.com/hashicorp/nomad/api/go.sum
@@ -4,6 +4,8 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk
 github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY=
 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
+github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
+github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
 github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
 github.com/hashicorp/go-rootcerts v1.0.0 h1:Rqb66Oo1X/eSV1x66xbDccZjhJigjg0+e82kpwzSwCI=
diff --git a/vendor/github.com/rs/zerolog/go.mod b/vendor/github.com/rs/zerolog/go.mod
index 0275fdd4..1af839b3 100644
--- a/vendor/github.com/rs/zerolog/go.mod
+++ b/vendor/github.com/rs/zerolog/go.mod
@@ -7,3 +7,5 @@ require (
 	github.com/zenazn/goji v0.9.0
 	golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc
 )
+
+go 1.13
diff --git a/vendor/github.com/rs/zerolog/go.sum b/vendor/github.com/rs/zerolog/go.sum
new file mode 100644
index 00000000..db8f6e81
--- /dev/null
+++ b/vendor/github.com/rs/zerolog/go.sum
@@ -0,0 +1,10 @@
+github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
+github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
diff --git a/vendor/github.com/stretchr/testify/go.mod b/vendor/github.com/stretchr/testify/go.mod
index 50536488..624efddb 100644
--- a/vendor/github.com/stretchr/testify/go.mod
+++ b/vendor/github.com/stretchr/testify/go.mod
@@ -6,3 +6,5 @@ require (
 	github.com/stretchr/objx v0.1.0
 	gopkg.in/yaml.v2 v2.2.2
 )
+
+go 1.13