Skip to content

Commit

Permalink
Add v3 metadata support to ecs input (influxdata#7154)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeykhegay authored Jul 7, 2020
1 parent 07f601f commit 55b672e
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 32 deletions.
44 changes: 39 additions & 5 deletions plugins/inputs/ecs/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Amazon ECS Input Plugin

Amazon ECS, Fargate compatible, input plugin which uses the [Amazon ECS v2 metadata and
stats API][task-metadata-endpoint-v2] endpoints to gather stats on running
containers in a Task.
Amazon ECS, Fargate compatible, input plugin which uses the Amazon ECS metadata and
stats [v2][task-metadata-endpoint-v2] or [v3][task-metadata-endpoint-v3] API endpoints
to gather stats on running containers in a Task.

The telegraf container must be run in the same Task as the workload it is
inspecting.
Expand All @@ -19,8 +19,41 @@ present in the metadata/stats endpoints.
```toml
# Read metrics about ECS containers
[[inputs.ecs]]
## ECS metadata url
# endpoint_url = "http://169.254.170.2"
## ECS metadata url.
## Metadata v2 API is used if set explicitly. Otherwise,
## v3 metadata endpoint API is used if available.
# endpoint_url = ""

## Containers to include and exclude. Globs accepted.
## Note that an empty array for both will include all containers
# container_name_include = []
# container_name_exclude = []

## Container states to include and exclude. Globs accepted.
## When empty only containers in the "RUNNING" state will be captured.
## Possible values are "NONE", "PULLED", "CREATED", "RUNNING",
## "RESOURCES_PROVISIONED", "STOPPED".
# container_status_include = []
# container_status_exclude = []

## ecs labels to include and exclude as tags. Globs accepted.
## Note that an empty array for both will include all labels as tags
ecs_label_include = [ "com.amazonaws.ecs.*" ]
ecs_label_exclude = []

## Timeout for queries.
# timeout = "5s"
```

### Configuration (enforce v2 metadata)

```toml
# Read metrics about ECS containers
[[inputs.ecs]]
## ECS metadata url.
## Metadata v2 API is used if set explicitly. Otherwise,
## v3 metadata endpoint API is used if available.
endpoint_url = "http://169.254.170.2"

## Containers to include and exclude. Globs accepted.
## Note that an empty array for both will include all containers
Expand Down Expand Up @@ -210,3 +243,4 @@ ecs_container_meta,cluster=test,com.amazonaws.ecs.cluster=test,com.amazonaws.ecs

[docker-input]: /plugins/inputs/docker/README.md
[task-metadata-endpoint-v2]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html
[task-metadata-endpoint-v3] https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
75 changes: 62 additions & 13 deletions plugins/inputs/ecs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ import (
)

var (
ecsMetadataPath, _ = url.Parse("/v2/metadata")
ecsMetaStatsPath, _ = url.Parse("/v2/stats")
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html
ecsMetadataPath = "/v2/metadata"
ecsMetaStatsPath = "/v2/stats"

// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
ecsMetadataPathV3 = "/task"
ecsMetaStatsPathV3 = "/task/stats"
)

// Client is the ECS client contract
Expand All @@ -27,30 +32,78 @@ type httpClient interface {
}

// NewClient constructs an ECS client with the passed configuration params
func NewClient(timeout time.Duration) (*EcsClient, error) {
func NewClient(timeout time.Duration, endpoint string, version int) (*EcsClient, error) {
if version != 2 && version != 3 {
const msg = "expected metadata version 2 or 3, got %d"
return nil, fmt.Errorf(msg, version)
}

baseURL, err := url.Parse(endpoint)
if err != nil {
return nil, err
}

c := &http.Client{
Timeout: timeout,
}

return &EcsClient{
client: c,
client: c,
baseURL: baseURL,
taskURL: resolveTaskURL(baseURL, version),
statsURL: resolveStatsURL(baseURL, version),
version: version,
}, nil
}

func resolveTaskURL(base *url.URL, version int) string {
var path string
switch version {
case 2:
path = ecsMetadataPath
case 3:
path = ecsMetadataPathV3
default:
// Should never happen.
const msg = "resolveTaskURL: unexpected version %d"
panic(fmt.Errorf(msg, version))
}
return resolveURL(base, path)
}

func resolveStatsURL(base *url.URL, version int) string {
var path string
switch version {
case 2:
path = ecsMetaStatsPath
case 3:
path = ecsMetaStatsPathV3
default:
// Should never happen.
const msg = "resolveStatsURL: unexpected version %d"
panic(fmt.Errorf(msg, version))
}
return resolveURL(base, path)
}

// resolveURL returns a URL string by concatenating the string representation of base
// and path. This is consistent with AWS metadata documentation:
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html#task-metadata-endpoint-v3-paths
func resolveURL(base *url.URL, path string) string {
return base.String() + path
}

// EcsClient contains ECS connection config
type EcsClient struct {
client httpClient
BaseURL *url.URL
version int
baseURL *url.URL
taskURL string
statsURL string
}

// Task calls the ECS metadata endpoint and returns a populated Task
func (c *EcsClient) Task() (*Task, error) {
if c.taskURL == "" {
c.taskURL = c.BaseURL.ResolveReference(ecsMetadataPath).String()
}

req, _ := http.NewRequest("GET", c.taskURL, nil)
resp, err := c.client.Do(req)
if err != nil {
Expand All @@ -74,10 +127,6 @@ func (c *EcsClient) Task() (*Task, error) {

// ContainerStats calls the ECS stats endpoint and returns a populated container stats map
func (c *EcsClient) ContainerStats() (map[string]types.StatsJSON, error) {
if c.statsURL == "" {
c.statsURL = c.BaseURL.ResolveReference(ecsMetaStatsPath).String()
}

req, _ := http.NewRequest("GET", c.statsURL, nil)
resp, err := c.client.Do(req)
if err != nil {
Expand Down
75 changes: 75 additions & 0 deletions plugins/inputs/ecs/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io/ioutil"
"net/http"
"net/url"
"os"
"testing"

Expand Down Expand Up @@ -238,3 +239,77 @@ func TestEcsClient_ContainerStats(t *testing.T) {
})
}
}

func TestResolveTaskURL(t *testing.T) {
tests := []struct {
name string
base string
ver int
exp string
}{
{
name: "default v2 endpoint",
base: v2Endpoint,
ver: 2,
exp: "http://169.254.170.2/v2/metadata",
},
{
name: "custom v2 endpoint",
base: "http://192.168.0.1",
ver: 2,
exp: "http://192.168.0.1/v2/metadata",
},
{
name: "theoretical v3 endpoint",
base: "http://169.254.170.2/v3/metadata",
ver: 3,
exp: "http://169.254.170.2/v3/metadata/task",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
baseURL, err := url.Parse(tt.base)
assert.NoError(t, err)

act := resolveTaskURL(baseURL, tt.ver)
assert.Equal(t, tt.exp, act)
})
}
}

func TestResolveStatsURL(t *testing.T) {
tests := []struct {
name string
base string
ver int
exp string
}{
{
name: "default v2 endpoint",
base: v2Endpoint,
ver: 2,
exp: "http://169.254.170.2/v2/stats",
},
{
name: "custom v2 endpoint",
base: "http://192.168.0.1",
ver: 2,
exp: "http://192.168.0.1/v2/stats",
},
{
name: "theoretical v3 endpoint",
base: "http://169.254.170.2/v3/metadata",
ver: 3,
exp: "http://169.254.170.2/v3/metadata/task/stats",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
baseURL, err := url.Parse(tt.base)
assert.NoError(t, err)

act := resolveStatsURL(baseURL, tt.ver)
assert.Equal(t, tt.exp, act)
})
}
}
50 changes: 36 additions & 14 deletions plugins/inputs/ecs/ecs.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ecs

import (
"net/url"
"os"
"strings"
"time"

Expand All @@ -25,13 +25,14 @@ type Ecs struct {
LabelInclude []string `toml:"ecs_label_include"`
LabelExclude []string `toml:"ecs_label_exclude"`

newClient func(timeout time.Duration) (*EcsClient, error)
newClient func(timeout time.Duration, endpoint string, version int) (*EcsClient, error)

client Client
filtersCreated bool
labelFilter filter.Filter
containerNameFilter filter.Filter
statusFilter filter.Filter
metadataVersion int
}

const (
Expand All @@ -40,11 +41,15 @@ const (
GB = 1000 * MB
TB = 1000 * GB
PB = 1000 * TB

v2Endpoint = "http://169.254.170.2"
)

var sampleConfig = `
## ECS metadata url
# endpoint_url = "http://169.254.170.2"
## ECS metadata url.
## Metadata v2 API is used if set explicitly. Otherwise,
## v3 metadata endpoint API is used if available.
# endpoint_url = ""
## Containers to include and exclude. Globs accepted.
## Note that an empty array for both will include all containers
Expand All @@ -69,7 +74,7 @@ var sampleConfig = `

// Description describes ECS plugin
func (ecs *Ecs) Description() string {
return "Read metrics about docker containers from Fargate/ECS v2 meta endpoints."
return "Read metrics about docker containers from Fargate/ECS v2, v3 meta endpoints."
}

// SampleConfig returns the ECS example config
Expand Down Expand Up @@ -107,18 +112,12 @@ func (ecs *Ecs) Gather(acc telegraf.Accumulator) error {

func initSetup(ecs *Ecs) error {
if ecs.client == nil {
var err error
var c *EcsClient
c, err = ecs.newClient(ecs.Timeout.Duration)
if err != nil {
return err
}
resolveEndpoint(ecs)

c.BaseURL, err = url.Parse(ecs.EndpointURL)
c, err := ecs.newClient(ecs.Timeout.Duration, ecs.EndpointURL, ecs.metadataVersion)
if err != nil {
return err
}

ecs.client = c
}

Expand All @@ -142,6 +141,29 @@ func initSetup(ecs *Ecs) error {
return nil
}

func resolveEndpoint(ecs *Ecs) {
if ecs.EndpointURL != "" {
// Use metadata v2 API since endpoint is set explicitly.
ecs.metadataVersion = 2
return
}

// Auto-detect metadata endpoint version.

// Use metadata v3 if available.
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
v3Endpoint := os.Getenv("ECS_CONTAINER_METADATA_URI")
if v3Endpoint != "" {
ecs.EndpointURL = v3Endpoint
ecs.metadataVersion = 3
return
}

// Use v2 endpoint if nothing else is available.
ecs.EndpointURL = v2Endpoint
ecs.metadataVersion = 2
}

func (ecs *Ecs) accTask(task *Task, tags map[string]string, acc telegraf.Accumulator) {
taskFields := map[string]interface{}{
"revision": task.Revision,
Expand Down Expand Up @@ -240,7 +262,7 @@ func (ecs *Ecs) createContainerStatusFilters() error {
func init() {
inputs.Add("ecs", func() telegraf.Input {
return &Ecs{
EndpointURL: "http://169.254.170.2",
EndpointURL: "",
Timeout: internal.Duration{Duration: 5 * time.Second},
newClient: NewClient,
filtersCreated: false,
Expand Down
Loading

0 comments on commit 55b672e

Please sign in to comment.