From 55b672e4fc6696190c2cc6e5af26778bca16fdc8 Mon Sep 17 00:00:00 2001 From: Sergey <4070076+sergeykhegay@users.noreply.github.com> Date: Tue, 7 Jul 2020 11:14:05 -0700 Subject: [PATCH] Add v3 metadata support to ecs input (#7154) --- plugins/inputs/ecs/README.md | 44 +++++++++++++++--- plugins/inputs/ecs/client.go | 75 +++++++++++++++++++++++++------ plugins/inputs/ecs/client_test.go | 75 +++++++++++++++++++++++++++++++ plugins/inputs/ecs/ecs.go | 50 +++++++++++++++------ plugins/inputs/ecs/ecs_test.go | 64 ++++++++++++++++++++++++++ 5 files changed, 276 insertions(+), 32 deletions(-) diff --git a/plugins/inputs/ecs/README.md b/plugins/inputs/ecs/README.md index f23eb8bab04bf..9e3188eec30bf 100644 --- a/plugins/inputs/ecs/README.md +++ b/plugins/inputs/ecs/README.md @@ -1,8 +1,8 @@ # Amazon ECS Input Plugin -Amazon ECS, Fargate compatible, input plugin which uses the [Amazon ECS v2 metadata and -stats API][task-metadata-endpoint-v2] endpoints to gather stats on running -containers in a Task. +Amazon ECS, Fargate compatible, input plugin which uses the Amazon ECS metadata and +stats [v2][task-metadata-endpoint-v2] or [v3][task-metadata-endpoint-v3] API endpoints +to gather stats on running containers in a Task. The telegraf container must be run in the same Task as the workload it is inspecting. @@ -19,8 +19,41 @@ present in the metadata/stats endpoints. ```toml # Read metrics about ECS containers [[inputs.ecs]] - ## ECS metadata url - # endpoint_url = "http://169.254.170.2" + ## ECS metadata url. + ## Metadata v2 API is used if set explicitly. Otherwise, + ## v3 metadata endpoint API is used if available. + # endpoint_url = "" + + ## Containers to include and exclude. Globs accepted. + ## Note that an empty array for both will include all containers + # container_name_include = [] + # container_name_exclude = [] + + ## Container states to include and exclude. Globs accepted. + ## When empty only containers in the "RUNNING" state will be captured. + ## Possible values are "NONE", "PULLED", "CREATED", "RUNNING", + ## "RESOURCES_PROVISIONED", "STOPPED". + # container_status_include = [] + # container_status_exclude = [] + + ## ecs labels to include and exclude as tags. Globs accepted. + ## Note that an empty array for both will include all labels as tags + ecs_label_include = [ "com.amazonaws.ecs.*" ] + ecs_label_exclude = [] + + ## Timeout for queries. + # timeout = "5s" +``` + +### Configuration (enforce v2 metadata) + +```toml +# Read metrics about ECS containers +[[inputs.ecs]] + ## ECS metadata url. + ## Metadata v2 API is used if set explicitly. Otherwise, + ## v3 metadata endpoint API is used if available. + endpoint_url = "http://169.254.170.2" ## Containers to include and exclude. Globs accepted. ## Note that an empty array for both will include all containers @@ -210,3 +243,4 @@ ecs_container_meta,cluster=test,com.amazonaws.ecs.cluster=test,com.amazonaws.ecs [docker-input]: /plugins/inputs/docker/README.md [task-metadata-endpoint-v2]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html +[task-metadata-endpoint-v3] https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html diff --git a/plugins/inputs/ecs/client.go b/plugins/inputs/ecs/client.go index 93074ad79c878..d7ce10cb2a2e0 100644 --- a/plugins/inputs/ecs/client.go +++ b/plugins/inputs/ecs/client.go @@ -12,8 +12,13 @@ import ( ) var ( - ecsMetadataPath, _ = url.Parse("/v2/metadata") - ecsMetaStatsPath, _ = url.Parse("/v2/stats") + // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html + ecsMetadataPath = "/v2/metadata" + ecsMetaStatsPath = "/v2/stats" + + // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html + ecsMetadataPathV3 = "/task" + ecsMetaStatsPathV3 = "/task/stats" ) // Client is the ECS client contract @@ -27,30 +32,78 @@ type httpClient interface { } // NewClient constructs an ECS client with the passed configuration params -func NewClient(timeout time.Duration) (*EcsClient, error) { +func NewClient(timeout time.Duration, endpoint string, version int) (*EcsClient, error) { + if version != 2 && version != 3 { + const msg = "expected metadata version 2 or 3, got %d" + return nil, fmt.Errorf(msg, version) + } + + baseURL, err := url.Parse(endpoint) + if err != nil { + return nil, err + } + c := &http.Client{ Timeout: timeout, } return &EcsClient{ - client: c, + client: c, + baseURL: baseURL, + taskURL: resolveTaskURL(baseURL, version), + statsURL: resolveStatsURL(baseURL, version), + version: version, }, nil } +func resolveTaskURL(base *url.URL, version int) string { + var path string + switch version { + case 2: + path = ecsMetadataPath + case 3: + path = ecsMetadataPathV3 + default: + // Should never happen. + const msg = "resolveTaskURL: unexpected version %d" + panic(fmt.Errorf(msg, version)) + } + return resolveURL(base, path) +} + +func resolveStatsURL(base *url.URL, version int) string { + var path string + switch version { + case 2: + path = ecsMetaStatsPath + case 3: + path = ecsMetaStatsPathV3 + default: + // Should never happen. + const msg = "resolveStatsURL: unexpected version %d" + panic(fmt.Errorf(msg, version)) + } + return resolveURL(base, path) +} + +// resolveURL returns a URL string by concatenating the string representation of base +// and path. This is consistent with AWS metadata documentation: +// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html#task-metadata-endpoint-v3-paths +func resolveURL(base *url.URL, path string) string { + return base.String() + path +} + // EcsClient contains ECS connection config type EcsClient struct { client httpClient - BaseURL *url.URL + version int + baseURL *url.URL taskURL string statsURL string } // Task calls the ECS metadata endpoint and returns a populated Task func (c *EcsClient) Task() (*Task, error) { - if c.taskURL == "" { - c.taskURL = c.BaseURL.ResolveReference(ecsMetadataPath).String() - } - req, _ := http.NewRequest("GET", c.taskURL, nil) resp, err := c.client.Do(req) if err != nil { @@ -74,10 +127,6 @@ func (c *EcsClient) Task() (*Task, error) { // ContainerStats calls the ECS stats endpoint and returns a populated container stats map func (c *EcsClient) ContainerStats() (map[string]types.StatsJSON, error) { - if c.statsURL == "" { - c.statsURL = c.BaseURL.ResolveReference(ecsMetaStatsPath).String() - } - req, _ := http.NewRequest("GET", c.statsURL, nil) resp, err := c.client.Do(req) if err != nil { diff --git a/plugins/inputs/ecs/client_test.go b/plugins/inputs/ecs/client_test.go index 6532e5d51d0b0..333aec80c2709 100644 --- a/plugins/inputs/ecs/client_test.go +++ b/plugins/inputs/ecs/client_test.go @@ -5,6 +5,7 @@ import ( "errors" "io/ioutil" "net/http" + "net/url" "os" "testing" @@ -238,3 +239,77 @@ func TestEcsClient_ContainerStats(t *testing.T) { }) } } + +func TestResolveTaskURL(t *testing.T) { + tests := []struct { + name string + base string + ver int + exp string + }{ + { + name: "default v2 endpoint", + base: v2Endpoint, + ver: 2, + exp: "http://169.254.170.2/v2/metadata", + }, + { + name: "custom v2 endpoint", + base: "http://192.168.0.1", + ver: 2, + exp: "http://192.168.0.1/v2/metadata", + }, + { + name: "theoretical v3 endpoint", + base: "http://169.254.170.2/v3/metadata", + ver: 3, + exp: "http://169.254.170.2/v3/metadata/task", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + baseURL, err := url.Parse(tt.base) + assert.NoError(t, err) + + act := resolveTaskURL(baseURL, tt.ver) + assert.Equal(t, tt.exp, act) + }) + } +} + +func TestResolveStatsURL(t *testing.T) { + tests := []struct { + name string + base string + ver int + exp string + }{ + { + name: "default v2 endpoint", + base: v2Endpoint, + ver: 2, + exp: "http://169.254.170.2/v2/stats", + }, + { + name: "custom v2 endpoint", + base: "http://192.168.0.1", + ver: 2, + exp: "http://192.168.0.1/v2/stats", + }, + { + name: "theoretical v3 endpoint", + base: "http://169.254.170.2/v3/metadata", + ver: 3, + exp: "http://169.254.170.2/v3/metadata/task/stats", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + baseURL, err := url.Parse(tt.base) + assert.NoError(t, err) + + act := resolveStatsURL(baseURL, tt.ver) + assert.Equal(t, tt.exp, act) + }) + } +} diff --git a/plugins/inputs/ecs/ecs.go b/plugins/inputs/ecs/ecs.go index b3fe5f347bdd6..5fa53d4fd58bc 100644 --- a/plugins/inputs/ecs/ecs.go +++ b/plugins/inputs/ecs/ecs.go @@ -1,7 +1,7 @@ package ecs import ( - "net/url" + "os" "strings" "time" @@ -25,13 +25,14 @@ type Ecs struct { LabelInclude []string `toml:"ecs_label_include"` LabelExclude []string `toml:"ecs_label_exclude"` - newClient func(timeout time.Duration) (*EcsClient, error) + newClient func(timeout time.Duration, endpoint string, version int) (*EcsClient, error) client Client filtersCreated bool labelFilter filter.Filter containerNameFilter filter.Filter statusFilter filter.Filter + metadataVersion int } const ( @@ -40,11 +41,15 @@ const ( GB = 1000 * MB TB = 1000 * GB PB = 1000 * TB + + v2Endpoint = "http://169.254.170.2" ) var sampleConfig = ` - ## ECS metadata url - # endpoint_url = "http://169.254.170.2" + ## ECS metadata url. + ## Metadata v2 API is used if set explicitly. Otherwise, + ## v3 metadata endpoint API is used if available. + # endpoint_url = "" ## Containers to include and exclude. Globs accepted. ## Note that an empty array for both will include all containers @@ -69,7 +74,7 @@ var sampleConfig = ` // Description describes ECS plugin func (ecs *Ecs) Description() string { - return "Read metrics about docker containers from Fargate/ECS v2 meta endpoints." + return "Read metrics about docker containers from Fargate/ECS v2, v3 meta endpoints." } // SampleConfig returns the ECS example config @@ -107,18 +112,12 @@ func (ecs *Ecs) Gather(acc telegraf.Accumulator) error { func initSetup(ecs *Ecs) error { if ecs.client == nil { - var err error - var c *EcsClient - c, err = ecs.newClient(ecs.Timeout.Duration) - if err != nil { - return err - } + resolveEndpoint(ecs) - c.BaseURL, err = url.Parse(ecs.EndpointURL) + c, err := ecs.newClient(ecs.Timeout.Duration, ecs.EndpointURL, ecs.metadataVersion) if err != nil { return err } - ecs.client = c } @@ -142,6 +141,29 @@ func initSetup(ecs *Ecs) error { return nil } +func resolveEndpoint(ecs *Ecs) { + if ecs.EndpointURL != "" { + // Use metadata v2 API since endpoint is set explicitly. + ecs.metadataVersion = 2 + return + } + + // Auto-detect metadata endpoint version. + + // Use metadata v3 if available. + // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html + v3Endpoint := os.Getenv("ECS_CONTAINER_METADATA_URI") + if v3Endpoint != "" { + ecs.EndpointURL = v3Endpoint + ecs.metadataVersion = 3 + return + } + + // Use v2 endpoint if nothing else is available. + ecs.EndpointURL = v2Endpoint + ecs.metadataVersion = 2 +} + func (ecs *Ecs) accTask(task *Task, tags map[string]string, acc telegraf.Accumulator) { taskFields := map[string]interface{}{ "revision": task.Revision, @@ -240,7 +262,7 @@ func (ecs *Ecs) createContainerStatusFilters() error { func init() { inputs.Add("ecs", func() telegraf.Input { return &Ecs{ - EndpointURL: "http://169.254.170.2", + EndpointURL: "", Timeout: internal.Duration{Duration: 5 * time.Second}, newClient: NewClient, filtersCreated: false, diff --git a/plugins/inputs/ecs/ecs_test.go b/plugins/inputs/ecs/ecs_test.go index b105a433fa8ef..5d64fef01efad 100644 --- a/plugins/inputs/ecs/ecs_test.go +++ b/plugins/inputs/ecs/ecs_test.go @@ -1,9 +1,12 @@ package ecs import ( + "os" + "testing" "time" "github.com/docker/docker/api/types" + "github.com/stretchr/testify/assert" ) // codified golden objects for tests @@ -765,3 +768,64 @@ var validMeta = Task{ PullStartedAt: metaPullStart, PullStoppedAt: metaPullStop, } + +func TestResolveEndpoint(t *testing.T) { + tests := []struct { + name string + given Ecs + exp Ecs + preF func() + afterF func() + }{ + { + name: "Endpoint is explicitly set => use v2 metadata", + given: Ecs{ + EndpointURL: "192.162.0.1/custom_endpoint", + }, + exp: Ecs{ + EndpointURL: "192.162.0.1/custom_endpoint", + metadataVersion: 2, + }, + }, + { + name: "Endpoint is not set, ECS_CONTAINER_METADATA_URI is not set => use v2 metadata", + given: Ecs{ + EndpointURL: "", + }, + exp: Ecs{ + EndpointURL: v2Endpoint, + metadataVersion: 2, + }, + }, + { + name: "Endpoint is not set, ECS_CONTAINER_METADATA_URI is set => use v3 metadata", + preF: func() { + os.Setenv("ECS_CONTAINER_METADATA_URI", "v3-endpoint.local") + }, + afterF: func() { + os.Unsetenv("ECS_CONTAINER_METADATA_URI") + }, + given: Ecs{ + EndpointURL: "", + }, + exp: Ecs{ + EndpointURL: "v3-endpoint.local", + metadataVersion: 3, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.preF != nil { + tt.preF() + } + if tt.afterF != nil { + defer tt.afterF() + } + + act := tt.given + resolveEndpoint(&act) + assert.Equal(t, tt.exp, act) + }) + } +}