Skip to content

Commit

Permalink
Merge pull request #830 from mchrome/feat/add-support-for-consolidateBy
Browse files Browse the repository at this point in the history
feat(functions): add a config option to pass consolidateBy to the storage backend
  • Loading branch information
Civil authored Apr 25, 2024
2 parents 9b5b4a4 + ad2e1cf commit 38ac020
Show file tree
Hide file tree
Showing 97 changed files with 682 additions and 663 deletions.
3 changes: 2 additions & 1 deletion cmd/carbonapi/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type ConfigType struct {
SendGlobsAsIs *bool `mapstructure:"sendGlobsAsIs"`
AlwaysSendGlobsAsIs *bool `mapstructure:"alwaysSendGlobsAsIs"`
ExtractTagsFromArgs bool `mapstructure:"extractTagsFromArgs"`
PassFunctionsToBackend bool `mapstructure:"passFunctionsToBackend"`
MaxBatchSize int `mapstructure:"maxBatchSize"`
Zipper string `mapstructure:"zipper"`
Upstreams zipperCfg.Config `mapstructure:"upstreams"`
Expand Down Expand Up @@ -138,7 +139,7 @@ func (c ConfigType) String() string {

func (c *ConfigType) SetZipper(zipper zipper.CarbonZipper) (err error) {
c.ZipperInstance = zipper
c.Evaluator, err = expr.NewEvaluator(c.Limiter, c.ZipperInstance)
c.Evaluator, err = expr.NewEvaluator(c.Limiter, c.ZipperInstance, c.PassFunctionsToBackend)
return
}

Expand Down
16 changes: 12 additions & 4 deletions expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
var ErrZipperNotInit = errors.New("zipper not initialized")

type Evaluator struct {
limiter limiter.SimpleLimiter
zipper zipper.CarbonZipper
limiter limiter.SimpleLimiter
zipper zipper.CarbonZipper
passFunctionsToBackend bool
}

func (eval Evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (map[parser.MetricRequest][]*types.MetricData, error) {
Expand Down Expand Up @@ -53,6 +54,13 @@ func (eval Evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, unti
Until: fetchRequest.StopTime,
}

if eval.passFunctionsToBackend && m.ConsolidationFunc != "" {
fetchRequest.FilterFunctions = append(fetchRequest.FilterFunctions, &pb.FilteringFunction{
Name: "consolidateBy",
Arguments: []string{m.ConsolidationFunc},
})
}

if exp.Target() == "fallbackSeries" {
haveFallbackSeries = true
}
Expand Down Expand Up @@ -140,11 +148,11 @@ func (eval Evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int
}

// NewEvaluator create evaluator with limiter and zipper
func NewEvaluator(limiter limiter.SimpleLimiter, zipper zipper.CarbonZipper) (*Evaluator, error) {
func NewEvaluator(limiter limiter.SimpleLimiter, zipper zipper.CarbonZipper, passFunctionsToBackend bool) (*Evaluator, error) {
if zipper == nil {
return nil, ErrZipperNotInit
}
return &Evaluator{limiter: limiter, zipper: zipper}, nil
return &Evaluator{limiter: limiter, zipper: zipper, passFunctionsToBackend: passFunctionsToBackend}, nil
}

// EvalExpr is the main expression evaluator.
Expand Down
49 changes: 25 additions & 24 deletions expr/expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"time"
"unicode"

pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

"github.com/go-graphite/carbonapi/expr/functions"
"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/rewrite"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
th "github.com/go-graphite/carbonapi/tests"
"github.com/go-graphite/carbonapi/tests/compare"
pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
)

func init() {
Expand Down Expand Up @@ -205,7 +206,7 @@ func TestEvalExpr(t *testing.T) {
&data,
}

eval, err := NewEvaluator(nil, th.NewTestZipper(nil))
eval, err := NewEvaluator(nil, th.NewTestZipper(nil), false)
if err == nil {
_, err = EvalExpr(context.Background(), eval, exp, request.From, request.Until, metricMap)
}
Expand All @@ -224,7 +225,7 @@ func TestEvalExpression(t *testing.T) {
{
"metric",
map[parser.MetricRequest][]*types.MetricData{
{"metric", 0, 1}: {types.MakeMetricData("metric", []float64{1, 2, 3, 4, 5}, 1, now32)},
{Metric: "metric", From: 0, Until: 1}: {types.MakeMetricData("metric", []float64{1, 2, 3, 4, 5}, 1, now32)},
},
[]*types.MetricData{types.MakeMetricData("metric", []float64{1, 2, 3, 4, 5}, 1, now32)},
},
Expand All @@ -236,7 +237,7 @@ func TestEvalExpression(t *testing.T) {
{
"metric*",
map[parser.MetricRequest][]*types.MetricData{
{"metric*", 0, 1}: {
{"metric*", "", 0, 1}: {
types.MakeMetricData("metric1", []float64{1, 2, 3, 4, 5}, 1, now32),
types.MakeMetricData("metric2", []float64{2, 3, 4, 5, 6}, 1, now32),
},
Expand All @@ -249,7 +250,7 @@ func TestEvalExpression(t *testing.T) {
{
"reduceSeries(mapSeries(devops.service.*.filter.received.*.count,2), \"asPercent\", 5,\"valid\",\"total\")",
map[parser.MetricRequest][]*types.MetricData{
{"devops.service.*.filter.received.*.count", 0, 1}: {
{Metric: "devops.service.*.filter.received.*.count", From: 0, Until: 1}: {
types.MakeMetricData("devops.service.server1.filter.received.valid.count", []float64{2, 4, 8}, 1, now32),
types.MakeMetricData("devops.service.server1.filter.received.total.count", []float64{8, 2, 4}, 1, now32),
types.MakeMetricData("devops.service.server2.filter.received.valid.count", []float64{3, 9, 12}, 1, now32),
Expand All @@ -264,7 +265,7 @@ func TestEvalExpression(t *testing.T) {
{
"reduceSeries(mapSeries(devops.service.*.filter.received.*.count,2), \"asPercent\", 5,\"valid\",\"total\")",
map[parser.MetricRequest][]*types.MetricData{
{"devops.service.*.filter.received.*.count", 0, 1}: {
{Metric: "devops.service.*.filter.received.*.count", From: 0, Until: 1}: {
types.MakeMetricData("devops.service.server1.filter.received.total.count", []float64{8, 2, 4}, 1, now32),
types.MakeMetricData("devops.service.server2.filter.received.valid.count", []float64{3, 9, 12}, 1, now32),
types.MakeMetricData("devops.service.server2.filter.received.total.count", []float64{12, 9, 3}, 1, now32),
Expand All @@ -277,7 +278,7 @@ func TestEvalExpression(t *testing.T) {
{
"sumSeries(pow(devops.service.*.filter.received.*.count, 0))",
map[parser.MetricRequest][]*types.MetricData{
{"devops.service.*.filter.received.*.count", 0, 1}: {
{Metric: "devops.service.*.filter.received.*.count", From: 0, Until: 1}: {
types.MakeMetricData("devops.service.server1.filter.received.total.count", []float64{8, 2, 4}, 1, now32),
types.MakeMetricData("devops.service.server2.filter.received.valid.count", []float64{3, 9, 12}, 1, now32),
types.MakeMetricData("devops.service.server2.filter.received.total.count", []float64{math.NaN(), math.NaN(), math.NaN()}, 1, now32),
Expand All @@ -288,7 +289,7 @@ func TestEvalExpression(t *testing.T) {
{
"multiplySeriesWithWildcards(metric1.foo.*.*,1,2)",
map[parser.MetricRequest][]*types.MetricData{
{"metric1.foo.*.*", 0, 1}: {
{Metric: "metric1.foo.*.*", From: 0, Until: 1}: {
types.MakeMetricData("metric1.foo.bar1.baz", []float64{1, 2, 3, 4, 5}, 1, now32),
types.MakeMetricData("metric1.foo.bar2.baz", []float64{11, 12, 13, 14, 15}, 1, now32),
types.MakeMetricData("metric1.foo.bar3.baz", []float64{2, 2, 2, 2, 2}, 1, now32),
Expand All @@ -299,7 +300,7 @@ func TestEvalExpression(t *testing.T) {
{
"groupByNode(metric1foo.*,0,\"asPercent\")",
map[parser.MetricRequest][]*types.MetricData{
{"metric1foo.*", 0, 1}: {
{Metric: "metric1foo.*", From: 0, Until: 1}: {
types.MakeMetricData("metric1foo.bar1.baz", []float64{1, 2, 3, 4, 5}, 1, now32),
types.MakeMetricData("metric1foo.bar1.qux", []float64{6, 7, 8, 9, 10}, 1, now32),
types.MakeMetricData("metric1foo.bar2.baz", []float64{11, 12, 13, 14, 15}, 1, now32),
Expand All @@ -311,7 +312,7 @@ func TestEvalExpression(t *testing.T) {
{
"groupByNodes(test.metric*.foo*,\"keepLastValue\",1,0)",
map[parser.MetricRequest][]*types.MetricData{
{"test.metric*.foo*", 0, 1}: {
{Metric: "test.metric*.foo*", From: 0, Until: 1}: {
types.MakeMetricData("test.metric1.foo1", []float64{0}, 1, now32),
types.MakeMetricData("test.metric1.foo2", []float64{0}, 1, now32),
types.MakeMetricData("test.metric2.foo1", []float64{0}, 1, now32),
Expand All @@ -326,7 +327,7 @@ func TestEvalExpression(t *testing.T) {
{
"groupByNodes(test.metric*.foo*,\"keepLastValue\",1,2)",
map[parser.MetricRequest][]*types.MetricData{
{"test.metric*.foo*", 0, 1}: {
{Metric: "test.metric*.foo*", From: 0, Until: 1}: {
types.MakeMetricData("test.metric1.foo1", []float64{0}, 1, now32),
types.MakeMetricData("test.metric1.foo2", []float64{0}, 1, now32),
types.MakeMetricData("test.metric2.foo1", []float64{0}, 1, now32),
Expand All @@ -343,7 +344,7 @@ func TestEvalExpression(t *testing.T) {
{
"groupByNodes(test.metric*.foo*,\"keepLastValue\",1)",
map[parser.MetricRequest][]*types.MetricData{
{"test.metric*.foo*", 0, 1}: {
{Metric: "test.metric*.foo*", From: 0, Until: 1}: {
types.MakeMetricData("test.metric1.foo1", []float64{0}, 1, now32),
types.MakeMetricData("test.metric1.foo2", []float64{0}, 1, now32),
types.MakeMetricData("test.metric2.foo1", []float64{0}, 1, now32),
Expand All @@ -360,7 +361,7 @@ func TestEvalExpression(t *testing.T) {
for _, tt := range tests {
testName := tt.Target
t.Run(testName, func(t *testing.T) {
eval, err := NewEvaluator(nil, th.NewTestZipper(nil))
eval, err := NewEvaluator(nil, th.NewTestZipper(nil), false)
if err == nil {
th.TestEvalExpr(t, eval, &tt)
} else {
Expand All @@ -387,10 +388,10 @@ func TestRewriteExpr(t *testing.T) {
"metric*",
),
map[parser.MetricRequest][]*types.MetricData{
{"metric*", 0, 1}: {
{Metric: "metric*", From: 0, Until: 1}: {
types.MakeMetricData("metric1", []float64{1, 2, 3}, 1, now32),
},
{"metric1", 0, 1}: {
{Metric: "metric1", From: 0, Until: 1}: {
types.MakeMetricData("metric1", []float64{1, 2, 3}, 1, now32),
},
},
Expand All @@ -406,10 +407,10 @@ func TestRewriteExpr(t *testing.T) {
parser.ArgValue("%.count"),
),
map[parser.MetricRequest][]*types.MetricData{
{"metric*", 0, 1}: {
{Metric: "metric*", From: 0, Until: 1}: {
types.MakeMetricData("metric1", []float64{1, 2, 3}, 1, now32),
},
{"metric1", 0, 1}: {
{Metric: "metric1", From: 0, Until: 1}: {
types.MakeMetricData("metric1", []float64{1, 2, 3}, 1, now32),
},
},
Expand All @@ -426,10 +427,10 @@ func TestRewriteExpr(t *testing.T) {
parser.ArgValue("% count"),
),
map[parser.MetricRequest][]*types.MetricData{
{"metric*", 0, 1}: {
{Metric: "metric*", From: 0, Until: 1}: {
types.MakeMetricData("metric1", []float64{1, 2, 3}, 1, now32),
},
{"metric1", 0, 1}: {
{Metric: "metric1", From: 0, Until: 1}: {
types.MakeMetricData("metric1", []float64{1, 2, 3}, 1, now32),
},
},
Expand All @@ -445,14 +446,14 @@ func TestRewriteExpr(t *testing.T) {
parser.ArgValue("%.count"),
),
map[parser.MetricRequest][]*types.MetricData{
{"foo.metric*", 0, 1}: {
{Metric: "foo.metric*", From: 0, Until: 1}: {
types.MakeMetricData("foo.metric1", []float64{1, 2, 3}, 1, now32),
types.MakeMetricData("foo.metric2", []float64{1, 2, 3}, 1, now32),
},
{"foo.metric1", 0, 1}: {
{Metric: "foo.metric1", From: 0, Until: 1}: {
types.MakeMetricData("foo.metric1", []float64{1, 2, 3}, 1, now32),
},
{"foo.metric2", 0, 1}: {
{Metric: "foo.metric2", From: 0, Until: 1}: {
types.MakeMetricData("foo.metric2", []float64{1, 2, 3}, 1, now32),
},
},
Expand All @@ -463,7 +464,7 @@ func TestRewriteExpr(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
eval, err := NewEvaluator(nil, th.NewTestZipper(nil))
eval, err := NewEvaluator(nil, th.NewTestZipper(nil), false)
if err == nil {
rewritten, newTargets, err := RewriteExpr(context.Background(), eval, tt.e, 0, 1, tt.m)

Expand Down Expand Up @@ -520,7 +521,7 @@ func TestEvalCustomFromUntil(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
originalMetrics := th.DeepClone(tt.m)
exp, _, _ := parser.ParseExpr(tt.target)
eval, err := NewEvaluator(nil, th.NewTestZipper(nil))
eval, err := NewEvaluator(nil, th.NewTestZipper(nil), false)
if err == nil {
g, err := EvalExpr(context.Background(), eval, exp, tt.from, tt.until, tt.m)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions expr/functions/absolute/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestAbsolute(t *testing.T) {
{
"absolute(metric1)",
map[parser.MetricRequest][]*types.MetricData{
{"metric1", 0, 1}: {types.MakeMetricData("metric1", []float64{0, -1, 2, -3, 4, 5}, 1, now32)},
{"metric1", "", 0, 1}: {types.MakeMetricData("metric1", []float64{0, -1, 2, -3, 4, 5}, 1, now32)},
},
[]*types.MetricData{types.MakeMetricData("absolute(metric1)",
[]float64{0, 1, 2, 3, 4, 5}, 1, now32).SetTag("absolute", "1").SetNameTag("absolute(metric1)")},
Expand All @@ -42,5 +42,4 @@ func TestAbsolute(t *testing.T) {
th.TestEvalExpr(t, eval, &tt)
})
}

}
Loading

0 comments on commit 38ac020

Please sign in to comment.