Skip to content

Commit

Permalink
Document and add support to input plugins for logging alias (influxda…
Browse files Browse the repository at this point in the history
  • Loading branch information
glinton authored and danielnelson committed Sep 23, 2019
1 parent e42d2e3 commit 817c9a6
Show file tree
Hide file tree
Showing 111 changed files with 958 additions and 656 deletions.
1 change: 1 addition & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ driven operation.

Parameters that can be used with any input plugin:

- **alias**: Name an instance of a plugin.
- **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular input should be run less or more
often, you can configure that here.
Expand Down
2 changes: 1 addition & 1 deletion internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (r *RunningAggregator) Add(m telegraf.Metric) bool {
defer r.Unlock()

if m.Time().Before(r.periodStart.Add(-r.Config.Grace)) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
r.log.Debugf("metric is outside aggregation window; discarding. %s: m: %s e: %s g: %s",
r.log.Debugf("Metric is outside aggregation window; discarding. %s: m: %s e: %s g: %s",
m.Time(), r.periodStart, r.periodEnd, r.Config.Grace)
r.MetricsDropped.Incr(1)
return r.Config.DropOriginal
Expand Down
1 change: 1 addition & 0 deletions plugins/aggregators/basicstats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ emitting the aggregate every `period` seconds.
[[aggregators.basicstats]]
## The period on which to flush & clear the aggregator.
period = "30s"

## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
Expand Down
110 changes: 50 additions & 60 deletions plugins/aggregators/basicstats/basicstats.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package basicstats

import (
"log"
"math"

"github.com/influxdata/telegraf"
Expand All @@ -10,6 +9,7 @@ import (

type BasicStats struct {
Stats []string `toml:"stats"`
Log telegraf.Logger

cache map[uint64]aggregate
statsConfig *configuredStats
Expand All @@ -28,9 +28,9 @@ type configuredStats struct {
}

func NewBasicStats() *BasicStats {
mm := &BasicStats{}
mm.Reset()
return mm
return &BasicStats{
cache: make(map[uint64]aggregate),
}
}

type aggregate struct {
Expand All @@ -53,6 +53,7 @@ type basicstats struct {
var sampleConfig = `
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
Expand All @@ -61,17 +62,17 @@ var sampleConfig = `
# stats = ["count", "min", "max", "mean", "stdev", "s2", "sum"]
`

func (m *BasicStats) SampleConfig() string {
func (*BasicStats) SampleConfig() string {
return sampleConfig
}

func (m *BasicStats) Description() string {
func (*BasicStats) Description() string {
return "Keep the aggregate basicstats of each metric passing through."
}

func (m *BasicStats) Add(in telegraf.Metric) {
func (b *BasicStats) Add(in telegraf.Metric) {
id := in.HashID()
if _, ok := m.cache[id]; !ok {
if _, ok := b.cache[id]; !ok {
// hit an uncached metric, create caches for first time:
a := aggregate{
name: in.Name(),
Expand All @@ -92,13 +93,13 @@ func (m *BasicStats) Add(in telegraf.Metric) {
}
}
}
m.cache[id] = a
b.cache[id] = a
} else {
for _, field := range in.FieldList() {
if fv, ok := convert(field.Value); ok {
if _, ok := m.cache[id].fields[field.Key]; !ok {
if _, ok := b.cache[id].fields[field.Key]; !ok {
// hit an uncached field of a cached metric
m.cache[id].fields[field.Key] = basicstats{
b.cache[id].fields[field.Key] = basicstats{
count: 1,
min: fv,
max: fv,
Expand All @@ -111,7 +112,7 @@ func (m *BasicStats) Add(in telegraf.Metric) {
continue
}

tmp := m.cache[id].fields[field.Key]
tmp := b.cache[id].fields[field.Key]
//https://en.m.wikipedia.org/wiki/Algorithms_for_calculating_variance
//variable initialization
x := fv
Expand All @@ -138,49 +139,47 @@ func (m *BasicStats) Add(in telegraf.Metric) {
//diff compute
tmp.diff = fv - tmp.LAST
//store final data
m.cache[id].fields[field.Key] = tmp
b.cache[id].fields[field.Key] = tmp
}
}
}
}

func (m *BasicStats) Push(acc telegraf.Accumulator) {
config := getConfiguredStats(m)

for _, aggregate := range m.cache {
func (b *BasicStats) Push(acc telegraf.Accumulator) {
for _, aggregate := range b.cache {
fields := map[string]interface{}{}
for k, v := range aggregate.fields {

if config.count {
if b.statsConfig.count {
fields[k+"_count"] = v.count
}
if config.min {
if b.statsConfig.min {
fields[k+"_min"] = v.min
}
if config.max {
if b.statsConfig.max {
fields[k+"_max"] = v.max
}
if config.mean {
if b.statsConfig.mean {
fields[k+"_mean"] = v.mean
}
if config.sum {
if b.statsConfig.sum {
fields[k+"_sum"] = v.sum
}

//v.count always >=1
if v.count > 1 {
variance := v.M2 / (v.count - 1)

if config.variance {
if b.statsConfig.variance {
fields[k+"_s2"] = variance
}
if config.stdev {
if b.statsConfig.stdev {
fields[k+"_stdev"] = math.Sqrt(variance)
}
if config.diff {
if b.statsConfig.diff {
fields[k+"_diff"] = v.diff
}
if config.non_negative_diff && v.diff >= 0 {
if b.statsConfig.non_negative_diff && v.diff >= 0 {
fields[k+"_non_negative_diff"] = v.diff
}

Expand All @@ -194,14 +193,12 @@ func (m *BasicStats) Push(acc telegraf.Accumulator) {
}
}

func parseStats(names []string) *configuredStats {

// member function for logging.
func (b *BasicStats) parseStats() *configuredStats {
parsed := &configuredStats{}

for _, name := range names {

for _, name := range b.Stats {
switch name {

case "count":
parsed.count = true
case "min":
Expand All @@ -222,45 +219,32 @@ func parseStats(names []string) *configuredStats {
parsed.non_negative_diff = true

default:
log.Printf("W! Unrecognized basic stat '%s', ignoring", name)
b.Log.Warnf("Unrecognized basic stat %q, ignoring", name)
}
}

return parsed
}

func defaultStats() *configuredStats {

defaults := &configuredStats{}

defaults.count = true
defaults.min = true
defaults.max = true
defaults.mean = true
defaults.variance = true
defaults.stdev = true
defaults.sum = false
defaults.non_negative_diff = false

return defaults
}

func getConfiguredStats(m *BasicStats) *configuredStats {

if m.statsConfig == nil {

if m.Stats == nil {
m.statsConfig = defaultStats()
} else {
m.statsConfig = parseStats(m.Stats)
func (b *BasicStats) getConfiguredStats() {
if b.Stats == nil {
b.statsConfig = &configuredStats{
count: true,
min: true,
max: true,
mean: true,
variance: true,
stdev: true,
sum: false,
non_negative_diff: false,
}
} else {
b.statsConfig = b.parseStats()
}

return m.statsConfig
}

func (m *BasicStats) Reset() {
m.cache = make(map[uint64]aggregate)
func (b *BasicStats) Reset() {
b.cache = make(map[uint64]aggregate)
}

func convert(in interface{}) (float64, bool) {
Expand All @@ -276,6 +260,12 @@ func convert(in interface{}) (float64, bool) {
}
}

func (b *BasicStats) Init() error {
b.getConfiguredStats()

return nil
}

func init() {
aggregators.Add("basicstats", func() telegraf.Aggregator {
return NewBasicStats()
Expand Down
Loading

0 comments on commit 817c9a6

Please sign in to comment.