diff --git a/CLOUDPROVIDERS.md b/CLOUDPROVIDERS.md index 49564e79..be25accb 100644 --- a/CLOUDPROVIDERS.md +++ b/CLOUDPROVIDERS.md @@ -14,7 +14,51 @@ All configuration is in a stanza named after the backend, and takes simple key v aws --- -### TODO +#### Overview + +The AWS cloud provider uses the IP addresses of incoming metric datagrams +to look up the tags of EC2 instances with matching private IP addresses and +decorates the associated metrics with said tags. A "local IP mode" is also +provided that can be used to look up tags of the local EC2 instance on which +`gostatsd` is running. + +Tags are queried using the EC2 API to execute a +[`DescribeInstances`](https://docs.aws.amazon.com/sdk-for-go/api/service/ec2/#EC2.DescribeInstances) +operation. Unless local IP mode is enabled, instances are located by +setting the `private-ip-address` in the `DescribeInstances` filter to the values +of the incoming IP addresses. In local IP mode, instances are located by setting +the `instance-id` in the `DescribeInstances` filter to the instance ID returned +by the [EC2 instance identity document](https://docs.aws.amazon.com/sdk-for-go/api/aws/ec2metadata/#EC2InstanceIdentityDocument). + +#### Important details + +Like the k8s provider, `ignore-host` must be set to `false` for the cloud +provider to work at all! This is because it works based off the +source IP address of incoming metrics, and these are dropped if +`ignore-host=true`. 
+ +#### Example with defaults + +```toml +cloud-provider = 'aws' + +[aws] +max_retries = 3 +client_timeout = 9000000000 +max_instances_batch = 32 +local_ip_mode = "deny" +local_ip_whitelist = "127.0.0.1 localhost 172.17.0.1" +``` + +The configuration settings are as follows: +- `max_retries`: The maximum number of times that a request will be retried by the AWS client before failure +- `client_timeout`: The timeout (in nanoseconds) for the HTTP client passed to the AWS client +- `max_instances_batch`: The maximum number of instances that can be requested at a time by the Cloud Provider pipeline handler +- `local_ip_mode`: If set to `deny`, will never try to look up tags of the local instance, even if datagrams + are sent with local interface addresses. If set to `allow`, will look up tags of the local instance if datagrams + are sent with addresses on the local interface IP whitelist. +- `local_ip_whitelist`: A space-separated list of local interface IPs that should trigger + the lookup of tags on the local instance. k8s --- diff --git a/pkg/cloudproviders/aws/aws.go b/pkg/cloudproviders/aws/aws.go index 2f20ffd7..49cd8f46 100644 --- a/pkg/cloudproviders/aws/aws.go +++ b/pkg/cloudproviders/aws/aws.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "net/http" + "strings" "sync/atomic" "time" @@ -24,11 +25,38 @@ import ( "github.com/atlassian/gostatsd/pkg/stats" ) +type LocalIPMode int + +const ( + Deny LocalIPMode = iota + Allow +) + +func NewLocalIPMode(str string) LocalIPMode { + s := strings.ToLower(str) + switch s { + case "deny": + return Deny + case "allow": + return Allow + } + return Deny +} + +var ( + defaultLocalIPWhitelist = []string{ + "127.0.0.1", + "localhost", + "172.17.0.1", // docker gateway + } +) + const ( // ProviderName is the name of AWS cloud provider. ProviderName = "aws" defaultClientTimeout = 9 * time.Second defaultMaxInstancesBatch = 32 + defaultLocalIPMode = Deny ) // Provider represents an AWS provider. 
@@ -41,9 +69,11 @@ type Provider struct { logger logrus.FieldLogger - Metadata *ec2metadata.EC2Metadata - Ec2 *ec2.EC2 - MaxInstances int + Metadata *ec2metadata.EC2Metadata + Ec2 *ec2.EC2 + MaxInstances int + localIPMode LocalIPMode + localIPWhitelist []string } func (p *Provider) EstimatedTags() int { @@ -74,11 +104,21 @@ func (p *Provider) RunMetrics(ctx context.Context, statser stats.Statser) { // map is returned even in case of errors because it may contain partial data. func (p *Provider) Instance(ctx context.Context, IP ...gostatsd.Source) (map[gostatsd.Source]*gostatsd.Instance, error) { instances := make(map[gostatsd.Source]*gostatsd.Instance, len(IP)) - values := make([]*string, len(IP)) - for i, ip := range IP { + ips := make([]*string, len(IP)) + errors := make([]error, 0, 2) + n := 0 + lookupLocal := false + for _, ip := range IP { instances[ip] = nil // initialize map. Used for lookups to see if info for IP was requested - values[i] = aws.String(string(ip)) + if !p.isLocalIP(ip) { + ips[n] = aws.String(string(ip)) + n = n + 1 + } else { + lookupLocal = true + } } + + values := ips[:n] input := &ec2.DescribeInstancesInput{ Filters: []*ec2.Filter{ { @@ -89,7 +129,7 @@ func (p *Provider) Instance(ctx context.Context, IP ...gostatsd.Source) (map[gos } atomic.AddUint64(&p.describeInstanceCount, 1) - atomic.AddUint64(&p.describeInstanceInstances, uint64(len(IP))) + atomic.AddUint64(&p.describeInstanceInstances, uint64(len(values))) instancesFound := uint64(0) pages := uint64(0) @@ -104,37 +144,12 @@ func (p *Provider) Instance(ctx context.Context, IP ...gostatsd.Source) (map[gos continue } instancesFound++ - region, err := azToRegion(aws.StringValue(instance.Placement.AvailabilityZone)) - if err != nil { - p.logger.Errorf("Error getting instance region: %v", err) - } - tags := make(gostatsd.Tags, len(instance.Tags)+1) - for idx, tag := range instance.Tags { - tags[idx] = fmt.Sprintf("%s:%s", - gostatsd.NormalizeTagKey(aws.StringValue(tag.Key)), - 
aws.StringValue(tag.Value)) - } - tags[len(tags)-1] = "region:" + region - instances[ip] = &gostatsd.Instance{ - ID: gostatsd.Source(aws.StringValue(instance.InstanceId)), - Tags: tags, - } - p.logger.WithFields(logrus.Fields{ - "instance": instance.InstanceId, - "ip": ip, - "tags": tags, - }).Debug("Added tags") + instances[ip] = p.gostatsdInstanceFromInstance(ip, instance) } } return true }) - for ip, instance := range instances { - if instance == nil { - p.logger.WithField("ip", ip).Debug("No results looking up instance") - } - } - atomic.AddUint64(&p.describeInstancePages, pages) atomic.AddUint64(&p.describeInstanceFound, instancesFound) @@ -143,11 +158,35 @@ func (p *Provider) Instance(ctx context.Context, IP ...gostatsd.Source) (map[gos // Avoid spamming logs if instance id is not visible yet due to eventual consistency. // https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html#CommonErrors - if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "InvalidInstanceID.NotFound" { - return instances, nil + if !isEventualConsistencyErr(err) { + errors = append(errors, fmt.Errorf("error listing AWS instances: %v", err)) + } + } + + var localInstance *gostatsd.Instance + + if lookupLocal { + localInstance, err = p.instanceFromMetadata(ctx) + if err != nil && !isEventualConsistencyErr(err) { + errors = append(errors, fmt.Errorf("error inspecting local instance: %v", err)) + } + } + + for ip, instance := range instances { + if instance == nil { + if localInstance != nil && p.isLocalIP(ip) { + p.logger.WithField("ip", ip).Debugf("Using local instance for IP %v", ip) + instances[ip] = localInstance + } else { + p.logger.WithField("ip", ip).Debug("No results looking up instance") + } } - return instances, fmt.Errorf("error listing AWS instances: %v", err) } + + if len(errors) > 0 { + return instances, multiError(errors) + } + return instances, nil } @@ -187,6 +226,93 @@ func (p *Provider) Name() string { return ProviderName } +func (p *Provider) 
instanceFromMetadata(ctx context.Context) (*gostatsd.Instance, error) { + identityDoc, err := p.Metadata.GetInstanceIdentityDocument() + if err != nil { + return nil, err + } + + values := []*string{&identityDoc.InstanceID} + + input := &ec2.DescribeInstancesInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String("instance-id"), + Values: values, + }, + }, + } + + atomic.AddUint64(&p.describeInstanceCount, 1) + atomic.AddUint64(&p.describeInstanceInstances, 1) + + var cachedInstance *gostatsd.Instance + + p.logger.Debugf("Looking up instance for local instance ID %v", identityDoc.InstanceID) + err = p.Ec2.DescribeInstancesPagesWithContext(ctx, input, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool { + reservationCount := len(page.Reservations) + if reservationCount > 0 { + if reservationCount > 1 { + p.logger.WithFields(logrus.Fields{ + "instance": identityDoc.InstanceID, + }).Warnf("Found more than one reservation for local instance ID %v. Using first.", identityDoc.InstanceID) + } + + reservation := page.Reservations[0] + instanceCount := len(reservation.Instances) + if instanceCount > 0 { + if instanceCount > 1 { + p.logger.WithFields(logrus.Fields{ + "instance": identityDoc.InstanceID, + "reservationId": reservation.ReservationId, + }).Warnf("Found more than one instance for local instance ID %v and reservation ID %v. 
Using first.", identityDoc.InstanceID, reservation.ReservationId) + } + + cachedInstance = p.gostatsdInstanceFromInstance(gostatsd.Source(identityDoc.PrivateIP), reservation.Instances[0]) + } + } + return false + }) + + atomic.AddUint64(&p.describeInstancePages, 1) + if cachedInstance != nil { + atomic.AddUint64(&p.describeInstanceFound, 1) + } + + if err != nil { + atomic.AddUint64(&p.describeInstanceErrors, 1) + + return nil, err + } + + return cachedInstance, nil +} + +func (p *Provider) gostatsdInstanceFromInstance(ip gostatsd.Source, instance *ec2.Instance) *gostatsd.Instance { + region, err := azToRegion(aws.StringValue(instance.Placement.AvailabilityZone)) + if err != nil { + p.logger.Errorf("Error getting instance region: %v", err) + } + tags := make(gostatsd.Tags, len(instance.Tags)+1) + for idx, tag := range instance.Tags { + tags[idx] = fmt.Sprintf("%s:%s", + gostatsd.NormalizeTagKey(aws.StringValue(tag.Key)), + aws.StringValue(tag.Value)) + } + tags[len(tags)-1] = "region:" + region + + p.logger.WithFields(logrus.Fields{ + "instance": instance.InstanceId, + "ip": ip, + "tags": tags, + }).Debug("Added tags") + + return &gostatsd.Instance{ + ID: gostatsd.Source(aws.StringValue(instance.InstanceId)), + Tags: tags, + } +} + // Derives the region from a valid az name. // Returns an error if the az is known invalid (empty). 
func azToRegion(az string) (string, error) { @@ -197,12 +323,48 @@ func azToRegion(az string) (string, error) { return region, nil } +func isEventualConsistencyErr(err error) bool { + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "InvalidInstanceID.NotFound" { + return true + } + return false +} + +func multiError(errors []error) error { + errs := make([]string, 0, len(errors)+1) + errs = append(errs, fmt.Sprintf("%d errors occurred", len(errors))) + for _, err := range errors { + errs = append(errs, err.Error()) + } + return fmt.Errorf(strings.Join(errs, ", ")) +} + +func (p *Provider) isLocalIP(ip gostatsd.Source) bool { + if p.localIPMode == Deny { + return false + } + + return contains(p.localIPWhitelist, string(ip)) +} + +// contains checks if item is within slice +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +} + // NewProviderFromViper returns a new aws provider. func NewProviderFromViper(v *viper.Viper, logger logrus.FieldLogger, _ string) (gostatsd.CloudProvider, error) { a := util.GetSubViper(v, "aws") a.SetDefault("max_retries", 3) a.SetDefault("client_timeout", defaultClientTimeout) a.SetDefault("max_instances_batch", defaultMaxInstancesBatch) + a.SetDefault("local_ip_mode", defaultLocalIPMode) + a.SetDefault("local_ip_whitelist", defaultLocalIPWhitelist) httpTimeout := a.GetDuration("client_timeout") if httpTimeout <= 0 { return nil, errors.New("client timeout must be positive") @@ -211,6 +373,13 @@ func NewProviderFromViper(v *viper.Viper, logger logrus.FieldLogger, _ string) ( if maxInstances <= 0 { return nil, errors.New("max number of instances per batch must be positive") } + localIPMode := NewLocalIPMode(a.GetString("local_ip_mode")) + + var localIPWhitelist []string + + if localIPMode == Allow { + localIPWhitelist = a.GetStringSlice("local_ip_whitelist") + } // This is the main config without credentials. 
transport := &http.Transport{ @@ -254,9 +423,11 @@ func NewProviderFromViper(v *viper.Viper, logger logrus.FieldLogger, _ string) ( return nil, fmt.Errorf("error creating a new EC2 session: %v", err) } return &Provider{ - Metadata: metadata, - Ec2: ec2.New(ec2Session), - MaxInstances: maxInstances, - logger: logger, + Metadata: metadata, + Ec2: ec2.New(ec2Session), + MaxInstances: maxInstances, + logger: logger, + localIPMode: localIPMode, + localIPWhitelist: localIPWhitelist, }, nil }