diff --git a/render/data/ch_response.go b/render/data/ch_response.go index 96cd3a8f9..02414e57b 100644 --- a/render/data/ch_response.go +++ b/render/data/ch_response.go @@ -17,6 +17,7 @@ type CHResponse struct { Until int64 // if true, return points for all metrics, replacing empty results with list of NaN AppendOutEmptySeries bool + AppliedFunctions map[string][]string } // CHResponses is a slice of CHResponse @@ -142,6 +143,7 @@ func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error) XFilesFactor: 0, HighPrecisionTimestamps: false, Values: values, + AppliedFunctions: c.AppliedFunctions[a.Target], RequestStartTime: c.From, RequestStopTime: c.Until, } diff --git a/render/data/multi_target.go b/render/data/multi_target.go index e1ae3bd28..7d1b7ad40 100644 --- a/render/data/multi_target.go +++ b/render/data/multi_target.go @@ -43,6 +43,9 @@ func MFRToMultiTarget(v3Request *v3pb.MultiFetchRequest) MultiTarget { } else { multiTarget[tf] = NewTargetsOne(m.PathExpression, len(v3Request.Metrics), alias.New()) } + if len(m.FilterFunctions) > 0 { + multiTarget[tf].SetFilteringFunctions(m.PathExpression, m.FilterFunctions) + } } } return multiTarget diff --git a/render/data/query.go b/render/data/query.go index 707b64cbb..e271bcdb5 100644 --- a/render/data/query.go +++ b/render/data/query.go @@ -15,6 +15,7 @@ import ( "github.com/lomik/graphite-clickhouse/config" "github.com/lomik/graphite-clickhouse/helper/clickhouse" + "github.com/lomik/graphite-clickhouse/helper/rollup" "github.com/lomik/graphite-clickhouse/metrics" "github.com/lomik/graphite-clickhouse/pkg/dry" "github.com/lomik/graphite-clickhouse/pkg/reverse" @@ -88,6 +89,7 @@ type conditions struct { metricsRequested []string metricsUnreverse []string metricsLookup []string + appliedFunctions map[string][]string } func newQuery(cfg *config.Config, targets int) *query { @@ -239,6 +241,7 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error { From: cond.From, Until: cond.Until, AppendOutEmptySeries: cond.appendEmptySeries, + AppliedFunctions: cond.appliedFunctions, }) return nil } @@ -279,6 +282,7 @@ func (c *conditions) prepareMetricsLists() { func (c *conditions) prepareLookup() { age := uint32(dry.Max(0, time.Now().Unix()-c.From)) c.aggregations = make(map[string][]string) + c.appliedFunctions = make(map[string][]string) c.extDataBodies = make(map[string]*strings.Builder) c.steps = make(map[uint32][]string) aggName := "" @@ -286,6 +290,18 @@ func (c *conditions) prepareLookup() { for i := range c.metricsRequested { step, agg := c.rollupRules.Lookup(c.metricsLookup[i], age) + // Override agregation with an argument of consolidateBy function. + // consolidateBy with its argument is passed through FilteringFunctions field of carbonapi_v3_pb protocol. + // Currently it just finds the first target matching the metric + // to avoid making multiple request for every type of aggregation for a given metric. + for _, alias := range c.AM.Get(c.metricsUnreverse[i]) { + if requestedAgg := c.GetRequestedAggregation(alias.Target); requestedAgg != "" { + agg = rollup.AggrMap[requestedAgg] + c.appliedFunctions[alias.Target] = []string{graphiteConsolidationFunction} + break + } + } + if _, ok := c.steps[step]; !ok { c.steps[step] = make([]string, 0) } diff --git a/render/data/query_test.go b/render/data/query_test.go index 2e875e4d0..272e036e9 100644 --- a/render/data/query_test.go +++ b/render/data/query_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb" "github.com/lomik/graphite-clickhouse/finder" "github.com/lomik/graphite-clickhouse/helper/date" "github.com/lomik/graphite-clickhouse/helper/rollup" @@ -304,6 +305,56 @@ func TestPrepareLookup(t *testing.T) { assert.Equal(t, steps, cond.steps) assert.Equal(t, aggregations, cond.aggregations) }) + + t.Run("non-reverse query with overriden aggregation", func(t *testing.T) { + cond := newCondition(5400, 1800, 5) + + cond.aggregated = true + cond.isReverse = false + cond.prepareMetricsLists() + sort.Strings(cond.metricsLookup) + sort.Strings(cond.metricsRequested) + sort.Strings(cond.metricsUnreverse) + var aggregations map[string][]string + for _, aggrStr := range []string{"avg", "min", "max", "sum"} { + cond.SetFilteringFunctions( + "*.name.*", + []*v3pb.FilteringFunction{{Name: "consolidateBy", Arguments: []string{aggrStr}}}, + ) + cond.prepareLookup() + aggregations = map[string][]string{ + aggrStr: {"10_min.name.any", "1_min.name.avg", "5_min.name.min", "5_sec.name.max"}, + } + assert.Equal(t, aggregations, cond.aggregations) + } + + // Steps saves only values, not the metrics list + steps := map[uint32][]string{ + 30: {}, + 60: {}, + 300: {}, + 1200: {}, + } + assert.Equal(t, steps, cond.steps) + bodies := make(map[string]string) + for a, m := range aggregations { + bodies[a] = strings.Join(m, "\n") + "\n" + } + assert.Equal(t, bodies, extTableString(cond.extDataBodies)) + + cond.From = ageToTimestamp(1800) + cond.Until = ageToTimestamp(0) + cond.prepareLookup() + steps = map[uint32][]string{ + 30: {}, + 60: {}, + 300: {}, + 5: {}, + } + assert.Equal(t, steps, cond.steps) + assert.Equal(t, aggregations, cond.aggregations) + assert.Equal(t, bodies, extTableString(cond.extDataBodies)) + }) } func TestSetStep(t *testing.T) { diff --git a/render/data/targets.go b/render/data/targets.go index c6da231d3..0a7775cc0 100644 --- a/render/data/targets.go +++ b/render/data/targets.go @@ -4,12 +4,16 @@ import ( "fmt" "time" + v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb" "github.com/lomik/graphite-clickhouse/config" "github.com/lomik/graphite-clickhouse/helper/rollup" "github.com/lomik/graphite-clickhouse/metrics" "github.com/lomik/graphite-clickhouse/pkg/alias" ) +const graphiteConsolidationFunction = "consolidateBy" + +type FilteringFunctionsByTarget map[string][]*v3pb.FilteringFunction type Cache struct { Cached bool TS int64 // cached timestamp @@ -26,30 +30,35 @@ type Targets struct { Cache []Cache Cached bool // all is cached // AM stores found expanded metrics - AM *alias.Map - pointsTable string - isReverse bool - rollupRules *rollup.Rules - rollupUseReverted bool - queryMetrics *metrics.QueryMetrics + AM *alias.Map + filteringFunctionsByTarget FilteringFunctionsByTarget + pointsTable string + isReverse bool + rollupRules *rollup.Rules + rollupUseReverted bool + queryMetrics *metrics.QueryMetrics } func NewTargets(list []string, am *alias.Map) *Targets { - return &Targets{ - List: list, - Cache: make([]Cache, len(list)), - AM: am, + targets := &Targets{ + List: list, + Cache: make([]Cache, len(list)), + AM: am, + filteringFunctionsByTarget: make(FilteringFunctionsByTarget), } + return targets } func NewTargetsOne(target string, capacity int, am *alias.Map) *Targets { list := make([]string, 1, capacity) list[0] = target - return &Targets{ - List: list, - Cache: make([]Cache, 1, capacity), - AM: am, + targets := &Targets{ + List: list, + Cache: make([]Cache, len(list)), + AM: am, + filteringFunctionsByTarget: make(FilteringFunctionsByTarget), } + return targets } func (tt *Targets) Append(target string) { @@ -57,6 +66,10 @@ func (tt *Targets) Append(target string) { tt.Cache = append(tt.Cache, Cache{}) } +func (tt *Targets) SetFilteringFunctions(target string, filteringFunctions []*v3pb.FilteringFunction) { + tt.filteringFunctionsByTarget[target] = filteringFunctions +} + func (tt *Targets) selectDataTable(cfg *config.Config, tf *TimeFrame, context string) error { now := time.Now().Unix() @@ -117,3 +130,18 @@ TableLoop: return fmt.Errorf("data tables is not specified for %v", tt.List[0]) } + +func (tt *Targets) GetRequestedAggregation(target string) string { + if ffs, ok := tt.filteringFunctionsByTarget[target]; !ok { + return "" + } else { + for _, filteringFunc := range ffs { + ffName := filteringFunc.GetName() + ffArgs := filteringFunc.GetArguments() + if ffName == graphiteConsolidationFunction && len(ffArgs) > 0 { + return ffArgs[0] + } + } + } + return "" +}