Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rollup: split plain/tagged rules #276

Merged
merged 10 commits into from
May 3, 2024
394 changes: 329 additions & 65 deletions graphite-clickhouse.go

Large diffs are not rendered by default.

61 changes: 47 additions & 14 deletions helper/rollup/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -16,11 +15,12 @@ import (
)

type rollupRulesResponseRecord struct {
Regexp string `json:"regexp"`
Function string `json:"function"`
Age string `json:"age"`
Precision string `json:"precision"`
IsDefault int `json:"is_default"`
RuleType RuleType `json:"rule_type"`
Regexp string `json:"regexp"`
Function string `json:"function"`
Age string `json:"age"`
Precision string `json:"precision"`
IsDefault int `json:"is_default"`
}
type rollupRulesResponse struct {
Data []rollupRulesResponseRecord `json:"data"`
Expand Down Expand Up @@ -77,6 +77,7 @@ func parseJson(body []byte) (*Rules, error) {
} else {
if last() == nil || last().Regexp != d.Regexp || last().Function != d.Function {
r.Pattern = append(r.Pattern, Pattern{
RuleType: d.RuleType,
Retention: make([]Retention, 0),
Regexp: d.Regexp,
Function: d.Function,
Expand All @@ -103,7 +104,9 @@ func parseJson(body []byte) (*Rules, error) {
return r.compile()
}

func remoteLoad(addr string, tlsConf *tls.Config, table string) (*Rules, error) {
var timeoutRulesLoad = 10 * time.Second

func RemoteLoad(addr string, tlsConf *tls.Config, table string) (*Rules, error) {
var db string
arr := strings.SplitN(table, ".", 2)
if len(arr) == 1 {
Expand All @@ -112,35 +115,65 @@ func remoteLoad(addr string, tlsConf *tls.Config, table string) (*Rules, error)
db, table = arr[0], arr[1]
}

query := fmt.Sprintf(`
SELECT
query := `SELECT
rule_type,
regexp,
function,
age,
precision,
is_default
FROM system.graphite_retentions
ARRAY JOIN Tables AS table
WHERE (table.database = '%s') AND (table.table = '%s')
WHERE (table.database = '` + db + `') AND (table.table = '` + table + `')
ORDER BY
is_default ASC,
priority ASC,
regexp ASC,
age ASC
FORMAT JSON
`, db, table)
FORMAT JSON`

body, _, _, err := clickhouse.Query(
scope.New(context.Background()).WithLogger(zapwriter.Logger("rollup")).WithTable("system.graphite_retentions"),
addr,
query,
clickhouse.Options{
Timeout: 10 * time.Second,
ConnectTimeout: 10 * time.Second,
Timeout: timeoutRulesLoad,
ConnectTimeout: timeoutRulesLoad,
TLSConfig: tlsConf,
},
nil,
)
if err != nil && strings.Contains(err.Error(), " Missing columns: 'rule_type' ") {
// for old version
query = `SELECT
regexp,
function,
age,
precision,
is_default
FROM system.graphite_retentions
ARRAY JOIN Tables AS table
WHERE (table.database = '` + db + `') AND (table.table = '` + table + `')
ORDER BY
is_default ASC,
priority ASC,
regexp ASC,
age ASC
FORMAT JSON`

body, _, _, err = clickhouse.Query(
scope.New(context.Background()).WithLogger(zapwriter.Logger("rollup")).WithTable("system.graphite_retentions"),
addr,
query,
clickhouse.Options{
Timeout: timeoutRulesLoad,
ConnectTimeout: timeoutRulesLoad,
TLSConfig: tlsConf,
},
nil,
)
}

if err != nil {
return nil, err
}
Expand Down
268 changes: 268 additions & 0 deletions helper/rollup/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rollup

import (
"encoding/json"
"regexp"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -124,3 +125,270 @@ func TestParseJson(t *testing.T) {
assert.NoError(err)
assert.Equal(expected, r)
}

func TestParseJsonTyped(t *testing.T) {
response := `{
"meta":
[
{
"name": "rule_type",
"type": "String"
},
{
"name": "regexp",
"type": "String"
},
{
"name": "function",
"type": "String"
},
{
"name": "age",
"type": "UInt64"
},
{
"name": "precision",
"type": "UInt64"
},
{
"name": "is_default",
"type": "UInt8"
}
],

"data":
[
{
"rule_type": "all",
"regexp": "^hourly",
"function": "",
"age": "0",
"precision": "3600",
"is_default": 0
},
{
"rule_type": "all",
"regexp": "^hourly",
"function": "",
"age": "3600",
"precision": "13600",
"is_default": 0
},
{
"rule_type": "all",
"regexp": "^live",
"function": "",
"age": "0",
"precision": "1",
"is_default": 0
},
{
"rule_type": "plain",
"regexp": "total$",
"function": "sum",
"age": "0",
"precision": "0",
"is_default": 0
},
{
"rule_type": "plain",
"regexp": "min$",
"function": "min",
"age": "0",
"precision": "0",
"is_default": 0
},
{
"rule_type": "plain",
"regexp": "max$",
"function": "max",
"age": "0",
"precision": "0",
"is_default": 0
},
{
"rule_type": "tagged",
"regexp": "^tag_name\\?",
"function": "min"
},
{
"rule_type": "tag_list",
"regexp": "fake3;tag=Fake3",
"function": "sum"
},
{
"rule_type": "all",
"regexp": "",
"function": "max",
"age": "0",
"precision": "60",
"is_default": 1
}
],

"rows": 7,

"statistics":
{
"elapsed": 0.00053715,
"rows_read": 7,
"bytes_read": 1158
}
}`

expected := &Rules{
Separated: true,
Pattern: []Pattern{
{
Regexp: "^hourly",
Retention: []Retention{
{Age: 0, Precision: 3600},
{Age: 3600, Precision: 13600},
},
re: regexp.MustCompile("^hourly"),
},
{
Regexp: "^live",
Retention: []Retention{
{Age: 0, Precision: 1},
},
re: regexp.MustCompile("^live"),
},
{
RuleType: RulePlain,
Regexp: "total$",
Function: "sum",
re: regexp.MustCompile("total$"),
aggr: AggrMap["sum"],
},
{
RuleType: RulePlain,
Regexp: "min$",
Function: "min",
re: regexp.MustCompile("min$"),
aggr: AggrMap["min"],
},
{
RuleType: RulePlain,
Regexp: "max$",
Function: "max",
re: regexp.MustCompile("max$"),
aggr: AggrMap["max"],
},
{
RuleType: RuleTagged,
Regexp: `^tag_name\?`,
Function: "min",
re: regexp.MustCompile(`^tag_name\?`),
aggr: AggrMap["min"],
},
{
RuleType: RuleTagged,
Regexp: `^fake3\?(.*&)?tag=Fake3(&.*)?$`,
Function: "sum",
re: regexp.MustCompile(`^fake3\?(.*&)?tag=Fake3(&.*)?$`),
aggr: AggrMap["sum"],
},
{
Regexp: ".*",
Function: "max",
Retention: []Retention{
{Age: 0, Precision: 60},
},
aggr: AggrMap["max"],
},
},
PatternPlain: []Pattern{
{
Regexp: "^hourly",
Retention: []Retention{
{Age: 0, Precision: 3600},
{Age: 3600, Precision: 13600},
},
re: regexp.MustCompile("^hourly"),
},
{
Regexp: "^live",
Retention: []Retention{
{Age: 0, Precision: 1},
},
re: regexp.MustCompile("^live"),
},
{
RuleType: RulePlain,
Regexp: "total$",
Function: "sum",
re: regexp.MustCompile("total$"),
aggr: AggrMap["sum"],
},
{
RuleType: RulePlain,
Regexp: "min$",
Function: "min",
re: regexp.MustCompile("min$"),
aggr: AggrMap["min"],
},
{
RuleType: RulePlain,
Regexp: "max$",
Function: "max",
re: regexp.MustCompile("max$"),
aggr: AggrMap["max"],
},
{
Regexp: ".*",
Function: "max",
Retention: []Retention{
{Age: 0, Precision: 60},
},
aggr: AggrMap["max"],
},
},
PatternTagged: []Pattern{
{
Regexp: "^hourly",
Retention: []Retention{
{Age: 0, Precision: 3600},
{Age: 3600, Precision: 13600},
},
re: regexp.MustCompile("^hourly"),
},
{
Regexp: "^live",
Retention: []Retention{
{Age: 0, Precision: 1},
},
re: regexp.MustCompile("^live"),
},
{
RuleType: RuleTagged,
Regexp: `^tag_name\?`,
Function: "min",
re: regexp.MustCompile(`^tag_name\?`),
aggr: AggrMap["min"],
},
{
RuleType: RuleTagged,
Regexp: `^fake3\?(.*&)?tag=Fake3(&.*)?$`,
Function: "sum",
re: regexp.MustCompile(`^fake3\?(.*&)?tag=Fake3(&.*)?$`),
aggr: AggrMap["sum"],
},
{
Regexp: ".*",
Function: "max",
Retention: []Retention{
{Age: 0, Precision: 60},
},
aggr: AggrMap["max"],
},
},
}

assert := assert.New(t)

r, err := parseJson([]byte(response))
assert.NotNil(r)
assert.NoError(err)
assert.Equal(expected, r)
}
Loading
Loading