Skip to content

Commit

Permalink
Add ability to limit node stats in elasticsearch input (influxdata#3304)
Browse files Browse the repository at this point in the history
  • Loading branch information
meilke authored and danielnelson committed Oct 6, 2017
1 parent 59bb31e commit 75567d5
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 14 deletions.
5 changes: 5 additions & 0 deletions plugins/inputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/curre
## Master node.
cluster_stats = false
## node_stats is a list of sub-stats that you want to have gathered. Valid options
## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
## "breakers". Per default, all stats are gathered.
# node_stats = ["jvm", "http"]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
Expand Down
34 changes: 28 additions & 6 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ const sampleConfig = `
## Master node.
cluster_stats = false
## node_stats is a list of sub-stats that you want to have gathered. Valid options
## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
## "breakers". Per default, all stats are gathered.
# node_stats = ["jvm", "http"]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
Expand All @@ -120,6 +125,7 @@ type Elasticsearch struct {
ClusterHealth bool
ClusterHealthLevel string
ClusterStats bool
NodeStats []string
SSLCA string `toml:"ssl_ca"` // Path to CA file
SSLCert string `toml:"ssl_cert"` // Path to host cert file
SSLKey string `toml:"ssl_key"` // Path to cert key file
Expand Down Expand Up @@ -165,12 +171,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
defer wg.Done()
var url string
if e.Local {
url = s + statsPathLocal
} else {
url = s + statsPath
}
url := e.nodeStatsUrl(s)
e.isMaster = false

if e.ClusterStats {
Expand Down Expand Up @@ -229,6 +230,22 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) {
return client, nil
}

func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string {
var url string

if e.Local {
url = baseUrl + statsPathLocal
} else {
url = baseUrl + statsPath
}

if len(e.NodeStats) == 0 {
return url
}

return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ","))
}

func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Expand Down Expand Up @@ -269,6 +286,11 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er

now := time.Now()
for p, s := range stats {
// if one of the individual node stats is not even in the
// original result
if s == nil {
continue
}
f := jsonparser.JSONFlattener{}
// parse Json, ignoring strings and bools
err := f.FlattenJSON("", s)
Expand Down
45 changes: 37 additions & 8 deletions plugins/inputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ import (
"github.com/stretchr/testify/require"
)

func defaultTags() map[string]string {
return map[string]string{
"cluster_name": "es-testcluster",
"node_attribute_master": "true",
"node_id": "SDFsfSDFsdfFSDSDfSFDSDF",
"node_name": "test.host.com",
"node_host": "test",
}
}

type transportMock struct {
statusCode int
body string
Expand Down Expand Up @@ -45,15 +55,9 @@ func checkIsMaster(es *Elasticsearch, expected bool, t *testing.T) {
assert.Fail(t, msg)
}
}
func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
tags := map[string]string{
"cluster_name": "es-testcluster",
"node_attribute_master": "true",
"node_id": "SDFsfSDFsdfFSDSDfSFDSDF",
"node_name": "test.host.com",
"node_host": "test",
}

func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
tags := defaultTags()
acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags)
Expand All @@ -79,6 +83,31 @@ func TestGather(t *testing.T) {
checkNodeStatsResult(t, &acc)
}

func TestGatherIndividualStats(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.NodeStats = []string{"jvm", "process"}
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponseJVMProcess)

var acc testutil.Accumulator
if err := acc.GatherError(es.Gather); err != nil {
t.Fatal(err)
}

checkIsMaster(es, false, t)

tags := defaultTags()
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", nodestatsJvmExpected, tags)
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags)
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags)
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags)
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_http", nodestatsHttpExpected, tags)
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
}

func TestGatherNodeStats(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
Expand Down
94 changes: 94 additions & 0 deletions plugins/inputs/elasticsearch/testdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,100 @@ const nodeStatsResponse = `
}
`

const nodeStatsResponseJVMProcess = `
{
"cluster_name": "es-testcluster",
"nodes": {
"SDFsfSDFsdfFSDSDfSFDSDF": {
"timestamp": 1436365550135,
"name": "test.host.com",
"transport_address": "inet[/127.0.0.1:9300]",
"host": "test",
"ip": [
"inet[/127.0.0.1:9300]",
"NONE"
],
"attributes": {
"master": "true"
},
"process": {
"timestamp": 1436460392945,
"open_file_descriptors": 160,
"cpu": {
"percent": 2,
"sys_in_millis": 1870,
"user_in_millis": 13610,
"total_in_millis": 15480
},
"mem": {
"total_virtual_in_bytes": 4747890688
}
},
"jvm": {
"timestamp": 1436460392945,
"uptime_in_millis": 202245,
"mem": {
"heap_used_in_bytes": 52709568,
"heap_used_percent": 5,
"heap_committed_in_bytes": 259522560,
"heap_max_in_bytes": 1038876672,
"non_heap_used_in_bytes": 39634576,
"non_heap_committed_in_bytes": 40841216,
"pools": {
"young": {
"used_in_bytes": 32685760,
"max_in_bytes": 279183360,
"peak_used_in_bytes": 71630848,
"peak_max_in_bytes": 279183360
},
"survivor": {
"used_in_bytes": 8912880,
"max_in_bytes": 34865152,
"peak_used_in_bytes": 8912888,
"peak_max_in_bytes": 34865152
},
"old": {
"used_in_bytes": 11110928,
"max_in_bytes": 724828160,
"peak_used_in_bytes": 14354608,
"peak_max_in_bytes": 724828160
}
}
},
"threads": {
"count": 44,
"peak_count": 45
},
"gc": {
"collectors": {
"young": {
"collection_count": 2,
"collection_time_in_millis": 98
},
"old": {
"collection_count": 1,
"collection_time_in_millis": 24
}
}
},
"buffer_pools": {
"direct": {
"count": 40,
"used_in_bytes": 6304239,
"total_capacity_in_bytes": 6304239
},
"mapped": {
"count": 0,
"used_in_bytes": 0,
"total_capacity_in_bytes": 0
}
}
}
}
}
}
`

var nodestatsIndicesExpected = map[string]interface{}{
"id_cache_memory_size_in_bytes": float64(0),
"completion_size_in_bytes": float64(0),
Expand Down

0 comments on commit 75567d5

Please sign in to comment.