Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MaxDataPoints consolidation improvements: support nudging for consistent bucketing #842

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/carbonapi/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type ConfigType struct {
MaxQueryLength uint64 `mapstructure:"maxQueryLength"`
CombineMultipleTargetsInOne bool `mapstructure:"combineMultipleTargetsInOne"`

NudgeStartTimeOnAggregation bool `mapstructure:"nudgeStartTimeOnAggregation"`
UseBucketsHighestTimestampOnAggregation bool `mapstructure:"useBucketsHighestTimestampOnAggregation"`

ResponseCache cache.BytesCache `mapstructure:"-" json:"-"`
BackendCache cache.BytesCache `mapstructure:"-" json:"-"`

Expand Down
7 changes: 7 additions & 0 deletions cmd/carbonapi/config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
fconfig "github.com/go-graphite/carbonapi/expr/functions/config"
"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/rewrite"
tconfig "github.com/go-graphite/carbonapi/expr/types/config"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/parser"
zipperTypes "github.com/go-graphite/carbonapi/zipper/types"
Expand Down Expand Up @@ -257,6 +258,9 @@ func SetUpConfig(logger *zap.Logger, BuildVersion string) {
)
}

tconfig.Config.NudgeStartTimeOnAggregation = Config.NudgeStartTimeOnAggregation
tconfig.Config.UseBucketsHighestTimestampOnAggregation = Config.UseBucketsHighestTimestampOnAggregation

if Config.Listen != "" {
listeners := make(map[string]struct{})
for _, l := range Config.Listeners {
Expand Down Expand Up @@ -401,6 +405,9 @@ func SetUpViper(logger *zap.Logger, configPath *string, exactConfig bool, viperP
viper.SetDefault("useCachingDNSResolver", false)
viper.SetDefault("logger", map[string]string{})
viper.SetDefault("combineMultipleTargetsInOne", false)
viper.SetDefault("nudgeStartTimeOnAggregation", false)
viper.SetDefault("useBucketsHighestTimestampOnAggregation", false)

viper.AutomaticEnv()

var err error
Expand Down
27 changes: 27 additions & 0 deletions doc/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ Table of Contents
* [For IRONdb](#for-irondb)
* [expireDelaySec](#expiredelaysec)
* [Example](#example-21)
* [nudgeStartTimeOnAggregation](#nudgestarttimeonaggregation)
* [useBucketsHighestTimestampOnAggregation](#usebucketshighesttimestamponaggregation)

# General configuration for carbonapi

Expand Down Expand Up @@ -940,3 +942,28 @@ Default: 600 (10 minutes)
```yaml
expireDelaySec: 10
```

***
## nudgeStartTimeOnAggregation
Enables nudging the start time of metrics when aggregating to honor MaxDataPoints.
The start time is nudged in such way that timestamps always fall in the same bucket.
This is done by GraphiteWeb, and is useful to avoid jitter in graphs when refreshing the page.

Default: false

### Example
```yaml
nudgeStartTimeOnAggregation: true
```

***
## useBucketsHighestTimestampOnAggregation
Enables using the highest timestamp of the buckets when aggregating to honor MaxDataPoints, instead of the lowest timestamp.
This prevents results to appear to predict the future.

Default: false

### Example
```yaml
useBucketsHighestTimestampOnAggregation: true
```
16 changes: 16 additions & 0 deletions expr/types/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package config

type ConfigType = struct {
// NudgeStartTimeOnAggregation enables nudging the start time of metrics
// when aggregated. The start time is nudged in such way that timestamps
// always fall in the same bucket. This is done by GraphiteWeb, and is
// useful to avoid jitter in graphs when refreshing the page.
NudgeStartTimeOnAggregation bool

// UseBucketsHighestTimestampOnAggregation enables using the highest timestamp of the
// buckets when aggregating to honor MaxDataPoints, instead of the lowest timestamp.
// This prevents results to appear to predict the future.
UseBucketsHighestTimestampOnAggregation bool
}

var Config = ConfigType{}
60 changes: 58 additions & 2 deletions expr/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-graphite/carbonapi/expr/consolidations"
"github.com/go-graphite/carbonapi/expr/tags"
"github.com/go-graphite/carbonapi/expr/types/config"
pbv2 "github.com/go-graphite/protocol/carbonapi_v2_pb"
pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
pickle "github.com/lomik/og-rek"
Expand Down Expand Up @@ -141,7 +142,7 @@ func MarshalJSON(results []*MetricData, timestampMultiplier int64, noNullPoints
b = append(b, `,"datapoints":[`...)

var innerComma bool
t := r.StartTime * timestampMultiplier
t := r.AggregatedStartTime() * timestampMultiplier
for _, v := range r.AggregatedValues() {
if noNullPoints && math.IsNaN(v) {
t += r.AggregatedTimeStep() * timestampMultiplier
Expand Down Expand Up @@ -330,6 +331,60 @@ func (r *MetricData) AggregatedTimeStep() int64 {
return r.StepTime * int64(r.ValuesPerPoint)
}

// AggregatedStartTime returns the start time of the aggregated series.
// This can be different from the original start time if NudgeStartTimeOnAggregation
// or UseBucketsHighestTimestampOnAggregation are enabled.
func (r *MetricData) AggregatedStartTime() int64 {
start := r.StartTime + r.nudgePointsCount()*r.StepTime
if config.Config.UseBucketsHighestTimestampOnAggregation {
return start + r.AggregatedTimeStep() - r.StepTime
}
return start
}

// nudgePointsCount returns the number of points to discard at the beginning of
// the series when aggregating. This is done if NudgeStartTimeOnAggregation is
// enabled, and has the purpose of assigning timestamps of a series to buckets
// consistently across different time ranges. To simplify the aggregation
// logic, we discard points at the beginning of the series so that a bucket
// starts right at the beginning. This function calculates how many points to
// discard.
func (r *MetricData) nudgePointsCount() int64 {
if !config.Config.NudgeStartTimeOnAggregation {
return 0
}

if len(r.Values) <= 2*r.ValuesPerPoint {
// There would be less than 2 points after aggregation, removing one would be too impactful.
return 0
}

// Suppose r.StartTime=4, r.StepTime=3 and aggTimeStep=6.
// - ts: 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 ...
// - original buckets: -- -- --| | | | | | ...
// - aggregated buckets: -- -- --| | | ...

// We start counting our aggTimeStep buckets at absolute time r.StepTime.
// Notice the following:
// - ts: 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 ...
// - bucket #: - - - 1 1 1 1 1 1 2 2 2 2 2 2 3 3 ...
// - (ts-step) % aggTimeStep: - - - 0 1 2 3 4 5 0 1 2 3 4 5 0 1 ...

// Given a timestamp 'ts', we can calculate how far it is from the beginning
// of the nearest bucket to the right by doing:
// * aggTimeStep - ((ts-r.StepTime) % aggTimeStep)
// Using this, we calculate the 'distance' from r.StartTime to the
// nearest bucket to the right. If this distance is less than aggTimeStep,
// then r.StartTime is not the beginning of a bucket. We need to discard
// dist / r.StepTime points (which could be zero if dist < r.StepTime).
aggTimeStep := r.AggregatedTimeStep()
dist := aggTimeStep - ((r.StartTime - r.StepTime) % aggTimeStep)
if dist < aggTimeStep {
return dist / r.StepTime
}
return 0
}

// GetAggregateFunction returns MetricData.AggregateFunction and set it, if it's not yet
func (r *MetricData) GetAggregateFunction() func([]float64) float64 {
if r.AggregateFunction == nil {
Expand Down Expand Up @@ -363,7 +418,8 @@ func (r *MetricData) AggregateValues() {
n := len(r.Values)/r.ValuesPerPoint + 1
aggV := make([]float64, 0, n)

v := r.Values
nudgeCount := r.nudgePointsCount()
v := r.Values[nudgeCount:]

for len(v) >= r.ValuesPerPoint {
val := aggFunc(v[:r.ValuesPerPoint])
Expand Down
Loading
Loading