fix: set correct clickhouse aggregation functions when using consolidateBy #281

Merged · 7 commits · Jul 5, 2024
34 changes: 32 additions & 2 deletions cmd/e2e-test/checks.go
@@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/graphite-clickhouse/helper/client"
"github.com/lomik/graphite-clickhouse/helper/datetime"
"github.com/lomik/graphite-clickhouse/helper/tests/compare"
@@ -271,6 +272,24 @@ func compareRender(errors *[]string, name, url string, actual, expected []client
}
}

func parseFilteringFunctions(strFilteringFuncs []string) ([]*carbonapi_v3_pb.FilteringFunction, error) {
res := make([]*carbonapi_v3_pb.FilteringFunction, 0, len(strFilteringFuncs))
for _, strFF := range strFilteringFuncs {
strFFSplit := strings.Split(strFF, "(")
if len(strFFSplit) != 2 {
return nil, fmt.Errorf("could not parse filtering function: %s", strFF)
}
name := strFFSplit[0]
args := strings.Split(strFFSplit[1], ",")
for i := range args {
args[i] = strings.TrimSpace(args[i])
args[i] = strings.Trim(args[i], ")'")
}
res = append(res, &carbonapi_v3_pb.FilteringFunction{Name: name, Arguments: args})
}
return res, nil
}
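
For reference, a test-style sketch of what the parser above should yield for a typical value (same package; assumes the standard testing import; the test itself is not part of this diff):

func TestParseFilteringFunctionsSketch(t *testing.T) {
	// "consolidateBy('max')" is split on "(" into a name and an argument
	// list; each argument is trimmed of spaces, the closing ")" and quotes.
	ffs, err := parseFilteringFunctions([]string{"consolidateBy('max')"})
	if err != nil {
		t.Fatal(err)
	}
	if ffs[0].Name != "consolidateBy" || ffs[0].Arguments[0] != "max" {
		t.Fatalf("unexpected parse result: %+v", ffs[0])
	}
	// A value without exactly one "(" is rejected.
	if _, err := parseFilteringFunctions([]string{"consolidateBy"}); err == nil {
		t.Fatal("expected a parse error")
	}
}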

func verifyRender(ch *Clickhouse, gch *GraphiteClickhouse, check *RenderCheck, defaultPreision time.Duration) []string {
var errors []string
httpClient := http.Client{
@@ -280,7 +299,18 @@ func verifyRender(ch *Clickhouse, gch *GraphiteClickhouse, check *RenderCheck, d
from := datetime.TimestampTruncate(check.from, defaultPreision)
until := datetime.TimestampTruncate(check.until, defaultPreision)
for _, format := range check.Formats {
if url, result, respHeader, err := client.Render(&httpClient, address, format, check.Targets, from, until); err == nil {

var filteringFunctions []*carbonapi_v3_pb.FilteringFunction
if format == client.FormatPb_v3 {
var err error
filteringFunctions, err = parseFilteringFunctions(check.FilteringFunctions)
if err != nil {
errors = append(errors, err.Error())
continue
}
}

if url, result, respHeader, err := client.Render(&httpClient, address, format, check.Targets, filteringFunctions, check.MaxDataPoints, from, until); err == nil {
id := requestId(respHeader)
name := ""
if check.ErrorRegexp != "" {
@@ -303,7 +333,7 @@
if check.CacheTTL > 0 && check.ErrorRegexp == "" {
// second query must be find-cached
name = "cache"
if url, result, respHeader, err = client.Render(&httpClient, address, format, check.Targets, from, until); err == nil {
if url, result, respHeader, err = client.Render(&httpClient, address, format, check.Targets, filteringFunctions, check.MaxDataPoints, from, until); err == nil {
compareRender(&errors, name, url, result, check.result, true, respHeader, check.CacheTTL)
} else {
errStr := strings.TrimRight(err.Error(), "\n")
19 changes: 12 additions & 7 deletions cmd/e2e-test/e2etesting.go
@@ -59,13 +59,15 @@ type Metric struct {
}

type RenderCheck struct {
Name string `toml:"name"`
Formats []client.FormatType `toml:"formats"`
From string `toml:"from"`
Until string `toml:"until"`
Targets []string `toml:"targets"`
Timeout time.Duration `toml:"timeout"`
DumpIfEmpty []string `toml:"dump_if_empty"`
Name string `toml:"name"`
Formats []client.FormatType `toml:"formats"`
From string `toml:"from"`
Until string `toml:"until"`
Targets []string `toml:"targets"`
MaxDataPoints int64 `toml:"max_data_points"`
FilteringFunctions []string `toml:"filtering_functions"`
Timeout time.Duration `toml:"timeout"`
DumpIfEmpty []string `toml:"dump_if_empty"`

Optimize []string `toml:"optimize"` // optimize tables before run tests

@@ -338,6 +340,7 @@ func verifyGraphiteClickhouse(test *TestSchema, gch *GraphiteClickhouse, clickho
zap.String("clickhouse config", clickhouseDir),
zap.String("graphite-clickhouse config", gch.ConfigTpl),
zap.Strings("targets", check.Targets),
zap.Strings("filtering_functions", check.FilteringFunctions),
zap.String("from_raw", check.From),
zap.String("until_raw", check.Until),
zap.Int64("from", check.from),
@@ -361,6 +364,7 @@ func verifyGraphiteClickhouse(test *TestSchema, gch *GraphiteClickhouse, clickho
zap.String("clickhouse config", clickhouseDir),
zap.String("graphite-clickhouse config", gch.ConfigTpl),
zap.Strings("targets", check.Targets),
zap.Strings("filtering_functions", check.FilteringFunctions),
zap.String("from_raw", check.From),
zap.String("until_raw", check.Until),
zap.Int64("from", check.from),
@@ -377,6 +381,7 @@ func verifyGraphiteClickhouse(test *TestSchema, gch *GraphiteClickhouse, clickho
zap.String("clickhouse config", clickhouseDir),
zap.String("graphite-clickhouse config", gch.ConfigTpl),
zap.Strings("targets", check.Targets),
zap.Strings("filtering_functions", check.FilteringFunctions),
zap.String("from_raw", check.From),
zap.String("until_raw", check.Until),
zap.Int64("from", check.from),
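To show how the new knobs are meant to be used, a hypothetical check as it would decode from a test's TOML (the keys max_data_points and filtering_functions map to the new fields; values are illustrative, not taken from this PR's fixtures; this lives inside the e2e-test package):

func exampleRenderCheck() RenderCheck {
	return RenderCheck{
		Name:    "consolidateBy picks the right ClickHouse aggregation",
		Formats: []client.FormatType{client.FormatPb_v3}, // filtering functions are only sent for pb_v3
		From:    "now-600",
		Until:   "now",
		Targets: []string{"test.metric"},
		// New in this PR:
		MaxDataPoints:      1048576,
		FilteringFunctions: []string{"consolidateBy('max')"},
		Timeout:            time.Minute,
	}
}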
10 changes: 9 additions & 1 deletion cmd/graphite-clickhouse-client/main.go
@@ -5,9 +5,11 @@ import (
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/graphite-clickhouse/helper/client"
"github.com/lomik/graphite-clickhouse/helper/datetime"
)
@@ -31,6 +33,7 @@ func main() {
address := flag.String("address", "http://127.0.0.1:9090", "Address of graphite-clickhouse server")
fromStr := flag.String("from", "0", "from")
untilStr := flag.String("until", "", "until")
maxDataPointsStr := flag.String("maxDataPoints", "1048576", "Maximum amount of datapoints in response")

metricsFind := flag.String("find", "", "Query for /metrics/find/, valid formats are carbonapi_v3_pb, protobuf, pickle")

@@ -71,6 +74,11 @@ func main() {
fmt.Printf("invalid until: %s\n", *untilStr)
os.Exit(1)
}
maxDataPoints, err := strconv.ParseInt(*maxDataPointsStr, 10, 64)
if err != nil {
fmt.Printf("invalid maxDataPoints: %s\n", *maxDataPointsStr)
os.Exit(1)
}

httpClient := http.Client{
Timeout: *timeout,
@@ -182,7 +190,7 @@ func main() {
if formatRender == client.FormatDefault {
formatRender = client.FormatPb_v3
}
queryRaw, r, respHeader, err := client.Render(&httpClient, *address, formatRender, targets, int64(from), int64(until))
queryRaw, r, respHeader, err := client.Render(&httpClient, *address, formatRender, targets, []*carbonapi_v3_pb.FilteringFunction{}, maxDataPoints, int64(from), int64(until))
if respHeader != nil {
fmt.Printf("Responce header: %+v\n", respHeader)
}
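Putting the pieces together, a minimal sketch of calling the updated helper from client code (address, target and time range are placeholders, not part of this diff):

package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/go-graphite/protocol/carbonapi_v3_pb"
	"github.com/lomik/graphite-clickhouse/helper/client"
)

func main() {
	httpClient := http.Client{Timeout: 10 * time.Second}
	// consolidateBy('max') expressed as a carbonapi_v3_pb filtering function.
	ff := []*carbonapi_v3_pb.FilteringFunction{
		{Name: "consolidateBy", Arguments: []string{"max"}},
	}
	until := time.Now().Unix()
	from := until - 3600
	query, metrics, _, err := client.Render(&httpClient, "http://127.0.0.1:9090",
		client.FormatPb_v3, []string{"test.metric"}, ff, 1048576, from, until)
	if err != nil {
		fmt.Println("render failed:", err)
		return
	}
	fmt.Printf("%s -> %d metrics\n", query, len(metrics))
}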
22 changes: 13 additions & 9 deletions helper/client/render.go
@@ -37,7 +37,7 @@ type Metric struct {

// Render does a /render/ request
// Valid formats are carbonapi_v3_pb, protobuf, pickle, json
func Render(client *http.Client, address string, format FormatType, targets []string, from, until int64) (string, []Metric, http.Header, error) {
func Render(client *http.Client, address string, format FormatType, targets []string, filteringFunctions []*protov3.FilteringFunction, maxDataPoints, from, until int64) (string, []Metric, http.Header, error) {
rUrl := "/render/"
if format == FormatDefault {
format = FormatPb_v3
@@ -56,6 +56,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st
}
fromStr := strconv.FormatInt(from, 10)
untilStr := strconv.FormatInt(until, 10)
maxDataPointsStr := strconv.FormatInt(maxDataPoints, 10)

u, err := url.Parse(address + rUrl)
if err != nil {
@@ -77,10 +78,12 @@ func Render(client *http.Client, address string, format FormatType, targets []st
}
for i, target := range targets {
r.Metrics[i] = protov3.FetchRequest{
Name: target,
StartTime: from,
StopTime: until,
PathExpression: target,
Name: target,
StartTime: from,
StopTime: until,
PathExpression: target,
FilterFunctions: filteringFunctions,
MaxDataPoints: maxDataPoints,
}
}

@@ -93,10 +96,11 @@ func Render(client *http.Client, address string, format FormatType, targets []st
}
case FormatPb_v2, FormatProtobuf, FormatPickle, FormatJSON:
v := url.Values{
"format": []string{format.String()},
"from": []string{fromStr},
"until": []string{untilStr},
"target": targets,
"format": []string{format.String()},
"from": []string{fromStr},
"until": []string{untilStr},
"target": targets,
"maxDataPoints": []string{maxDataPointsStr},
}
u.RawQuery = v.Encode()
default:
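For the non-v3 formats the new value travels as a plain query parameter; a runnable sketch of what the encoded request looks like (timestamps illustrative):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Mirrors the pb_v2/protobuf/pickle/json branch above.
	v := url.Values{
		"format":        []string{"json"},
		"from":          []string{"1700000000"},
		"until":         []string{"1700003600"},
		"target":        []string{"test.metric"},
		"maxDataPoints": []string{"1048576"},
	}
	// Encode sorts keys, so this prints:
	// /render/?format=json&from=1700000000&maxDataPoints=1048576&target=test.metric&until=1700003600
	fmt.Println("/render/?" + v.Encode())
}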
8 changes: 6 additions & 2 deletions render/data/data.go
@@ -54,10 +54,14 @@ func (d *Data) GetAggregation(id uint32) (string, error) {
if err != nil {
return function, err
}
if function == "any" || function == "anyLast" {
switch function {
case "any":
return "first", nil
case "anyLast":
return "last", nil
default:
return function, nil
}
return function, nil
}

// data wraps Data and adds asynchronous processing of data
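Together with the targets.go change below, this closes the loop: the request side translates a consolidateBy argument into a ClickHouse aggregate, and GetAggregation translates it back for the carbonapi response. The response-side mapping in isolation (a sketch mirroring the switch above, not code from this diff):

// ClickHouse aggregate -> Graphite consolidation name.
func toGraphiteConsolidation(chAggr string) string {
	switch chAggr {
	case "any":
		return "first"
	case "anyLast":
		return "last"
	default:
		return chAggr // avg, sum, max and min keep their names
	}
}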
17 changes: 14 additions & 3 deletions render/data/query.go
@@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"net/http"
"os"
"strings"
"sync"
@@ -15,6 +16,7 @@

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/helper/errs"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/dry"
@@ -145,7 +147,11 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error {
// carbonlink request
carbonlinkResponseRead := queryCarbonlink(ctx, carbonlink, cond.metricsUnreverse)

cond.prepareLookup()
err = cond.prepareLookup()
if err != nil {
logger.Error("prepare_lookup", zap.Error(err))
return errs.NewErrorWithCode(err.Error(), http.StatusBadRequest)
}
cond.setStep(q.cStep)
if cond.step < 1 {
return ErrSetStepTimeout
@@ -279,7 +285,7 @@ func (c *conditions) prepareMetricsLists() {
}
}

func (c *conditions) prepareLookup() {
func (c *conditions) prepareLookup() error {
age := uint32(dry.Max(0, time.Now().Unix()-c.From))
c.aggregations = make(map[string][]string)
c.appliedFunctions = make(map[string][]string)
@@ -295,7 +301,11 @@
// Currently it just finds the first target matching the metric
// to avoid making multiple requests for every type of aggregation for a given metric.
for _, alias := range c.AM.Get(c.metricsUnreverse[i]) {
if requestedAgg := c.GetRequestedAggregation(alias.Target); requestedAgg != "" {
requestedAgg, err := c.GetRequestedAggregation(alias.Target)
if err != nil {
return fmt.Errorf("failed to choose appropriate aggregation for '%s': %s", alias.Target, err.Error())
}
if requestedAgg != "" {
agg = rollup.AggrMap[requestedAgg]
c.appliedFunctions[alias.Target] = []string{graphiteConsolidationFunction}
break
@@ -330,6 +340,7 @@ func (c *conditions) prepareLookup() {
mm.WriteString(c.metricsRequested[i] + "\n")
}
}
return nil
}

var ErrSetStepTimeout = errors.New("unexpected error, setStep timeout")
46 changes: 35 additions & 11 deletions render/data/targets.go
@@ -131,23 +131,47 @@ TableLoop:
return fmt.Errorf("data tables is not specified for %v", tt.List[0])
}

func (tt *Targets) GetRequestedAggregation(target string) string {
func (tt *Targets) GetRequestedAggregation(target string) (string, error) {
if ffs, ok := tt.filteringFunctionsByTarget[target]; !ok {
return ""
return "", nil
} else {
for _, filteringFunc := range ffs {
ffName := filteringFunc.GetName()
ffArgs := filteringFunc.GetArguments()
if ffName == graphiteConsolidationFunction && len(ffArgs) > 0 {
// Graphite standard supports both average and avg.
// It is the only aggregation that has two aliases.
// https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.consolidateBy
if ffArgs[0] == "average" {
return "avg"
}
return ffArgs[0]

if ffName != graphiteConsolidationFunction {
continue
}

if len(ffArgs) < 1 {
return "", fmt.Errorf("no argumets were provided to consolidateBy function")
}

switch ffArgs[0] {
// 'last' in graphite == clickhouse aggregate function 'anyLast'
case "last":
return "anyLast", nil
// 'first' in graphite == clickhouse aggregate function 'any'
case "first":
return "any", nil
// Graphite standard supports both average and avg.
// It is the only aggregation that has two aliases.
// https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.consolidateBy
case "average":
return "avg", nil
// avg, sum, max, min have the same name in clickhouse
case "avg", "sum", "max", "min":
return ffArgs[0], nil
default:
return "",
fmt.Errorf(
"unknown \"%s\" argument function (allowed argumets are: 'avg', 'average', 'sum', 'max', 'min', 'last', 'first'): recieved %s",
graphiteConsolidationFunction,
ffArgs[0],
)

}
}
}
return ""
return "", nil
}
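
And the request-side counterpart, i.e. the mapping the switch above implements: with this change consolidateBy(metric, 'last') selects anyLast(...) in the generated ClickHouse query instead of whatever the rollup rules would have chosen. Summarised as a hypothetical standalone function (not part of this diff):

// Graphite consolidateBy argument -> ClickHouse aggregate function.
func toClickhouseAggregation(arg string) (string, error) {
	switch arg {
	case "first":
		return "any", nil
	case "last":
		return "anyLast", nil
	case "average": // Graphite alias for avg
		return "avg", nil
	case "avg", "sum", "max", "min":
		return arg, nil // identical names on both sides
	default:
		return "", fmt.Errorf("unsupported consolidateBy argument: %s", arg)
	}
}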
45 changes: 45 additions & 0 deletions tests/consolidateBy/carbon-clickhouse.conf.tpl
@@ -0,0 +1,45 @@
[common]

[data]
path = "/etc/carbon-clickhouse/data"
chunk-interval = "1s"
chunk-auto-interval = ""

[upload.graphite_index]
type = "index"
table = "graphite_index"
url = "{{ .CLICKHOUSE_URL }}/"
timeout = "2m30s"
cache-ttl = "1h"

[upload.graphite_tags]
type = "tagged"
table = "graphite_tags"
threads = 3
url = "{{ .CLICKHOUSE_URL }}/"
timeout = "2m30s"
cache-ttl = "1h"

[upload.graphite_reverse]
type = "points-reverse"
table = "graphite_reverse"
url = "{{ .CLICKHOUSE_URL }}/"
timeout = "2m30s"
zero-timestamp = false

[upload.graphite]
type = "points"
table = "graphite"
url = "{{ .CLICKHOUSE_URL }}/"
timeout = "2m30s"
zero-timestamp = false

[tcp]
listen = ":2003"
enabled = true
drop-future = "0s"
drop-past = "0s"

[logging]
file = "/etc/carbon-clickhouse/carbon-clickhouse.log"
level = "debug"