Skip to content

Commit

Permalink
Added min, max, measurement and field filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
tsvtitan committed Oct 20, 2023
1 parent 0f72f7b commit 7937355
Showing 1 changed file with 96 additions and 4 deletions.
100 changes: 96 additions & 4 deletions plugins/processors/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
_ "embed"
"fmt"
"regexp"
"strconv"
"strings"

"github.com/devopsext/utils"
Expand All @@ -12,9 +13,17 @@ import (
)

type FilterIf struct {
Disabled bool `toml:"disabled"`
Tags map[string]string `toml:"tags"`
tags map[string]*regexp.Regexp
Measurement string `toml:"measurement,omitempty"`
Field string `toml:"field,omitempty"`
Min interface{} `toml:"min,omitempty"`
Max interface{} `toml:"max,omitempty"`
Disabled bool `toml:"disabled"`
Tags map[string]string `toml:"tags"`
measurement *regexp.Regexp
field *regexp.Regexp
min float64
max float64
tags map[string]*regexp.Regexp
}

type Filter struct {
Expand Down Expand Up @@ -80,18 +89,70 @@ func (f *Filter) ifCondition(item *FilterIf, metric telegraf.Metric) bool {
return flag
}

func (f *Filter) skipFields(item *FilterIf, metric telegraf.Metric) bool {

if item.field == nil {
return false
}
for k := range metric.Fields() {
if !item.field.MatchString(k) {
return true
}
}
return false
}

func (f *Filter) skipMinMax(item *FilterIf, metric telegraf.Metric) bool {

if item.Min == nil && item.Max == nil {
return false
}
for _, field := range metric.Fields() {
v, err := strconv.ParseFloat(fmt.Sprintf("%v", field), 64)
if err != nil {
return true
}
if item.Min != nil && v < item.min {
return true
}
if item.Max != nil && v > item.max {
return true
}
}
return false
}

func (f *Filter) Apply(in ...telegraf.Metric) []telegraf.Metric {

orAnd := f.Condition != "AND"

for _, metric := range in {

measurement := metric.Name()

flag := len(f.Ifs) > 0
if orAnd {
flag = false
}

for _, item := range f.Ifs {

if item.Disabled {
continue
}

if item.measurement != nil && !item.measurement.MatchString(measurement) {
continue
}

if f.skipFields(item, metric) {
continue
}

if f.skipMinMax(item, metric) {
continue
}

exists := f.ifCondition(item, metric)
if orAnd {
flag = flag || exists
Expand All @@ -116,6 +177,33 @@ func (f *Filter) Apply(in ...telegraf.Metric) []telegraf.Metric {
func (f *Filter) setTags() {

for _, item := range f.Ifs {

if strings.TrimSpace(item.Measurement) != "" {
item.measurement = regexp.MustCompile(item.Measurement)
}

if strings.TrimSpace(item.Field) != "" {
item.field = regexp.MustCompile(item.Field)
}

if item.Min != nil {
v, err := strconv.ParseFloat(fmt.Sprintf("%v", item.Min), 64)
if err != nil {
item.Min = nil
} else {
item.min = v
}
}

if item.Max != nil {
v, err := strconv.ParseFloat(fmt.Sprintf("%v", item.Max), 64)
if err != nil {
item.Max = nil
} else {
item.max = v
}
}

m := make(map[string]*regexp.Regexp)
for k, v := range item.Tags {
v = strings.TrimSpace(v)
Expand All @@ -130,8 +218,12 @@ func (f *Filter) setTags() {

func (f *Filter) Init() error {

if strings.TrimSpace(f.Condition) == "" {
f.Condition = "AND"
}

if len(f.Ifs) == 0 {
err := fmt.Errorf("no metrics found")
err := fmt.Errorf("no ifs found")
f.Log.Error(err)
return err
}
Expand Down

0 comments on commit 7937355

Please sign in to comment.