diff --git a/graphite-clickhouse.go b/graphite-clickhouse.go
index 2a218ae9e..c5c99bd55 100644
--- a/graphite-clickhouse.go
+++ b/graphite-clickhouse.go
@@ -27,6 +27,7 @@ import (
"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/find"
"github.com/lomik/graphite-clickhouse/healthcheck"
+ "github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/index"
"github.com/lomik/graphite-clickhouse/logs"
"github.com/lomik/graphite-clickhouse/metrics"
@@ -90,6 +91,297 @@ var (
srv *http.Server
)
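+// sdList implements the sd-list subcommand: it prints every node currently
+// registered in service discovery together with its last-update timestamp.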
+func sdList(name string, args []string) {
+ descr := "List registered nodes in SD"
+ flagName := "sd-list"
+ flagSet := flag.NewFlagSet(descr, flag.ExitOnError)
+ help := flagSet.Bool("help", false, "Print help")
+ configFile := flagSet.String("config", "/etc/graphite-clickhouse/graphite-clickhouse.conf", "Filename of config")
+ exactConfig := flagSet.Bool("exact-config", false, "Ensure that all config params are contained in the target struct.")
+ flagSet.Usage = func() {
+ fmt.Fprintf(os.Stderr, "Usage of %s %s:\n", name, flagName)
+ flagSet.PrintDefaults()
+ }
+ flagSet.Parse(args)
+ if *help || flagSet.NArg() > 0 {
+ flagSet.Usage()
+ return
+ }
+
+ cfg, _, err := config.ReadConfig(*configFile, *exactConfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
+ var s sd.SD
+ logger := zapwriter.Default()
+ if s, err = sd.New(&cfg.Common, "", logger); err != nil {
+ fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
+ os.Exit(1)
+ }
+
+ if nodes, err := s.Nodes(); err == nil {
+ for _, node := range nodes {
+ fmt.Printf("%s/%s: %s (%s)\n", s.Namespace(), node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
+ }
+ } else {
+ fmt.Fprintln(os.Stderr, err.Error())
+ os.Exit(1)
+ }
+	} else {
+		fmt.Fprintln(os.Stderr, "SD not enabled")
+		os.Exit(1)
+	}
+}
+
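+// sdDelete implements the sd-delete subcommand: it removes the service
+// discovery records registered for the local hostname.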
+func sdDelete(name string, args []string) {
+ descr := "Delete registered nodes for local hostname in SD"
+ flagName := "sd-delete"
+ flagSet := flag.NewFlagSet(descr, flag.ExitOnError)
+ help := flagSet.Bool("help", false, "Print help")
+ configFile := flagSet.String("config", "/etc/graphite-clickhouse/graphite-clickhouse.conf", "Filename of config")
+ exactConfig := flagSet.Bool("exact-config", false, "Ensure that all config params are contained in the target struct.")
+ flagSet.Usage = func() {
+ fmt.Fprintf(os.Stderr, "Usage of %s %s:\n", name, flagName)
+ flagSet.PrintDefaults()
+ }
+ flagSet.Parse(args)
+ if *help || flagSet.NArg() > 0 {
+ flagSet.Usage()
+ return
+ }
+
+ cfg, _, err := config.ReadConfig(*configFile, *exactConfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
+ var s sd.SD
+ logger := zapwriter.Default()
+ if s, err = sd.New(&cfg.Common, "", logger); err != nil {
+ fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
+ os.Exit(1)
+ }
+
+ hostname, _ := os.Hostname()
+ hostname, _, _ = strings.Cut(hostname, ".")
+ if err = s.Clear("", ""); err != nil {
+ fmt.Fprintln(os.Stderr, err.Error())
+ os.Exit(1)
+ }
+	} else {
+		fmt.Fprintln(os.Stderr, "SD not enabled")
+		os.Exit(1)
+	}
+}
+
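+// sdEvict implements the sd-evict subcommand: it removes the service
+// discovery records registered for every hostname passed as an argument.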
+func sdEvict(name string, args []string) {
+ descr := "Delete registered nodes for hostnames in SD"
+ flagName := "sd-evict"
+ flagSet := flag.NewFlagSet(descr, flag.ExitOnError)
+ help := flagSet.Bool("help", false, "Print help")
+ configFile := flagSet.String("config", "/etc/graphite-clickhouse/graphite-clickhouse.conf", "Filename of config")
+ exactConfig := flagSet.Bool("exact-config", false, "Ensure that all config params are contained in the target struct.")
+ flagSet.Usage = func() {
+ fmt.Fprintf(os.Stderr, "Usage of %s %s:\n", name, flagName)
+ flagSet.PrintDefaults()
+ fmt.Fprintf(os.Stderr, " HOST []string\n List of hostnames\n")
+ }
+ flagSet.Parse(args)
+	if *help || flagSet.NArg() == 0 {
+ flagSet.Usage()
+ return
+ }
+ cfg, _, err := config.ReadConfig(*configFile, *exactConfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
+ for _, host := range flagSet.Args() {
+ var s sd.SD
+ logger := zapwriter.Default()
+ if s, err = sd.New(&cfg.Common, host, logger); err != nil {
+ fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
+ os.Exit(1)
+ }
+			if err = s.Clear("", ""); err != nil {
+				fmt.Fprintf(os.Stderr, "%s: %s\n", host, err.Error())
+				os.Exit(1)
+			}
+ }
+	} else {
+		fmt.Fprintln(os.Stderr, "SD not enabled")
+		os.Exit(1)
+	}
+}
+
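+// sdExpired implements the sd-expired subcommand: it lists expired service
+// discovery records without deleting them.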
+func sdExpired(name string, args []string) {
+ descr := "List expired registered nodes in SD"
+ flagName := "sd-expired"
+ flagSet := flag.NewFlagSet(descr, flag.ExitOnError)
+ help := flagSet.Bool("help", false, "Print help")
+ configFile := flagSet.String("config", "/etc/graphite-clickhouse/graphite-clickhouse.conf", "Filename of config")
+ exactConfig := flagSet.Bool("exact-config", false, "Ensure that all config params are contained in the target struct.")
+ flagSet.Usage = func() {
+ fmt.Fprintf(os.Stderr, "Usage of %s %s:\n", name, flagName)
+ flagSet.PrintDefaults()
+ }
+ flagSet.Parse(args)
+ if *help || flagSet.NArg() > 0 {
+ flagSet.Usage()
+ return
+ }
+
+ cfg, _, err := config.ReadConfig(*configFile, *exactConfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
+ var s sd.SD
+ logger := zapwriter.Default()
+ if s, err = sd.New(&cfg.Common, "", logger); err != nil {
+ fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
+ os.Exit(1)
+ }
+
+ if err = sd.Cleanup(&cfg.Common, s, true); err != nil {
+ fmt.Fprintln(os.Stderr, err.Error())
+ os.Exit(1)
+ }
+	} else {
+		fmt.Fprintln(os.Stderr, "SD not enabled")
+		os.Exit(1)
+	}
+}
+
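+// sdClean implements the sd-clean subcommand: it deletes expired service
+// discovery records.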
+func sdClean(name string, args []string) {
+ descr := "Cleanup expired registered nodes in SD"
+ flagName := "sd-clean"
+ flagSet := flag.NewFlagSet(descr, flag.ExitOnError)
+ help := flagSet.Bool("help", false, "Print help")
+ configFile := flagSet.String("config", "/etc/graphite-clickhouse/graphite-clickhouse.conf", "Filename of config")
+ exactConfig := flagSet.Bool("exact-config", false, "Ensure that all config params are contained in the target struct.")
+ flagSet.Usage = func() {
+ fmt.Fprintf(os.Stderr, "Usage of %s %s:\n", name, flagName)
+ flagSet.PrintDefaults()
+ }
+ flagSet.Parse(args)
+ if *help || flagSet.NArg() > 0 {
+ flagSet.Usage()
+ return
+ }
+
+ cfg, _, err := config.ReadConfig(*configFile, *exactConfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
+ var s sd.SD
+ logger := zapwriter.Default()
+ if s, err = sd.New(&cfg.Common, "", logger); err != nil {
+ fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
+ os.Exit(1)
+ }
+
+ if err = sd.Cleanup(&cfg.Common, s, false); err != nil {
+ fmt.Fprintln(os.Stderr, err.Error())
+ os.Exit(1)
+ }
+	} else {
+		fmt.Fprintln(os.Stderr, "SD not enabled")
+		os.Exit(1)
+	}
+}
+
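+// printMatchedRollupRules resolves a metric at the given age against the
+// rules and prints the resulting precision and aggregation, together with
+// the patterns that supplied them.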
+func printMatchedRollupRules(metric string, age uint32, rollupRules *rollup.Rules) {
+ // check metric rollup rules
+ prec, aggr, aggrPattern, retentionPattern := rollupRules.Lookup(metric, age, true)
+ fmt.Printf(" metric %q, age %d -> precision=%d, aggr=%s\n", metric, age, prec, aggr.Name())
+ if aggrPattern != nil {
+ fmt.Printf(" aggr pattern: type=%s, regexp=%q, function=%s", aggrPattern.RuleType.String(), aggrPattern.Regexp, aggrPattern.Function)
+ if len(aggrPattern.Retention) > 0 {
+ fmt.Print(", retentions:\n")
+ for i := range aggrPattern.Retention {
+ fmt.Printf(" [age: %d, precision: %d]\n", aggrPattern.Retention[i].Age, aggrPattern.Retention[i].Precision)
+ }
+ } else {
+ fmt.Print("\n")
+ }
+ }
+ if retentionPattern != nil {
+ fmt.Printf(" retention pattern: type=%s, regexp=%q, function=%s, retentions:\n", retentionPattern.RuleType.String(), retentionPattern.Regexp, retentionPattern.Function)
+ for i := range retentionPattern.Retention {
+ fmt.Printf(" [age: %d, precision: %d]\n", retentionPattern.Retention[i].Age, retentionPattern.Retention[i].Precision)
+ }
+ }
+}
+
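+// checkRollupMatch implements the match subcommand: it matches each metric
+// argument against rollup rules taken from an XML file (-rollup) and/or the
+// data tables of a config (-config), loading rules from ClickHouse when a
+// table uses rollup-conf = "auto".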
+func checkRollupMatch(name string, args []string) {
+ descr := "Match metric against rollup rules"
+ flagName := "match"
+ flagSet := flag.NewFlagSet(descr, flag.ExitOnError)
+ help := flagSet.Bool("help", false, "Print help")
+
+ rollupFile := flagSet.String("rollup", "", "Filename of rollup rules file")
+ configFile := flagSet.String("config", "", "Filename of config")
+ exactConfig := flagSet.Bool("exact-config", false, "Ensure that all config params are contained in the target struct.")
+ table := flagSet.String("table", "", "Table in config for lookup rules")
+
+ age := flagSet.Uint64("age", 0, "Age")
+ flagSet.Usage = func() {
+ fmt.Fprintf(os.Stderr, "Usage of %s %s:\n", name, flagName)
+ flagSet.PrintDefaults()
+ fmt.Fprintf(os.Stderr, " METRIC []string\n List of metric names\n")
+ }
+ flagSet.Parse(args)
+ if *help {
+ flagSet.Usage()
+ return
+ }
+
+ if *rollupFile == "" && *configFile == "" {
+		fmt.Fprint(os.Stderr, "either -rollup or -config must be set\n")
+ os.Exit(1)
+ }
+
+ if *rollupFile != "" {
+ fmt.Printf("rollup file %q\n", *rollupFile)
+		if r, err := rollup.NewXMLFile(*rollupFile, 0, ""); err == nil {
+			for _, metric := range flagSet.Args() {
+				printMatchedRollupRules(metric, uint32(*age), r.Rules())
+ }
+ } else {
+ log.Fatal(err)
+ }
+ }
+ if *configFile != "" {
+ cfg, _, err := config.ReadConfig(*configFile, *exactConfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+ ec := 0
+ for i := range cfg.DataTable {
+ var rulesTable string
+ if *table == "" || *table == cfg.DataTable[i].Table {
+ if cfg.DataTable[i].RollupConf == "auto" || cfg.DataTable[i].RollupConf == "" {
+ rulesTable = cfg.DataTable[i].Table
+ if cfg.DataTable[i].RollupAutoTable != "" {
+ rulesTable = cfg.DataTable[i].RollupAutoTable
+ }
+ fmt.Printf("table %q, rollup rules table %q in Clickhouse\n", cfg.DataTable[i].Table, rulesTable)
+ } else {
+ fmt.Printf("rollup file %q\n", cfg.DataTable[i].RollupConf)
+ }
+
+ rules := cfg.DataTable[i].Rollup.Rules()
+ if rules == nil {
+ if cfg.DataTable[i].RollupConf == "auto" || cfg.DataTable[i].RollupConf == "" {
+ rules, err = rollup.RemoteLoad(cfg.ClickHouse.URL,
+ cfg.ClickHouse.TLSConfig, rulesTable)
+ if err != nil {
+ ec = 1
+ fmt.Fprintf(os.Stderr, "%v\n", err)
+ }
+ }
+ }
+ if rules != nil {
+ for _, metric := range flagSet.Args() {
+ printMatchedRollupRules(metric, uint32(*age), rules)
+ }
+ }
+ }
+ }
+ os.Exit(ec)
+ }
+}
+
func main() {
rand.Seed(time.Now().UnixNano())
@@ -108,15 +400,46 @@ func main() {
"Additional pprof listen addr for non-server modes (tagger, etc..), overrides pprof-listen from common ",
)
- sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
- sdDelete := flag.Bool("sd-delete", false, "Delete registered nodes for this hostname in SD")
- sdEvict := flag.String("sd-evict", "", "Delete registered nodes for hostname in SD")
- sdClean := flag.Bool("sd-clean", false, "Cleanup expired registered nodes in SD")
- sdExpired := flag.Bool("sd-expired", false, "List expired registered nodes in SD")
-
printVersion := flag.Bool("version", false, "Print version")
verbose := flag.Bool("verbose", false, "Verbose (print config on startup)")
+ flag.Usage = func() {
+ fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
+
+ flag.PrintDefaults()
+
+ fmt.Fprintf(os.Stderr, "\n\nAdditional commands:\n")
+ fmt.Fprintf(os.Stderr, " sd-list List registered nodes in SD\n")
+ fmt.Fprintf(os.Stderr, " sd-delete Delete registered nodes for local hostname in SD\n")
+ fmt.Fprintf(os.Stderr, " sd-evict Delete registered nodes for hostnames in SD\n")
+ fmt.Fprintf(os.Stderr, " sd-clean Cleanup expired registered nodes in SD\n")
+ fmt.Fprintf(os.Stderr, " sd-expired List expired registered nodes in SD\n")
+ fmt.Fprintf(os.Stderr, " match Match metric against rollup rules\n")
+ }
+
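+	// Subcommands are dispatched before flag.Parse so that each one can define
+	// its own flag set; the "-name" spellings are kept for compatibility with
+	// the old boolean flags.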
+	if len(os.Args) > 1 {
+ switch os.Args[1] {
+ case "sd-list", "-sd-list":
+ sdList(os.Args[0], os.Args[2:])
+ return
+ case "sd-delete", "-sd-delete":
+ sdDelete(os.Args[0], os.Args[2:])
+ return
+ case "sd-evict", "-sd-evict":
+ sdEvict(os.Args[0], os.Args[2:])
+ return
+ case "sd-clean", "-sd-clean":
+ sdClean(os.Args[0], os.Args[2:])
+ return
+ case "sd-expired", "-sd-expired":
+ sdExpired(os.Args[0], os.Args[2:])
+ return
+ case "match", "-match":
+ checkRollupMatch(os.Args[0], os.Args[2:])
+ return
+ }
+ }
+
flag.Parse()
if *printVersion {
@@ -141,65 +464,6 @@ func main() {
return
}
- if *sdEvict != "" {
- if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
- var s sd.SD
- logger := zapwriter.Default()
- if s, err = sd.New(&cfg.Common, *sdEvict, logger); err != nil {
- fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
- os.Exit(1)
- }
- err = s.Clear("", "")
- }
- return
- } else if *sdList || *sdDelete || *sdExpired || *sdClean {
- if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
- var s sd.SD
- logger := zapwriter.Default()
- if s, err = sd.New(&cfg.Common, "", logger); err != nil {
- fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
- os.Exit(1)
- }
-
- // sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
- // sdDelete := flag.Bool("sd-delete", false, "Delete registered nodes for this hostname in SD")
- // sdEvict := flag.String("sd-evict", "", "Delete registered nodes for hostname in SD")
- // sdClean := flag.Bool("sd-clean", false, "Cleanup expired registered nodes in SD")
-
- if *sdDelete {
- hostname, _ := os.Hostname()
- hostname, _, _ = strings.Cut(hostname, ".")
- if err = s.Clear("", ""); err != nil {
- fmt.Fprintln(os.Stderr, err.Error())
- os.Exit(1)
- }
- } else if *sdExpired {
- if err = sd.Cleanup(&cfg.Common, s, true); err != nil {
- fmt.Fprintln(os.Stderr, err.Error())
- os.Exit(1)
- }
- } else if *sdClean {
- if err = sd.Cleanup(&cfg.Common, s, false); err != nil {
- fmt.Fprintln(os.Stderr, err.Error())
- os.Exit(1)
- }
- } else {
- if nodes, err := s.Nodes(); err == nil {
- for _, node := range nodes {
- fmt.Printf("%s/%s: %s (%s)\n", s.Namespace(), node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
- }
- } else {
- fmt.Fprintln(os.Stderr, err.Error())
- os.Exit(1)
- }
- }
- } else {
- fmt.Fprintln(os.Stderr, "SD not enabled")
- os.Exit(1)
- }
- return
- }
-
if err = zapwriter.ApplyConfig(cfg.Logging); err != nil {
log.Fatal(err)
}
diff --git a/helper/rollup/remote.go b/helper/rollup/remote.go
index b1373dec5..0fe0488f2 100644
--- a/helper/rollup/remote.go
+++ b/helper/rollup/remote.go
@@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"encoding/json"
- "fmt"
"strconv"
"strings"
"time"
@@ -16,11 +15,12 @@ import (
)
type rollupRulesResponseRecord struct {
- Regexp string `json:"regexp"`
- Function string `json:"function"`
- Age string `json:"age"`
- Precision string `json:"precision"`
- IsDefault int `json:"is_default"`
+ RuleType RuleType `json:"rule_type"`
+ Regexp string `json:"regexp"`
+ Function string `json:"function"`
+ Age string `json:"age"`
+ Precision string `json:"precision"`
+ IsDefault int `json:"is_default"`
}
type rollupRulesResponse struct {
Data []rollupRulesResponseRecord `json:"data"`
@@ -77,6 +77,7 @@ func parseJson(body []byte) (*Rules, error) {
} else {
if last() == nil || last().Regexp != d.Regexp || last().Function != d.Function {
r.Pattern = append(r.Pattern, Pattern{
+ RuleType: d.RuleType,
Retention: make([]Retention, 0),
Regexp: d.Regexp,
Function: d.Function,
@@ -103,7 +104,9 @@ func parseJson(body []byte) (*Rules, error) {
return r.compile()
}
-func remoteLoad(addr string, tlsConf *tls.Config, table string) (*Rules, error) {
+var timeoutRulesLoad = 10 * time.Second
+
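+// RemoteLoad loads rollup rules for the given table from the ClickHouse
+// system.graphite_retentions table. If the server predates the rule_type
+// column, the query is retried without it.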
+func RemoteLoad(addr string, tlsConf *tls.Config, table string) (*Rules, error) {
var db string
arr := strings.SplitN(table, ".", 2)
if len(arr) == 1 {
@@ -112,8 +115,8 @@ func remoteLoad(addr string, tlsConf *tls.Config, table string) (*Rules, error)
db, table = arr[0], arr[1]
}
- query := fmt.Sprintf(`
- SELECT
+ query := `SELECT
+ rule_type,
regexp,
function,
age,
@@ -121,26 +124,56 @@ func remoteLoad(addr string, tlsConf *tls.Config, table string) (*Rules, error)
is_default
FROM system.graphite_retentions
ARRAY JOIN Tables AS table
- WHERE (table.database = '%s') AND (table.table = '%s')
+ WHERE (table.database = '` + db + `') AND (table.table = '` + table + `')
ORDER BY
is_default ASC,
priority ASC,
regexp ASC,
age ASC
- FORMAT JSON
- `, db, table)
+ FORMAT JSON`
body, _, _, err := clickhouse.Query(
scope.New(context.Background()).WithLogger(zapwriter.Logger("rollup")).WithTable("system.graphite_retentions"),
addr,
query,
clickhouse.Options{
- Timeout: 10 * time.Second,
- ConnectTimeout: 10 * time.Second,
+ Timeout: timeoutRulesLoad,
+ ConnectTimeout: timeoutRulesLoad,
TLSConfig: tlsConf,
},
nil,
)
+ if err != nil && strings.Contains(err.Error(), " Missing columns: 'rule_type' ") {
+		// fallback for old ClickHouse versions without the rule_type column
+ query = `SELECT
+ regexp,
+ function,
+ age,
+ precision,
+ is_default
+ FROM system.graphite_retentions
+ ARRAY JOIN Tables AS table
+ WHERE (table.database = '` + db + `') AND (table.table = '` + table + `')
+ ORDER BY
+ is_default ASC,
+ priority ASC,
+ regexp ASC,
+ age ASC
+ FORMAT JSON`
+
+ body, _, _, err = clickhouse.Query(
+ scope.New(context.Background()).WithLogger(zapwriter.Logger("rollup")).WithTable("system.graphite_retentions"),
+ addr,
+ query,
+ clickhouse.Options{
+ Timeout: timeoutRulesLoad,
+ ConnectTimeout: timeoutRulesLoad,
+ TLSConfig: tlsConf,
+ },
+ nil,
+ )
+ }
+
if err != nil {
return nil, err
}
diff --git a/helper/rollup/remote_test.go b/helper/rollup/remote_test.go
index 490ffc488..4e1c0bfb0 100644
--- a/helper/rollup/remote_test.go
+++ b/helper/rollup/remote_test.go
@@ -2,6 +2,7 @@ package rollup
import (
"encoding/json"
+ "regexp"
"testing"
"github.com/stretchr/testify/assert"
@@ -124,3 +125,270 @@ func TestParseJson(t *testing.T) {
assert.NoError(err)
assert.Equal(expected, r)
}
+
+func TestParseJsonTyped(t *testing.T) {
+ response := `{
+ "meta":
+ [
+ {
+ "name": "rule_type",
+ "type": "String"
+ },
+ {
+ "name": "regexp",
+ "type": "String"
+ },
+ {
+ "name": "function",
+ "type": "String"
+ },
+ {
+ "name": "age",
+ "type": "UInt64"
+ },
+ {
+ "name": "precision",
+ "type": "UInt64"
+ },
+ {
+ "name": "is_default",
+ "type": "UInt8"
+ }
+ ],
+
+ "data":
+ [
+ {
+ "rule_type": "all",
+ "regexp": "^hourly",
+ "function": "",
+ "age": "0",
+ "precision": "3600",
+ "is_default": 0
+ },
+ {
+ "rule_type": "all",
+ "regexp": "^hourly",
+ "function": "",
+ "age": "3600",
+ "precision": "13600",
+ "is_default": 0
+ },
+ {
+ "rule_type": "all",
+ "regexp": "^live",
+ "function": "",
+ "age": "0",
+ "precision": "1",
+ "is_default": 0
+ },
+ {
+ "rule_type": "plain",
+ "regexp": "total$",
+ "function": "sum",
+ "age": "0",
+ "precision": "0",
+ "is_default": 0
+ },
+ {
+ "rule_type": "plain",
+ "regexp": "min$",
+ "function": "min",
+ "age": "0",
+ "precision": "0",
+ "is_default": 0
+ },
+ {
+ "rule_type": "plain",
+ "regexp": "max$",
+ "function": "max",
+ "age": "0",
+ "precision": "0",
+ "is_default": 0
+ },
+ {
+ "rule_type": "tagged",
+ "regexp": "^tag_name\\?",
+ "function": "min"
+ },
+ {
+ "rule_type": "tag_list",
+ "regexp": "fake3;tag=Fake3",
+ "function": "sum"
+ },
+ {
+ "rule_type": "all",
+ "regexp": "",
+ "function": "max",
+ "age": "0",
+ "precision": "60",
+ "is_default": 1
+ }
+ ],
+
+ "rows": 7,
+
+ "statistics":
+ {
+ "elapsed": 0.00053715,
+ "rows_read": 7,
+ "bytes_read": 1158
+ }
+}`
+
+ expected := &Rules{
+ Separated: true,
+ Pattern: []Pattern{
+ {
+ Regexp: "^hourly",
+ Retention: []Retention{
+ {Age: 0, Precision: 3600},
+ {Age: 3600, Precision: 13600},
+ },
+ re: regexp.MustCompile("^hourly"),
+ },
+ {
+ Regexp: "^live",
+ Retention: []Retention{
+ {Age: 0, Precision: 1},
+ },
+ re: regexp.MustCompile("^live"),
+ },
+ {
+ RuleType: RulePlain,
+ Regexp: "total$",
+ Function: "sum",
+ re: regexp.MustCompile("total$"),
+ aggr: AggrMap["sum"],
+ },
+ {
+ RuleType: RulePlain,
+ Regexp: "min$",
+ Function: "min",
+ re: regexp.MustCompile("min$"),
+ aggr: AggrMap["min"],
+ },
+ {
+ RuleType: RulePlain,
+ Regexp: "max$",
+ Function: "max",
+ re: regexp.MustCompile("max$"),
+ aggr: AggrMap["max"],
+ },
+ {
+ RuleType: RuleTagged,
+ Regexp: `^tag_name\?`,
+ Function: "min",
+ re: regexp.MustCompile(`^tag_name\?`),
+ aggr: AggrMap["min"],
+ },
+ {
+ RuleType: RuleTagged,
+ Regexp: `^fake3\?(.*&)?tag=Fake3(&.*)?$`,
+ Function: "sum",
+ re: regexp.MustCompile(`^fake3\?(.*&)?tag=Fake3(&.*)?$`),
+ aggr: AggrMap["sum"],
+ },
+ {
+ Regexp: ".*",
+ Function: "max",
+ Retention: []Retention{
+ {Age: 0, Precision: 60},
+ },
+ aggr: AggrMap["max"],
+ },
+ },
+ PatternPlain: []Pattern{
+ {
+ Regexp: "^hourly",
+ Retention: []Retention{
+ {Age: 0, Precision: 3600},
+ {Age: 3600, Precision: 13600},
+ },
+ re: regexp.MustCompile("^hourly"),
+ },
+ {
+ Regexp: "^live",
+ Retention: []Retention{
+ {Age: 0, Precision: 1},
+ },
+ re: regexp.MustCompile("^live"),
+ },
+ {
+ RuleType: RulePlain,
+ Regexp: "total$",
+ Function: "sum",
+ re: regexp.MustCompile("total$"),
+ aggr: AggrMap["sum"],
+ },
+ {
+ RuleType: RulePlain,
+ Regexp: "min$",
+ Function: "min",
+ re: regexp.MustCompile("min$"),
+ aggr: AggrMap["min"],
+ },
+ {
+ RuleType: RulePlain,
+ Regexp: "max$",
+ Function: "max",
+ re: regexp.MustCompile("max$"),
+ aggr: AggrMap["max"],
+ },
+ {
+ Regexp: ".*",
+ Function: "max",
+ Retention: []Retention{
+ {Age: 0, Precision: 60},
+ },
+ aggr: AggrMap["max"],
+ },
+ },
+ PatternTagged: []Pattern{
+ {
+ Regexp: "^hourly",
+ Retention: []Retention{
+ {Age: 0, Precision: 3600},
+ {Age: 3600, Precision: 13600},
+ },
+ re: regexp.MustCompile("^hourly"),
+ },
+ {
+ Regexp: "^live",
+ Retention: []Retention{
+ {Age: 0, Precision: 1},
+ },
+ re: regexp.MustCompile("^live"),
+ },
+ {
+ RuleType: RuleTagged,
+ Regexp: `^tag_name\?`,
+ Function: "min",
+ re: regexp.MustCompile(`^tag_name\?`),
+ aggr: AggrMap["min"],
+ },
+ {
+ RuleType: RuleTagged,
+ Regexp: `^fake3\?(.*&)?tag=Fake3(&.*)?$`,
+ Function: "sum",
+ re: regexp.MustCompile(`^fake3\?(.*&)?tag=Fake3(&.*)?$`),
+ aggr: AggrMap["sum"],
+ },
+ {
+ Regexp: ".*",
+ Function: "max",
+ Retention: []Retention{
+ {Age: 0, Precision: 60},
+ },
+ aggr: AggrMap["max"],
+ },
+ },
+ }
+
+ assert := assert.New(t)
+
+ r, err := parseJson([]byte(response))
+ assert.NotNil(r)
+ assert.NoError(err)
+ assert.Equal(expected, r)
+}
diff --git a/helper/rollup/rollup.go b/helper/rollup/rollup.go
index 60261a587..fcdb1c616 100644
--- a/helper/rollup/rollup.go
+++ b/helper/rollup/rollup.go
@@ -82,7 +82,7 @@ func (r *Rollup) Rules() *Rules {
}
func (r *Rollup) update() error {
- rules, err := remoteLoad(r.addr, r.tlsConfig, r.table)
+ rules, err := RemoteLoad(r.addr, r.tlsConfig, r.table)
if err != nil {
zapwriter.Logger("rollup").Error(fmt.Sprintf("rollup rules update failed for table %#v", r.table), zap.Error(err))
return err
diff --git a/helper/rollup/rules.go b/helper/rollup/rules.go
index 3bbc71266..ba5b1a17e 100644
--- a/helper/rollup/rules.go
+++ b/helper/rollup/rules.go
@@ -1,9 +1,11 @@
package rollup
import (
+ "encoding/xml"
"fmt"
"regexp"
"sort"
+ "strings"
"time"
"github.com/lomik/graphite-clickhouse/pkg/dry"
@@ -16,7 +18,122 @@ type Retention struct {
Precision uint32 `json:"precision"`
}
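+// RuleType restricts a rollup pattern to a metric namespace: RuleAll applies
+// to every metric (the legacy behavior), RulePlain only to plain
+// dot-separated names, RuleTagged only to tagged metrics
+// ("name?tag=value&..."), and RuleTagList is a "name;tag1=v1;tag2=v2"
+// shorthand that is compiled into a tagged regexp.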
+type RuleType uint8
+
+const (
+ RuleAll RuleType = iota
+ RulePlain
+ RuleTagged
+ RuleTagList
+)
+
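+// timeNow is a hook over time.Now so tests can substitute a fixed clock.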
+var timeNow = time.Now
+
+var ruleTypeStrings = []string{"all", "plain", "tagged", "tag_list"}
+
+func (r *RuleType) String() string {
+ return ruleTypeStrings[*r]
+}
+
+func (r *RuleType) Set(value string) error {
+ switch strings.ToLower(value) {
+ case "all":
+ *r = RuleAll
+ case "plain":
+ *r = RulePlain
+ case "tagged":
+ *r = RuleTagged
+ case "tag_list":
+ *r = RuleTagList
+ default:
+ return fmt.Errorf("invalid rule type %s", value)
+ }
+ return nil
+}
+
+func (r *RuleType) UnmarshalJSON(data []byte) error {
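+	// the value arrives as a quoted JSON string; strip the quotes before parsing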
+ s := string(data)
+ if strings.HasPrefix(s, `"`) && strings.HasSuffix(s, `"`) {
+ s = s[1 : len(s)-1]
+ }
+ return r.Set(s)
+}
+
+func (r *RuleType) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
+ var s string
+ if err := d.DecodeElement(&s, &start); err != nil {
+ return err
+ }
+
+ return r.Set(s)
+}
+
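+// splitTags splits a semicolon-separated tag_list expression, dropping empty
+// elements.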
+func splitTags(tagsStr string) (tags []string) {
+ vals := strings.Split(tagsStr, ";")
+ tags = make([]string, 0, len(vals))
+	// drop empty elements
+ for _, v := range vals {
+ if v != "" {
+ tags = append(tags, v)
+ }
+ }
+ return
+}
+
+func buildTaggedRegex(regexpStr string) string {
+ // see buildTaggedRegex in https://github.com/ClickHouse/ClickHouse/blob/780a1b2abea918d3205d149db7689a31fdff2f70/src/Processors/Merges/Algorithms/Graphite.cpp#L241
+ //
+	// * the tag list format (the name and any tag value may be a regexp; alphabetical order is not required)
+	// * spaces are not stripped and become part of the tag name or value
+	// * the metric name must come first (if used)
+ // *
+ // * tag1=value1; tag2=VALUE2_REGEX;tag3=value3
+ // * or
+ // * name;tag1=value1;tag2=VALUE2_REGEX;tag3=value3
+ // * or for one tag
+ // * tag1=value1
+ // *
+ // * Resulting regex against metric like
+ // * name?tag1=value1&tag2=value2
+ // *
+ // * So,
+ // *
+ // * name
+ // * produce
+ // * name\?
+ // *
+ // * tag2=val2
+ // * produce
+ // * [\?&]tag2=val2(&.*)?$
+ // *
+ // * nam.* ; tag1=val1 ; tag2=val2
+ // * produce
+ // * nam.*\?(.*&)?tag1=val1&(.*&)?tag2=val2(&.*)?$
+
+ tags := splitTags(regexpStr)
+
+ if strings.Contains(tags[0], "=") {
+ regexpStr = "[\\?&]"
+ } else {
+ if len(tags) == 1 {
+ // only name
+ return "^" + tags[0] + "\\?"
+ }
+ // start with name value
+ regexpStr = "^" + tags[0] + "\\?(.*&)?"
+ tags = tags[1:]
+ }
+
+ sort.Strings(tags) // sorted tag keys
+ regexpStr = regexpStr +
+ strings.Join(tags, "&(.*&)?") +
+ "(&.*)?$" // close regex
+
+ return regexpStr
+}
+
type Pattern struct {
+ RuleType RuleType `json:"rule_type"`
Regexp string `json:"regexp"`
Function string `json:"function"`
Retention []Retention `json:"retention"`
@@ -25,8 +142,11 @@ type Pattern struct {
}
type Rules struct {
- Pattern []Pattern `json:"pattern"`
- Updated int64 `json:"updated"`
+ Pattern []Pattern `json:"pattern"`
+ Updated int64 `json:"updated"`
+ Separated bool `json:"-"`
+ PatternPlain []Pattern `json:"-"`
+ PatternTagged []Pattern `json:"-"`
}
// NewMockRulles creates mock rollup for tests
@@ -48,6 +168,12 @@ var superDefaultFunction = AggrMap["avg"]
const superDefaultPrecision = uint32(60)
func (p *Pattern) compile() error {
+ if p.RuleType == RuleTagList {
+ // convert to tagged rule type
+ p.RuleType = RuleTagged
+ p.Regexp = buildTaggedRegex(p.Regexp)
+ }
+
var err error
if p.Regexp != "" && p.Regexp != ".*" {
p.re, err = regexp.Compile(p.Regexp)
@@ -82,11 +208,31 @@ func (r *Rules) compile() (*Rules, error) {
if r.Pattern == nil {
r.Pattern = make([]Pattern, 0)
}
+ r.PatternPlain = make([]Pattern, 0)
+ r.PatternTagged = make([]Pattern, 0)
+ r.Separated = false
for i := range r.Pattern {
if err := r.Pattern[i].compile(); err != nil {
return r, err
}
+ if !r.Separated && r.Pattern[i].RuleType != RuleAll {
+ r.Separated = true
+ }
+ }
+
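+	// If any pattern is type-restricted, pre-split the rules into per-kind
+	// lists; RuleAll patterns are duplicated into both so relative order is
+	// preserved within each list.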
+ if r.Separated {
+ for i := range r.Pattern {
+ switch r.Pattern[i].RuleType {
+ case RulePlain:
+ r.PatternPlain = append(r.PatternPlain, r.Pattern[i])
+ case RuleTagged:
+ r.PatternTagged = append(r.PatternTagged, r.Pattern[i])
+ default:
+ r.PatternPlain = append(r.PatternPlain, r.Pattern[i])
+ r.PatternTagged = append(r.PatternTagged, r.Pattern[i])
+ }
+ }
}
return r, nil
@@ -119,7 +265,7 @@ func (r *Rules) withDefault(defaultPrecision uint32, defaultFunction *Aggr) *Rul
}
func (r *Rules) setUpdated() *Rules {
- r.Updated = time.Now().Unix()
+ r.Updated = timeNow().Unix()
return r
}
@@ -128,10 +274,21 @@ func (r *Rules) withSuperDefault() *Rules {
}
// Lookup returns precision and aggregate function for metric name and age
-func (r *Rules) Lookup(metric string, age uint32) (precision uint32, ag *Aggr) {
+func (r *Rules) Lookup(metric string, age uint32, verbose bool) (precision uint32, ag *Aggr, aggrPattern, retentionPattern *Pattern) {
+ if r.Separated {
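+		// tagged metrics are rendered as "name?tag=value&...", so a "?" in
+		// the name selects the tagged pattern list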
+ if strings.Contains(metric, "?") {
+ return lookup(metric, age, r.PatternTagged, verbose)
+ }
+ return lookup(metric, age, r.PatternPlain, verbose)
+ }
+ return lookup(metric, age, r.Pattern, verbose)
+}
+
+// lookup scans the given patterns in order and returns the precision and
+// aggregate function for the metric at the given age; with verbose it also
+// reports which patterns supplied them.
+func lookup(metric string, age uint32, patterns []Pattern, verbose bool) (precision uint32, ag *Aggr, aggrPattern, retentionPattern *Pattern) {
precisionFound := false
- for _, p := range r.Pattern {
+ for n, p := range patterns {
// pattern hasn't interested data
if (ag != nil || p.aggr == nil) && (precisionFound || len(p.Retention) == 0) {
continue
@@ -143,6 +300,9 @@ func (r *Rules) Lookup(metric string, age uint32) (precision uint32, ag *Aggr) {
}
if ag == nil && p.aggr != nil {
+ if verbose {
+ aggrPattern = &patterns[n]
+ }
ag = p.aggr
}
@@ -152,12 +312,18 @@ func (r *Rules) Lookup(metric string, age uint32) (precision uint32, ag *Aggr) {
if i > 0 {
precision = p.Retention[i-1].Precision
precisionFound = true
+ if verbose {
+ retentionPattern = &patterns[n]
+ }
}
break
}
if i == len(p.Retention)-1 {
 				precision = p.Retention[i].Precision
precisionFound = true
+ if verbose {
+ retentionPattern = &patterns[n]
+ }
break
}
}
@@ -181,8 +347,8 @@ func (r *Rules) Lookup(metric string, age uint32) (precision uint32, ag *Aggr) {
}
// LookupBytes returns precision and aggregate function for metric name and age
-func (r *Rules) LookupBytes(metric []byte, age uint32) (precision uint32, ag *Aggr) {
- return r.Lookup(dry.UnsafeString(metric), age)
+func (r *Rules) LookupBytes(metric []byte, age uint32, verbose bool) (precision uint32, ag *Aggr, aggrPattern, retentionPattern *Pattern) {
+ return r.Lookup(dry.UnsafeString(metric), age, verbose)
}
func doMetricPrecision(points []point.Point, precision uint32, aggr *Aggr) []point.Point {
@@ -229,7 +395,7 @@ func (r *Rules) RollupMetricAge(metricName string, age uint32, points []point.Po
return points, 1, nil
}
- precision, ag := r.Lookup(metricName, age)
+ precision, ag, _, _ := r.Lookup(metricName, age, false)
points = doMetricPrecision(points, precision, ag)
return points, precision, nil
@@ -238,7 +404,7 @@ func (r *Rules) RollupMetricAge(metricName string, age uint32, points []point.Po
// RollupMetric rolling up list of points of ONE metric sorted by key "time"
// returns (new points slice, precision)
func (r *Rules) RollupMetric(metricName string, from uint32, points []point.Point) ([]point.Point, uint32, error) {
- now := uint32(time.Now().Unix())
+ now := uint32(timeNow().Unix())
age := uint32(0)
if now > from {
age = now - from
@@ -252,6 +418,13 @@ func (r *Rules) RollupPoints(pp *point.Points, from int64, step int64) error {
if from < 0 || step < 0 {
return fmt.Errorf("from and step must be >= 0: %v, %v", from, step)
}
+
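+	// the data age relative to now selects the retention step when no
+	// explicit step is requested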
+ now := int64(timeNow().Unix())
+ age := int64(0)
+ if now > from {
+ age = now - from
+ }
+
var i, n int
// i - current position of iterator
// n - position of the first record with current metric
@@ -265,9 +438,9 @@ func (r *Rules) RollupPoints(pp *point.Points, from int64, step int64) error {
metricName := pp.MetricName(p[0].MetricID)
var err error
if step == 0 {
- p, _, err = r.RollupMetric(metricName, uint32(from), p)
+ p, _, err = r.RollupMetricAge(metricName, uint32(age), p)
} else {
- _, agg := r.Lookup(metricName, uint32(from))
+			_, agg, _, _ := r.Lookup(metricName, uint32(age), false)
p = doMetricPrecision(p, uint32(step), agg)
}
for i := range p {
diff --git a/helper/rollup/rules_test.go b/helper/rollup/rules_test.go
index ffc5b68a4..e3dfdeef7 100644
--- a/helper/rollup/rules_test.go
+++ b/helper/rollup/rules_test.go
@@ -2,11 +2,14 @@ package rollup
import (
"fmt"
+ "regexp"
"strconv"
"testing"
+ "time"
"github.com/lomik/graphite-clickhouse/helper/point"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestMetricPrecision(t *testing.T) {
@@ -30,6 +33,64 @@ func TestMetricPrecision(t *testing.T) {
}
}
+func Test_buildTaggedRegex(t *testing.T) {
+ tests := []struct {
+ tagsStr string
+ want string
+ match string
+ nomatch string
+ }{
+ {
+ tagsStr: `cpu\.loadavg;project=DB.*;env=st.*`, want: `^cpu\.loadavg\?(.*&)?env=st.*&(.*&)?project=DB.*(&.*)?$`,
+ match: `cpu.loadavg?env=staging&project=DBAAS`,
+ nomatch: `cpu.loadavg?env=staging&project=D`,
+ },
+ {
+ tagsStr: `project=DB.*;env=staging;`, want: `[\?&]env=staging&(.*&)?project=DB.*(&.*)?$`,
+ match: `cpu.loadavg?env=staging&project=DBPG`,
+ nomatch: `cpu.loadavg?env=stagingN&project=DBAAS`,
+ },
+ {
+ tagsStr: "env=staging;", want: `[\?&]env=staging(&.*)?$`,
+ match: `cpu.loadavg?env=staging&project=DPG`,
+ nomatch: `cpu.loadavg?env=stagingN`,
+ },
+ {
+ tagsStr: " env = staging ;", // spaces are allowed,
+ want: `[\?&] env = staging (&.*)?$`,
+ match: `cpu.loadavg? env = staging &project=DPG`,
+ nomatch: `cpu.loadavg?env=stagingN`,
+ },
+ {
+ tagsStr: "name;",
+ want: `^name\?`,
+ match: `name?env=staging&project=DPG`,
+ nomatch: `nameN?env=stagingN`,
+ },
+ {
+ tagsStr: "name",
+ want: `^name\?`,
+ match: `name?env=staging&project=DPG`,
+ nomatch: `nameN?env=stagingN`,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.tagsStr, func(t *testing.T) {
+ if got := buildTaggedRegex(tt.tagsStr); got != tt.want {
+ t.Errorf("buildTaggedRegex(%q) = %v, want %v", tt.tagsStr, got, tt.want)
+ } else {
+ re := regexp.MustCompile(got)
+ if tt.match != "" && !re.Match([]byte(tt.match)) {
+ t.Errorf("match(%q, %q) must be true", tt.tagsStr, tt.match)
+ }
+ if tt.nomatch != "" && re.Match([]byte(tt.nomatch)) {
+ t.Errorf("match(%q, %q) must be false", tt.tagsStr, tt.match)
+ }
+ }
+ })
+ }
+}
+
func TestLookup(t *testing.T) {
config := `
^hourly;;3600:60,86400:3600
@@ -60,7 +121,150 @@ func TestLookup(t *testing.T) {
}
r, err := parseCompact(config)
- assert.NoError(t, err)
+ require.NoError(t, err)
+
+ for _, c := range table {
+ t.Run(fmt.Sprintf("%#v", c[:]), func(t *testing.T) {
+ assert := assert.New(t)
+ age, err := strconv.Atoi(c[1])
+ assert.NoError(err)
+
+ precision, ag, _, _ := r.Lookup(c[0], uint32(age), false)
+ assert.Equal(c[2], ag.String())
+ assert.Equal(c[3], fmt.Sprintf("%d", precision))
+ })
+ }
+}
+
+func TestLookupTyped(t *testing.T) {
+	config := `
+<graphite_rollup>
+	<pattern>
+		<regexp>^hourly</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp>^live</regexp>
+		<retention>
+			<age>0</age>
+			<precision>1</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>tag_list</rule_type>
+		<regexp>fake3;tag3=Fake3</regexp>
+		<retention>
+			<age>0</age>
+			<precision>1</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>tag_list</rule_type>
+		<regexp>tag5=Fake5;tag3=Fake3</regexp>
+		<retention>
+			<age>0</age>
+			<precision>90</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>tag_list</rule_type>
+		<regexp>fake_name</regexp>
+		<retention>
+			<age>0</age>
+			<precision>20</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>total$</regexp>
+		<function>sum</function>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>min$</regexp>
+		<function>min</function>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>max$</regexp>
+		<function>max</function>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp>total?</regexp>
+		<function>sum</function>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp>min\?</regexp>
+		<function>min</function>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp>max\?</regexp>
+		<function>max</function>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp>^hourly</regexp>
+		<function>sum</function>
+	</pattern>
+	<default>
+		<function>avg</function>
+		<retention>
+			<age>0</age>
+			<precision>42</precision>
+		</retention>
+		<retention>
+			<age>60</age>
+			<precision>10</precision>
+		</retention>
+	</default>
+</graphite_rollup>
+`
+
+ table := [][4]string{
+ {"hello.world", "0", "avg", "42"},
+ {"hourly.rps", "0", "avg", "42"},
+ {"hourly.rps?tag=value", "0", "sum", "42"},
+ {"hourly.rps", "0", "avg", "42"},
+ {"hourly.rps_total", "0", "sum", "42"},
+ {"live.rps_total", "0", "sum", "1"},
+ {"hourly.rps_min", "0", "min", "42"},
+ {"hourly.rps_min?tag=value", "0", "min", "42"},
+ {"hourly.rps_min", "1", "min", "42"},
+ {"hourly.rps_min", "59", "min", "42"},
+ {"hourly.rps_min?tag=value", "59", "min", "42"},
+ {"hourly.rps_min", "60", "min", "10"},
+ {"hourly.rps_min", "61", "min", "10"},
+ {"hourly.rps_min", "3599", "min", "10"},
+ {"hourly.rps_min", "3600", "min", "60"},
+ {"hourly.rps_min", "3601", "min", "60"},
+ {"hourly.rps_min", "86399", "min", "60"},
+ {"hourly.rps_min", "86400", "min", "3600"},
+ {"hourly.rps_min", "86401", "min", "3600"},
+ {"fake3?tag3=Fake3", "0", "avg", "1"},
+ {"fake3?tag1=Fake1&tag3=Fake3", "0", "avg", "1"},
+ {"fake3?tag1=Fake1&tag3=Fake3&tag4=Fake4", "0", "avg", "1"},
+ {"fake3?tag3=Fake", "0", "avg", "42"},
+ {"fake3?tag1=Fake1&tag3=Fake", "0", "avg", "42"},
+ {"fake3?tag1=Fake1&tag3=Fake&tag4=Fake4", "0", "avg", "42"},
+ {"fake?tag3=Fake3", "0", "avg", "42"},
+ {"fake_name?tag3=Fake3", "0", "avg", "20"},
+ {"fake5?tag1=Fake1&tag3=Fake3&tag4=Fake4&tag5=Fake5", "0", "avg", "90"},
+ {"fake5?tag3=Fake3&tag4=Fake4&tag5=Fake5&tag6=Fake6", "0", "avg", "90"},
+ {"fake5?tag4=Fake4&tag5=Fake5&tag6=Fake6", "0", "avg", "42"},
+ }
+
+ r, err := parseXML([]byte(config))
+ require.NoError(t, err)
for _, c := range table {
t.Run(fmt.Sprintf("%#v", c[:]), func(t *testing.T) {
@@ -68,9 +272,575 @@ func TestLookup(t *testing.T) {
age, err := strconv.Atoi(c[1])
assert.NoError(err)
- precision, ag := r.Lookup(c[0], uint32(age))
+ precision, ag, _, _ := r.Lookup(c[0], uint32(age), false)
assert.Equal(c[2], ag.String())
assert.Equal(c[3], fmt.Sprintf("%d", precision))
})
}
}
+
+func TestRules_RollupPoints(t *testing.T) {
+ config := `
+ ^10sec;;0:10,3600:60
+ ;max;0:20`
+
+ r, err := parseCompact(config)
+ require.NoError(t, err)
+
+	timeNow = func() time.Time {
+		return time.Unix(10010, 0)
+	}
+	defer func() { timeNow = time.Now }()
+
+ newPoints := func() *point.Points {
+ pp := point.NewPoints()
+
+ id10Sec := pp.MetricID("10sec")
+ pp.AppendPoint(id10Sec, 1.0, 10, 0)
+ pp.AppendPoint(id10Sec, 2.0, 20, 0)
+ pp.AppendPoint(id10Sec, 3.0, 30, 0)
+ pp.AppendPoint(id10Sec, 6.0, 60, 0)
+ pp.AppendPoint(id10Sec, 7.0, 70, 0)
+
+ idDefault := pp.MetricID("default")
+ pp.AppendPoint(idDefault, 2.0, 20, 0)
+ pp.AppendPoint(idDefault, 4.0, 40, 0)
+ pp.AppendPoint(idDefault, 6.0, 60, 0)
+ pp.AppendPoint(idDefault, 8.0, 80, 0)
+
+ return pp
+ }
+
+ pointsTo60SecNoDefault := func() *point.Points {
+ pp := point.NewPoints()
+
+ id10Sec := pp.MetricID("10sec")
+ pp.AppendPoint(id10Sec, 3.0, 0, 0)
+ pp.AppendPoint(id10Sec, 7.0, 60, 0)
+
+ idDefault := pp.MetricID("default")
+ pp.AppendPoint(idDefault, 2.0, 20, 0)
+ pp.AppendPoint(idDefault, 4.0, 40, 0)
+ pp.AppendPoint(idDefault, 6.0, 60, 0)
+ pp.AppendPoint(idDefault, 8.0, 80, 0)
+
+ return pp
+ }
+
+ pointsTo60Sec := func() *point.Points {
+ pp := point.NewPoints()
+
+ id10Sec := pp.MetricID("10sec")
+ pp.AppendPoint(id10Sec, 3.0, 0, 0)
+ pp.AppendPoint(id10Sec, 7.0, 60, 0)
+
+ idDefault := pp.MetricID("default")
+ pp.AppendPoint(idDefault, 4.0, 0, 0)
+ pp.AppendPoint(idDefault, 8.0, 60, 0)
+
+ return pp
+ }
+
+ tests := []struct {
+ name string
+ pp *point.Points
+ from int64
+ step int64
+ want *point.Points
+ wantErr bool
+ }{
+ {
+ name: "without step and no rollup",
+ pp: newPoints(),
+ from: int64(10000), step: int64(0),
+ want: newPoints(),
+ },
+ {
+ name: "without step",
+ pp: newPoints(),
+ from: int64(10), step: int64(0),
+ want: pointsTo60SecNoDefault(),
+ },
+ {
+ name: "with step 10",
+ pp: newPoints(),
+ from: int64(10000), step: int64(10),
+ want: newPoints(),
+ },
+ {
+ name: "with step 60",
+ pp: newPoints(),
+ from: int64(10), step: int64(60),
+ want: pointsTo60Sec(),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if err := r.RollupPoints(tt.pp, tt.from, tt.step); (err != nil) != tt.wantErr {
+ t.Errorf("Rules.RollupPoints() error = %v, wantErr %v", err, tt.wantErr)
+ } else if err == nil {
+ assert.Equal(t, tt.want, tt.pp)
+ }
+ })
+ }
+}
+
+var benchConfig = `
+<graphite_rollup>
+	<pattern>
+		<regexp>^hourly</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp>^live</regexp>
+		<retention>
+			<age>0</age>
+			<precision>1</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp>\.fake1\..*\.Fake1\.</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp><![CDATA[^fake1\?(.*&)?tag=Fake1(&.*)?$]]></regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp>\.fake2\..*\.Fake2\.</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp><![CDATA[^fake2\?(.*&)?tag=Fake2(&.*)?$]]></regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp>\.fake3\..*\.Fake3\.</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp><![CDATA[^fake3\?(.*&)?tag=Fake3(&.*)?$]]></regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp>\.fake4\..*\.Fake4\.</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp><![CDATA[^fake4\?(.*&)?tag=Fake4(&.*)?$]]></regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp>total$</regexp>
+		<function>sum</function>
+	</pattern>
+	<pattern>
+		<regexp>min$</regexp>
+		<function>min</function>
+	</pattern>
+	<pattern>
+		<regexp>max$</regexp>
+		<function>max</function>
+	</pattern>
+	<pattern>
+		<regexp>total?</regexp>
+		<function>sum</function>
+	</pattern>
+	<pattern>
+		<regexp>min\?</regexp>
+		<function>min</function>
+	</pattern>
+	<pattern>
+		<regexp>max\?</regexp>
+		<function>max</function>
+	</pattern>
+	<pattern>
+		<regexp>^hourly</regexp>
+		<function>sum</function>
+	</pattern>
+	<default>
+		<function>avg</function>
+		<retention>
+			<age>0</age>
+			<precision>42</precision>
+		</retention>
+		<retention>
+			<age>60</age>
+			<precision>10</precision>
+		</retention>
+	</default>
+</graphite_rollup>
+`
+
+var benchConfigSeparated = `
+<graphite_rollup>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>^hourly</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>^live</regexp>
+		<retention>
+			<age>0</age>
+			<precision>1</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>\.fake1\..*\.Fake1\.</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp><![CDATA[^fake1\?(.*&)?tag=Fake1(&.*)?$]]></regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>\.fake2\..*\.Fake2\.</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp><![CDATA[^fake2\?(.*&)?tag=Fake2(&.*)?$]]></regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>\.fake3\..*\.Fake3\.</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp><![CDATA[^fake3\?(.*&)?tag=Fake3(&.*)?$]]></regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>\.fake4\..*\.Fake4\.</regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp><![CDATA[^fake4\?(.*&)?tag=Fake4(&.*)?$]]></regexp>
+		<retention>
+			<age>3600</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>total$</regexp>
+		<function>sum</function>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>min$</regexp>
+		<function>min</function>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>max$</regexp>
+		<function>max</function>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp>total?</regexp>
+		<function>sum</function>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp>min\?</regexp>
+		<function>min</function>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp>max\?</regexp>
+		<function>max</function>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp>^hourly</regexp>
+		<function>sum</function>
+	</pattern>
+	<default>
+		<function>avg</function>
+		<retention>
+			<age>0</age>
+			<precision>42</precision>
+		</retention>
+		<retention>
+			<age>60</age>
+			<precision>10</precision>
+		</retention>
+	</default>
+</graphite_rollup>
+`
+
+func BenchmarkLookupSum(b *testing.B) {
+ r, err := parseXML([]byte(benchConfig))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("test.sum", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupSumSeparated(b *testing.B) {
+ r, err := parseXML([]byte(benchConfigSeparated))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("test.sum", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupSumTagged(b *testing.B) {
+ r, err := parseXML([]byte(benchConfig))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("sum?env=test&tag=Fake5", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupSumTaggedSeparated(b *testing.B) {
+ r, err := parseXML([]byte(benchConfigSeparated))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("sum?env=test&tag=Fake5", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupMax(b *testing.B) {
+ r, err := parseXML([]byte(benchConfig))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("test.max", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupMaxSeparated(b *testing.B) {
+ r, err := parseXML([]byte(benchConfigSeparated))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("test.max", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupMaxTagged(b *testing.B) {
+ r, err := parseXML([]byte(benchConfig))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("max?env=test&tag=Fake5", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupMaxTaggedSeparated(b *testing.B) {
+ r, err := parseXML([]byte(benchConfigSeparated))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("max?env=test&tag=Fake5", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupDefault(b *testing.B) {
+ r, err := parseXML([]byte(benchConfig))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("test.p95", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupDefaultSeparated(b *testing.B) {
+ r, err := parseXML([]byte(benchConfigSeparated))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("test.p95", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupDefaultTagged(b *testing.B) {
+ r, err := parseXML([]byte(benchConfig))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("p95?env=test&tag=Fake5", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
+
+func BenchmarkLookupDefaultTaggedSeparated(b *testing.B) {
+ r, err := parseXML([]byte(benchConfigSeparated))
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ precision, ag, _, _ := r.Lookup("p95?env=test&tag=Fake5", 1, false)
+ _ = precision
+ _ = ag
+ }
+}
diff --git a/helper/rollup/xml.go b/helper/rollup/xml.go
index 6db62860a..5a202dd84 100644
--- a/helper/rollup/xml.go
+++ b/helper/rollup/xml.go
@@ -46,6 +46,7 @@ type RetentionXML struct {
}
type PatternXML struct {
+ RuleType RuleType `xml:"rule_type"`
Regexp string `xml:"regexp"`
Function string `xml:"function"`
Retention []*RetentionXML `xml:"retention"`
@@ -62,6 +63,7 @@ func (r *RetentionXML) retention() Retention {
func (p *PatternXML) pattern() Pattern {
result := Pattern{
+ RuleType: p.RuleType,
Regexp: p.Regexp,
Function: p.Function,
Retention: make([]Retention, 0, len(p.Retention)),
diff --git a/helper/rollup/xml_test.go b/helper/rollup/xml_test.go
index 33c7de7d4..485412aa7 100644
--- a/helper/rollup/xml_test.go
+++ b/helper/rollup/xml_test.go
@@ -2,9 +2,11 @@ package rollup
import (
"fmt"
+ "regexp"
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestParseXML(t *testing.T) {
@@ -109,3 +111,195 @@ func TestParseXML(t *testing.T) {
assert.Equal(expectedCompact, r)
})
}
+
+func TestParseXMLTyped(t *testing.T) {
+	config := `
+<graphite_rollup>
+	<pattern>
+		<rule_type>all</rule_type>
+		<regexp>click_cost</regexp>
+		<function>any</function>
+		<retention>
+			<age>0</age>
+			<precision>3600</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>60</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<regexp>without_function</regexp>
+		<retention>
+			<age>0</age>
+			<precision>3600</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>60</precision>
+		</retention>
+	</pattern>
+	<pattern>
+		<rule_type>plain</rule_type>
+		<regexp>without_retention</regexp>
+		<function>min</function>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp>^((.*)|.)sum\?</regexp>
+		<function>sum</function>
+	</pattern>
+	<pattern>
+		<rule_type>tag_list</rule_type>
+		<regexp>fake3;tag=Fake3</regexp>
+		<function>min</function>
+	</pattern>
+	<pattern>
+		<rule_type>tagged</rule_type>
+		<regexp><![CDATA[^fake4\\?(.*&)?tag4=Fake4(&.*)?$]]></regexp>
+		<function>min</function>
+	</pattern>
+	<default>
+		<function>max</function>
+		<retention>
+			<age>0</age>
+			<precision>60</precision>
+		</retention>
+		<retention>
+			<age>3600</age>
+			<precision>300</precision>
+		</retention>
+		<retention>
+			<age>86400</age>
+			<precision>3600</precision>
+		</retention>
+	</default>
+</graphite_rollup>
+`
+
+ expected := (&Rules{
+ Separated: true,
+ Pattern: []Pattern{
+ {
+ Regexp: "click_cost", Function: "any", Retention: []Retention{
+ {Age: 0, Precision: 3600},
+ {Age: 86400, Precision: 60},
+ },
+ aggr: AggrMap["any"], re: regexp.MustCompile("click_cost"),
+ },
+ {
+ Regexp: "without_function", Function: "", Retention: []Retention{
+ {Age: 0, Precision: 3600},
+ {Age: 86400, Precision: 60},
+ },
+ re: regexp.MustCompile("without_function"),
+ },
+ {
+ Regexp: "without_retention", RuleType: RulePlain, Function: "min", Retention: nil,
+ aggr: AggrMap["min"], re: regexp.MustCompile("without_retention"),
+ },
+ {
+ Regexp: `^((.*)|.)sum\?`, RuleType: RuleTagged, Function: "sum", Retention: nil,
+ aggr: AggrMap["sum"], re: regexp.MustCompile(`^((.*)|.)sum\?`),
+ },
+ {
+ Regexp: `^fake3\?(.*&)?tag=Fake3(&.*)?$`, RuleType: RuleTagged, Function: "min", Retention: nil,
+ aggr: AggrMap["min"], re: regexp.MustCompile(`^fake3\?(.*&)?tag=Fake3(&.*)?$`),
+ },
+ {
+ Regexp: `^fake4\\?(.*&)?tag4=Fake4(&.*)?$`, RuleType: RuleTagged, Function: "min", Retention: nil,
+ aggr: AggrMap["min"], re: regexp.MustCompile(`^fake4\\?(.*&)?tag4=Fake4(&.*)?$`),
+ },
+ {
+ Regexp: ".*", Function: "max", Retention: []Retention{
+ {Age: 0, Precision: 60},
+ {Age: 3600, Precision: 300},
+ {Age: 86400, Precision: 3600},
+ },
+ aggr: AggrMap["max"],
+ },
+ },
+ PatternPlain: []Pattern{
+ {
+ Regexp: "click_cost", Function: "any", Retention: []Retention{
+ {Age: 0, Precision: 3600},
+ {Age: 86400, Precision: 60},
+ },
+ aggr: AggrMap["any"], re: regexp.MustCompile("click_cost"),
+ },
+ {
+ Regexp: "without_function", Function: "", Retention: []Retention{
+ {Age: 0, Precision: 3600},
+ {Age: 86400, Precision: 60},
+ },
+ re: regexp.MustCompile("without_function"),
+ },
+ {
+ Regexp: "without_retention", RuleType: RulePlain, Function: "min", Retention: nil,
+ aggr: AggrMap["min"], re: regexp.MustCompile("without_retention"),
+ },
+ {
+ Regexp: ".*", Function: "max", Retention: []Retention{
+ {Age: 0, Precision: 60},
+ {Age: 3600, Precision: 300},
+ {Age: 86400, Precision: 3600},
+ },
+ aggr: AggrMap["max"],
+ },
+ },
+ PatternTagged: []Pattern{
+ {
+ Regexp: "click_cost", Function: "any", Retention: []Retention{
+ {Age: 0, Precision: 3600},
+ {Age: 86400, Precision: 60},
+ },
+ aggr: AggrMap["any"], re: regexp.MustCompile("click_cost"),
+ },
+ {
+ Regexp: "without_function", Function: "", Retention: []Retention{
+ {Age: 0, Precision: 3600},
+ {Age: 86400, Precision: 60},
+ },
+ re: regexp.MustCompile("without_function"),
+ },
+ {
+ Regexp: `^((.*)|.)sum\?`, RuleType: RuleTagged, Function: "sum", Retention: nil,
+ aggr: AggrMap["sum"], re: regexp.MustCompile(`^((.*)|.)sum\?`),
+ },
+ {
+ Regexp: `^fake3\?(.*&)?tag=Fake3(&.*)?$`, RuleType: RuleTagged, Function: "min", Retention: nil,
+ aggr: AggrMap["min"], re: regexp.MustCompile(`^fake3\?(.*&)?tag=Fake3(&.*)?$`),
+ },
+ {
+ Regexp: `^fake4\\?(.*&)?tag4=Fake4(&.*)?$`, RuleType: RuleTagged, Function: "min", Retention: nil,
+ aggr: AggrMap["min"], re: regexp.MustCompile(`^fake4\\?(.*&)?tag4=Fake4(&.*)?$`),
+ },
+ {
+ Regexp: ".*", Function: "max", Retention: []Retention{
+ {Age: 0, Precision: 60},
+ {Age: 3600, Precision: 300},
+ {Age: 86400, Precision: 3600},
+ },
+ aggr: AggrMap["max"],
+ },
+ },
+ })
+
+ t.Run("default", func(t *testing.T) {
+ assert := assert.New(t)
+ r, err := parseXML([]byte(config))
+ require.NoError(t, err)
+ assert.Equal(expected, r)
+
+ // check sorting
+ assert.Equal(uint32(0), r.Pattern[0].Retention[0].Age)
+ assert.Equal(uint32(3600), r.Pattern[0].Retention[0].Precision)
+ })
+
+ t.Run("inside yandex tag", func(t *testing.T) {
+ assert := assert.New(t)
+		r, err := parseXML([]byte(fmt.Sprintf("<yandex>%s</yandex>", config)))
+ assert.NoError(err)
+ assert.Equal(expected, r)
+ })
+}
diff --git a/render/data/query.go b/render/data/query.go
index e271bcdb5..bda64f912 100644
--- a/render/data/query.go
+++ b/render/data/query.go
@@ -288,7 +288,7 @@ func (c *conditions) prepareLookup() {
aggName := ""
for i := range c.metricsRequested {
- step, agg := c.rollupRules.Lookup(c.metricsLookup[i], age)
+ step, agg, _, _ := c.rollupRules.Lookup(c.metricsLookup[i], age, false)
// Override agregation with an argument of consolidateBy function.
// consolidateBy with its argument is passed through FilteringFunctions field of carbonapi_v3_pb protocol.