diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 5479bb4d00..599e787b73 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -4193,6 +4193,13 @@ The `query_range_config` configures the query splitting and caching in the Corte
 # CLI flag: -querier.split-queries-by-interval
 [split_queries_by_interval: <duration> | default = 0s]
 
+# Maximum number of splits for a range query, 0 disables it. Uses a multiple of
+# `split-queries-by-interval` to maintain the number of splits below the limit.
+# If vertical sharding is enabled for a query, the combined total number of
+# vertical and interval shards is kept below this limit.
+# CLI flag: -querier.split-queries-by-interval-max-splits
+[split_queries_by_interval_max_splits: <int> | default = 0]
+
 # Mutate incoming queries to align their start and end with their step.
 # CLI flag: -querier.align-querier-with-step
 [align_queries_with_step: <boolean> | default = false]
diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go
index a771c22116..d8bb4a3150 100644
--- a/pkg/cortex/modules.go
+++ b/pkg/cortex/modules.go
@@ -487,6 +487,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
 		prometheusCodec,
 		shardedPrometheusCodec,
 		t.Cfg.Querier.LookbackDelta,
+		t.Cfg.Querier.QueryStoreAfter,
+		t.Cfg.Querier.MaxDaysOfDataFetched,
 	)
 	if err != nil {
 		return nil, err
diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go
index bba995988d..20f224e716 100644
--- a/pkg/frontend/transport/handler.go
+++ b/pkg/frontend/transport/handler.go
@@ -351,6 +351,7 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
 	splitQueries := stats.LoadSplitQueries()
 	dataSelectMaxTime := stats.LoadDataSelectMaxTime()
 	dataSelectMinTime := stats.LoadDataSelectMinTime()
+	splitInterval := stats.LoadSplitInterval()
 
 	// Track stats.
 	f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
@@ -425,6 +426,10 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
 		logMessage = append(logMessage, "query_storage_wall_time_seconds", sws)
 	}
 
+	if splitInterval > 0 {
+		logMessage = append(logMessage, "split_interval", splitInterval.String())
+	}
+
 	if error != nil {
 		s, ok := status.FromError(error)
 		if !ok {
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index e9a80374cd..22b260308d 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -62,6 +62,9 @@ type Config struct {
 	// Limit of number of steps allowed for every subquery expression in a query.
 	MaxSubQuerySteps int64 `yaml:"max_subquery_steps"`
 
+	// Max number of days of data fetched for a query, used to calculate appropriate interval and vertical shard size.
+	MaxDaysOfDataFetched int `yaml:"max_days_of_data_fetched"`
+
 	// Directory for ActiveQueryTracker. If empty, ActiveQueryTracker will be disabled and MaxConcurrent will not be applied (!).
 	// ActiveQueryTracker logs queries that were active during the last crash, but logs them on the next startup.
 	// However, we need to use active query tracker, otherwise we cannot limit Max Concurrent queries in the PromQL
@@ -131,6 +134,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.")
 	f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
 	f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
+	f.IntVar(&cfg.MaxDaysOfDataFetched, "querier.max-days-of-data-fetched", 0, "Max number of days of data fetched for a query. This can be used to calculate appropriate interval and vertical shard size dynamically.")
 }
 
 // Validate the config
diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go
index d4eda0f756..cdf432718d 100644
--- a/pkg/querier/stats/stats.go
+++ b/pkg/querier/stats/stats.go
@@ -21,6 +21,7 @@ type QueryStats struct {
 	Priority          int64
 	DataSelectMaxTime int64
 	DataSelectMinTime int64
+	SplitInterval     time.Duration
 	m                 sync.Mutex
 }
 
@@ -287,6 +288,14 @@ func (s *QueryStats) LoadDataSelectMinTime() int64 {
 	return atomic.LoadInt64(&s.DataSelectMinTime)
 }
 
+func (s *QueryStats) LoadSplitInterval() time.Duration {
+	if s == nil {
+		return 0
+	}
+
+	return s.SplitInterval
+}
+
 func (s *QueryStats) AddStoreGatewayTouchedPostings(count uint64) {
 	if s == nil {
 		return
diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go
index 8af1ba1f84..af8e003661 100644
--- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go
+++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go
@@ -34,11 +34,12 @@ const day = 24 * time.Hour
 
 // Config for query_range middleware chain.
 type Config struct {
-	SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
-	AlignQueriesWithStep   bool          `yaml:"align_queries_with_step"`
-	ResultsCacheConfig     `yaml:"results_cache"`
-	CacheResults           bool `yaml:"cache_results"`
-	MaxRetries             int  `yaml:"max_retries"`
+	SplitQueriesByInterval          time.Duration `yaml:"split_queries_by_interval"`
+	SplitQueriesByIntervalMaxSplits int           `yaml:"split_queries_by_interval_max_splits"`
+	AlignQueriesWithStep            bool          `yaml:"align_queries_with_step"`
+	ResultsCacheConfig              `yaml:"results_cache"`
+	CacheResults                    bool `yaml:"cache_results"`
+	MaxRetries                      int  `yaml:"max_retries"`
 	// List of headers which query_range middleware chain would forward to downstream querier.
 	ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`
 
@@ -50,6 +51,7 @@
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
 	f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
+	f.IntVar(&cfg.SplitQueriesByIntervalMaxSplits, "querier.split-queries-by-interval-max-splits", 0, "Maximum number of splits for a range query, 0 disables it. Uses a multiple of `split-queries-by-interval` to maintain the number of splits below the limit. If vertical sharding is enabled for a query, the combined total number of vertical and interval shards is kept below this limit.")
 	f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
 	f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
 	f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
@@ -66,6 +68,9 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
 			return errors.Wrap(err, "invalid ResultsCache config")
 		}
 	}
+	if cfg.SplitQueriesByIntervalMaxSplits > 0 && cfg.SplitQueriesByInterval <= 0 {
+		return errors.New("split-queries-by-interval-max-splits requires that a value for split-queries-by-interval is set")
+	}
 	return nil
 }
 
@@ -80,6 +85,8 @@ func Middlewares(
 	prometheusCodec tripperware.Codec,
 	shardedPrometheusCodec tripperware.Codec,
 	lookbackDelta time.Duration,
+	queryStoreAfter time.Duration,
+	maxDaysOfDataFetched int,
 ) ([]tripperware.Middleware, cache.Cache, error) {
 	// Metric used to keep track of each middleware execution duration.
 	metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
@@ -89,8 +96,11 @@ func Middlewares(
 		queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
 	}
 	if cfg.SplitQueriesByInterval != 0 {
-		staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
-		queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
+		intervalFn := staticIntervalFn(cfg)
+		if cfg.SplitQueriesByIntervalMaxSplits > 0 || maxDaysOfDataFetched > 0 {
+			intervalFn = dynamicIntervalFn(cfg, limits, queryAnalyzer, queryStoreAfter, lookbackDelta, maxDaysOfDataFetched)
+		}
+		queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer, queryStoreAfter, lookbackDelta))
 	}
 
 	var c cache.Cache
diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go
index c8238a9a62..55be979a3d 100644
--- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go
+++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go
@@ -64,6 +64,8 @@ func TestRoundTrip(t *testing.T) {
 		PrometheusCodec,
 		ShardedPrometheusCodec,
 		5*time.Minute,
+		24*time.Hour,
+		0,
 	)
 	require.NoError(t, err)
 
diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go
index 064b53d760..8b64484afe 100644
--- a/pkg/querier/tripperware/queryrange/split_by_interval.go
+++ b/pkg/querier/tripperware/queryrange/split_by_interval.go
@@ -3,20 +3,34 @@ package queryrange
 import (
 	"context"
 	"net/http"
+	"sort"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/thanos-io/thanos/pkg/querysharding"
 	"github.com/weaveworks/common/httpgrpc"
 
+	querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
 	"github.com/cortexproject/cortex/pkg/querier/tripperware"
+	"github.com/cortexproject/cortex/pkg/tenant"
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/validation"
 )
 
-type IntervalFn func(r tripperware.Request) time.Duration
+// dayMillis is one day in milliseconds, the unit used to count the days of data a query fetches.
+var dayMillis = util.DurationMilliseconds(24 * time.Hour)
+
+type IntervalFn func(ctx context.Context, r tripperware.Request) (time.Duration, error)
+
+type dayRange struct {
+	startDay int64
+	endDay   int64
+}
 
 // SplitByIntervalMiddleware creates a new Middleware that splits requests by a given interval.
-func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer) tripperware.Middleware {
+func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer, queryStoreAfter time.Duration, lookbackDelta time.Duration) tripperware.Middleware {
 	return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler {
 		return splitByInterval{
 			next:   next,
@@ -28,6 +42,8 @@ func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, m
 				Name: "frontend_split_queries_total",
 				Help: "Total number of underlying query requests after the split by interval is applied",
 			}),
+			queryStoreAfter: queryStoreAfter,
+			lookbackDelta:   lookbackDelta,
 		}
 	})
 }
@@ -40,17 +56,28 @@ type splitByInterval struct {
 
 	// Metrics.
 	splitByCounter prometheus.Counter
+
+	queryStoreAfter time.Duration
+	lookbackDelta   time.Duration
 }
 
 func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripperware.Response, error) {
 	// First we're going to build new requests, one for each day, taking care
 	// to line up the boundaries with step.
-	reqs, err := splitQuery(r, s.interval(r))
+	interval, err := s.interval(ctx, r)
+	if err != nil {
+		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
+	}
+	reqs, err := splitQuery(r, interval)
 	if err != nil {
 		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
 	}
 	s.splitByCounter.Add(float64(len(reqs)))
 
+	stats := querier_stats.FromContext(ctx)
+	if stats != nil {
+		stats.SplitInterval = interval
+	}
	reqResps, err := tripperware.DoRequests(ctx, s.next, reqs, s.limits)
 	if err != nil {
 		return nil, err
@@ -135,3 +162,133 @@ func nextIntervalBoundary(t, step int64, interval time.Duration) int64 {
 	}
 	return target
 }
+
+func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
+	return func(_ context.Context, _ tripperware.Request) (time.Duration, error) {
+		return cfg.SplitQueriesByInterval, nil
+	}
+}
+
+func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, queryStoreAfter time.Duration, lookbackDelta time.Duration, maxDaysOfDataFetched int) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
+	return func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
+		tenantIDs, err := tenant.TenantIDs(ctx)
+		if err != nil {
+			return cfg.SplitQueriesByInterval, err
+		}
+
+		queryDayRange := int((r.GetEnd() / dayMillis) - (r.GetStart() / dayMillis) + 1)
+		analysis, err := queryAnalyzer.Analyze(r.GetQuery())
+		if err != nil {
+			return cfg.SplitQueriesByInterval, err
+		}
+
+		queryVerticalShardSize := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
+		if queryVerticalShardSize <= 0 || !analysis.IsShardable() {
+			queryVerticalShardSize = 1
+		}
+
+		queryExpr, err := parser.ParseExpr(r.GetQuery())
+		if err != nil {
+			return cfg.SplitQueriesByInterval, err
+		}
+		daysFetchedWithoutSharding := getDaysFetchedByQuery(queryExpr, []tripperware.Request{r}, queryStoreAfter, lookbackDelta, time.Now())
+		extraDaysFetchedPerShard := daysFetchedWithoutSharding - queryDayRange
+
+		// If lookbackDelta is configured and the query start time is not 00:00 UTC, account for 1 fetched day of data per split, except for the first split.
+		lookbackDeltaCompensation := 0
+		if lookbackDelta > 0 && (r.GetStart()-util.DurationMilliseconds(lookbackDelta))/dayMillis == r.GetStart()/dayMillis {
+			lookbackDeltaCompensation = 1
+		}
+
+		var maxSplitsByFetchedDaysOfData int
+		if maxDaysOfDataFetched > 0 && extraDaysFetchedPerShard+lookbackDeltaCompensation > 0 {
+			maxSplitsByFetchedDaysOfData = ((maxDaysOfDataFetched / queryVerticalShardSize) - queryDayRange - lookbackDeltaCompensation) / (extraDaysFetchedPerShard + lookbackDeltaCompensation)
+		}
+
+		var maxSplitsByConfig int
+		if cfg.SplitQueriesByIntervalMaxSplits > 0 {
+			maxSplitsByConfig = cfg.SplitQueriesByIntervalMaxSplits / queryVerticalShardSize
+		}
+
+		var maxSplits time.Duration
+		switch {
+		case maxSplitsByFetchedDaysOfData <= 0 && maxSplitsByConfig <= 0:
+			return cfg.SplitQueriesByInterval, nil
+		case maxSplitsByFetchedDaysOfData <= 0:
+			maxSplits = time.Duration(maxSplitsByConfig)
+		case maxSplitsByConfig <= 0:
+			maxSplits = time.Duration(maxSplitsByFetchedDaysOfData)
+		default:
+			// Use the more restrictive shard limit.
+			maxSplits = time.Duration(min(maxSplitsByConfig, maxSplitsByFetchedDaysOfData))
+		}
+
+		queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond))
+		baseInterval := cfg.SplitQueriesByInterval
+		n := (queryRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits)
+		return n * cfg.SplitQueriesByInterval, nil
+	}
+}
+
+// getDaysFetchedByQuery calculates the total number of days of data the query fetches from the store gateway,
+// considering the query itself, queryStoreAfter and lookbackDelta.
+func getDaysFetchedByQuery(expr parser.Expr, reqs []tripperware.Request, queryStoreAfter, lookbackDelta time.Duration, now time.Time) int {
+	count := 0
+	queryStoreMaxT := util.TimeToMillis(now.Add(-queryStoreAfter))
+	var evalRange time.Duration
+
+	for _, req := range reqs {
+		var ranges []dayRange
+		parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
+			switch n := node.(type) {
+			case *parser.VectorSelector:
+				start, end := util.GetTimeRangesForSelector(req.GetStart(), req.GetEnd(), lookbackDelta, n, path, evalRange)
+				// Query shouldn't touch the store gateway.
+				if start > queryStoreMaxT {
+					return nil
+				} else {
+					// If the query split needs to query the store, cap the max time to now - queryStoreAfter.
+					end = min(end, queryStoreMaxT)
+				}
+
+				startDay := start / dayMillis
+				endDay := end / dayMillis
+				ranges = append(ranges, dayRange{startDay: startDay, endDay: endDay})
+				evalRange = 0
+			case *parser.MatrixSelector:
+				evalRange = n.Range
+			}
+			return nil
+		})
+		nonOverlappingRanges := mergeDayRanges(ranges)
+		for _, dr := range nonOverlappingRanges {
+			count += int(dr.endDay-dr.startDay) + 1
+		}
+	}
+	return count
+}
+
+func mergeDayRanges(ranges []dayRange) []dayRange {
+	if len(ranges) == 0 {
+		return ranges
+	}
+
+	// Sort ranges by their startDay.
+	sort.Slice(ranges, func(i, j int) bool {
+		return ranges[i].startDay < ranges[j].startDay
+	})
+
+	// Merge overlapping ranges.
+	merged := []dayRange{ranges[0]}
+	for _, current := range ranges[1:] {
+		last := &merged[len(merged)-1]
+		if current.startDay <= last.endDay {
+			if current.endDay > last.endDay {
+				last.endDay = current.endDay
+			}
+		} else {
+			merged = append(merged, current)
+		}
+	}
+	return merged
+}
diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go
index 23989ac64a..186ce9824b 100644
--- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go
+++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go
@@ -10,6 +10,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/thanos-io/thanos/pkg/querysharding"
 	"github.com/weaveworks/common/httpgrpc"
 
 	"github.com/prometheus/prometheus/promql/parser"
@@ -21,7 +22,11 @@ import (
 	"github.com/cortexproject/cortex/pkg/querier/tripperware"
 )
 
-const seconds = 1e3 // 1e3 milliseconds per second.
+const (
+	seconds         = 1e3 // 1e3 milliseconds per second.
+	queryStoreAfter = 24 * time.Hour
+	lookbackDelta   = 5 * time.Minute
+)
 
 func TestNextIntervalBoundary(t *testing.T) {
 	t.Parallel()
@@ -305,11 +310,11 @@ func TestSplitByDay(t *testing.T) {
 			u, err := url.Parse(s.URL)
 			require.NoError(t, err)
 
-			interval := func(_ tripperware.Request) time.Duration { return 24 * time.Hour }
+			interval := func(_ context.Context, _ tripperware.Request) (time.Duration, error) { return 24 * time.Hour, nil }
 			roundtripper := tripperware.NewRoundTripper(singleHostRoundTripper{
 				host: u.Host,
 				next: http.DefaultTransport,
-			}, PrometheusCodec, nil, NewLimitsMiddleware(mockLimits{}, 5*time.Minute), SplitByIntervalMiddleware(interval, mockLimits{}, PrometheusCodec, nil))
+			}, PrometheusCodec, nil, NewLimitsMiddleware(mockLimits{}, 5*time.Minute), SplitByIntervalMiddleware(interval, mockLimits{}, PrometheusCodec, nil, queryStoreAfter, lookbackDelta))
 
 			req, err := http.NewRequest("GET", tc.path, http.NoBody)
 			require.NoError(t, err)
@@ -408,3 +413,139 @@ func Test_evaluateAtModifier(t *testing.T) {
 		})
 	}
 }
+
+func TestDynamicIntervalFn(t *testing.T) {
+	for _, tc := range []struct {
+		name                   string
+		baseSplitInterval      time.Duration
+		req                    tripperware.Request
+		expectedInterval       time.Duration
+		expectedError          bool
+		maxQueryIntervalSplits int
+		maxDaysOfDataFetched   int
+	}{
+		{
+			baseSplitInterval: day,
+			name:              "failed to parse request, return default interval",
+			req: &tripperware.PrometheusRequest{
+				Query: "up[aaa",
+				Start: 0,
+				End:   10 * 24 * 3600 * seconds,
+				Step:  5 * 60 * seconds,
+			},
+			maxQueryIntervalSplits: 30,
+			maxDaysOfDataFetched:   200,
+			expectedInterval:       day,
+			expectedError:          true,
+		},
+		{
+			baseSplitInterval: day,
+			name:              "30 day range no limits, expect split by 1 day",
+			req: &tripperware.PrometheusRequest{
+				Start: 0,
+				End:   30 * 24 * 3600 * seconds,
+				Step:  5 * 60 * seconds,
+				Query: "up",
+			},
+			expectedInterval: day,
+		},
+		{
+			baseSplitInterval: day,
+			name:              "30 day range with 20 max splits, expect split by 2 day",
+			req: &tripperware.PrometheusRequest{
+				Start: 30 * 24 * 3600 * seconds,
+				End:   60 * 24 * 3600 * seconds,
+				Step:  5 * 60 * seconds,
+				Query: "up",
+			},
+			maxQueryIntervalSplits: 20,
+			expectedInterval:       2 * day,
+		},
+		{
+			baseSplitInterval: day,
+			name:              "60 day range, expect split by 4 day",
+			req: &tripperware.PrometheusRequest{
+				Start: 0,
+				End:   60 * 24 * 3600 * seconds,
+				Step:  5 * 60 * seconds,
+				Query: "up",
+			},
+			maxQueryIntervalSplits: 15,
+			expectedInterval:       4 * day,
+		},
+		{
+			baseSplitInterval: day,
+			name:              "61 day range, expect split by 5 day",
+			req: &tripperware.PrometheusRequest{
+				Start: 0,
+				End:   61 * 24 * 3600 * seconds,
+				Step:  5 * 60 * seconds,
+				Query: "up",
+			},
+			maxQueryIntervalSplits: 15,
+			expectedInterval:       5 * day,
+		},
+		{
+			baseSplitInterval: day,
+			name:              "30 day range short matrix selector with 200 days fetched limit, expect split by 1 day",
+			req: &tripperware.PrometheusRequest{
+				Start: 30 * 24 * 3600 * seconds,
+				End:   60 * 24 * 3600 * seconds,
+				Step:  5 * 60 * seconds,
+				Query: "avg_over_time(up[1h])",
+			},
+			maxDaysOfDataFetched: 200,
+			expectedInterval:     day,
+		},
+		{
+			baseSplitInterval: day,
+			name:              "30 day range long matrix selector with 200 days fetched limit, expect split by 4 day",
+			req: &tripperware.PrometheusRequest{
+				Start: 30 * 24 * 3600 * seconds,
+				End:   60 * 24 * 3600 * seconds,
+				Step:  5 * 60 * seconds,
+				Query: "avg_over_time(up[20d])",
+			},
+			maxDaysOfDataFetched: 200,
+			expectedInterval:     4 * day,
+		},
+		{
+			baseSplitInterval: day,
+			name:              "60 day range, expect split by 7 day",
+			req: &tripperware.PrometheusRequest{
+				Start: (2 * 24 * 3600 * seconds) + (3600*seconds - 120),
+				End:   (61 * 24 * 3600 * seconds) + (2*3600*seconds + 500),
+				Step:  30 * 60 * seconds,
+				Query: "rate(up[1d]) + rate(up[2d]) + rate(up[5d]) + rate(up[15d])",
+			},
+			maxDaysOfDataFetched: 200,
+			expectedInterval:     7 * day,
+		},
+		{
+			baseSplitInterval: day,
+			name:              "100 day range with subquery, expect split by 4 day",
+			req: &tripperware.PrometheusRequest{
+				Start: 0,
+				End:   100 * 24 * 3600 * seconds,
+				Step:  60 * 60 * seconds,
+				Query: "up[5d:10m]",
+			},
+			maxQueryIntervalSplits: 100,
+			maxDaysOfDataFetched:   300,
+			expectedInterval:       4 * day,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			cfg := Config{
+				SplitQueriesByInterval:          tc.baseSplitInterval,
+				SplitQueriesByIntervalMaxSplits: tc.maxQueryIntervalSplits,
+			}
+			ctx := user.InjectOrgID(context.Background(), "1")
+			interval, err := dynamicIntervalFn(cfg, mockLimits{}, querysharding.NewQueryAnalyzer(), queryStoreAfter, lookbackDelta, tc.maxDaysOfDataFetched)(ctx, tc.req)
+			require.Equal(t, tc.expectedInterval, interval)
+			if !tc.expectedError {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
diff --git a/pkg/util/time.go b/pkg/util/time.go
index 3f19a71da9..2656065650 100644
--- a/pkg/util/time.go
+++ b/pkg/util/time.go
@@ -240,3 +240,70 @@ func ParseDurationMs(s string) (int64, error) {
 	}
 	return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid duration", s)
 }
+
+func DurationMilliseconds(d time.Duration) int64 {
+	return int64(d / (time.Millisecond / time.Nanosecond))
+}
+
+func GetTimeRangesForSelector(start, end int64, lookbackDelta time.Duration, n *parser.VectorSelector, path []parser.Node, evalRange time.Duration) (int64, int64) {
+	subqOffset, subqRange, subqTs := subqueryTimes(path)
+
+	if subqTs != nil {
+		// The timestamp on the subquery overrides the eval statement time ranges.
+		start = *subqTs
+		end = *subqTs
+	}
+
+	if n.Timestamp != nil {
+		// The timestamp on the selector overrides everything.
+		start = *n.Timestamp
+		end = *n.Timestamp
+	} else {
+		offsetMilliseconds := DurationMilliseconds(subqOffset)
+		start = start - offsetMilliseconds - DurationMilliseconds(subqRange)
+		end -= offsetMilliseconds
+	}
+
+	if evalRange == 0 {
+		start -= DurationMilliseconds(lookbackDelta)
+	} else {
+		// For all matrix queries we want to ensure that we have (end-start) + range selected
+		// this way we have `range` data before the start time.
+		start -= DurationMilliseconds(evalRange)
+	}
+
+	offsetMilliseconds := DurationMilliseconds(n.OriginalOffset)
+	start -= offsetMilliseconds
+	end -= offsetMilliseconds
+
+	return start, end
+}
+
+// subqueryTimes returns the sum of offsets and ranges of all subqueries in the path.
+// If the @ modifier is used, then the offset and range is w.r.t. that timestamp
+// (i.e. the sum is reset when we have @ modifier).
+// The returned *int64 is the closest timestamp that was seen. nil for no @ modifier.
+func subqueryTimes(path []parser.Node) (time.Duration, time.Duration, *int64) {
+	var (
+		subqOffset, subqRange time.Duration
+		ts                    int64 = math.MaxInt64
+	)
+	for _, node := range path {
+		if n, ok := node.(*parser.SubqueryExpr); ok {
+			subqOffset += n.OriginalOffset
+			subqRange += n.Range
+			if n.Timestamp != nil {
+				// The @ modifier on subquery invalidates all the offset and
+				// range till now. Hence resetting it here.
+				subqOffset = n.OriginalOffset
+				subqRange = n.Range
+				ts = *n.Timestamp
+			}
+		}
+	}
+	var tsp *int64
+	if ts != math.MaxInt64 {
+		tsp = &ts
+	}
+	return subqOffset, subqRange, tsp
+}
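A note for reviewers on the interval selection at the tail of `dynamicIntervalFn`: the returned interval is the smallest multiple of the base interval that keeps the split count at or below the effective `maxSplits`. A standalone sketch of that rounding rule (the `splitInterval` helper and `main` harness below are illustrative names, not part of the patch):

```go
package main

import (
	"fmt"
	"time"
)

// splitInterval mirrors the ceiling division at the end of dynamicIntervalFn:
// pick the smallest multiple n of baseInterval such that splitting the query
// range by n*baseInterval produces at most maxSplits splits.
func splitInterval(queryRange, baseInterval time.Duration, maxSplits int) time.Duration {
	m := time.Duration(maxSplits)
	n := (queryRange + baseInterval*m - 1) / (baseInterval * m)
	return n * baseInterval
}

func main() {
	day := 24 * time.Hour
	fmt.Println(splitInterval(60*day, day, 15)) // 96h0m0s: a 4-day interval gives exactly 15 splits
	fmt.Println(splitInterval(61*day, day, 15)) // 120h0m0s: a 5-day interval gives 13 splits
}
```

This matches the "60 day range, expect split by 4 day" and "61 day range, expect split by 5 day" cases in `TestDynamicIntervalFn`.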
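The day-range merging is what keeps `getDaysFetchedByQuery` from double-counting days when several selectors in one query cover overlapping windows (e.g. `rate(up[1d]) + rate(up[5d])`). A self-contained copy of the patch's `mergeDayRanges` with a small demo (the `main` harness is illustrative):

```go
package main

import (
	"fmt"
	"sort"
)

type dayRange struct{ startDay, endDay int64 }

// mergeDayRanges sorts ranges by startDay and coalesces overlaps, so each
// fetched day is counted once when summing endDay-startDay+1 per range.
func mergeDayRanges(ranges []dayRange) []dayRange {
	if len(ranges) == 0 {
		return ranges
	}
	sort.Slice(ranges, func(i, j int) bool { return ranges[i].startDay < ranges[j].startDay })
	merged := []dayRange{ranges[0]}
	for _, cur := range ranges[1:] {
		last := &merged[len(merged)-1]
		if cur.startDay <= last.endDay {
			if cur.endDay > last.endDay {
				last.endDay = cur.endDay
			}
		} else {
			merged = append(merged, cur)
		}
	}
	return merged
}

func main() {
	// Two overlapping selector windows and one disjoint one.
	fmt.Println(mergeDayRanges([]dayRange{{2, 5}, {1, 3}, {7, 8}})) // [{1 5} {7 8}] -> 5 + 2 = 7 days
}
```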
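For intuition on the per-selector day accounting, the sketch below reduces `getDaysFetchedByQuery` to a single plain vector selector (`evalRange == 0`): the start is pulled back by `lookbackDelta`, the end is capped at `now - queryStoreAfter`, and the result is counted in whole days. The `daysFetched` helper is an illustrative simplification, not the patch's API:

```go
package main

import (
	"fmt"
	"time"
)

const dayMillis = int64(24 * time.Hour / time.Millisecond)

// daysFetched counts the days of data a single instant selector pulls from
// the store gateway, assuming no offset, subquery, or @ modifier.
func daysFetched(startMs, endMs int64, lookbackDelta, queryStoreAfter time.Duration, now time.Time) int {
	queryStoreMaxT := now.Add(-queryStoreAfter).UnixMilli()
	startMs -= lookbackDelta.Milliseconds()
	if startMs > queryStoreMaxT {
		return 0 // the whole window is served by ingesters, not the store gateway
	}
	if endMs > queryStoreMaxT {
		endMs = queryStoreMaxT // cap at now - queryStoreAfter
	}
	return int(endMs/dayMillis-startMs/dayMillis) + 1
}

func main() {
	now := time.Now()
	end := now.Add(-48 * time.Hour).UnixMilli()
	start := end - 30*dayMillis
	// 30-day window, 5m lookback, data older than 24h comes from the store gateway.
	// Prints 30 or 31 depending on where the window crosses UTC day boundaries.
	fmt.Println(daysFetched(start, end, 5*time.Minute, 24*time.Hour, now))
}
```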
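The `GetTimeRangesForSelector` helper moved into `pkg/util` mirrors Prometheus's selector time-range logic. A worked example of the window math for `rate(up[2h] offset 1h)` evaluated over a 24h range (timestamps are illustrative): the matrix-selector range replaces the lookback delta, and the offset shifts both ends back.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	end := time.Date(2024, 1, 10, 12, 0, 0, 0, time.UTC).UnixMilli()
	start := end - (24 * time.Hour).Milliseconds()

	evalRange := (2 * time.Hour).Milliseconds() // selector range; used instead of lookbackDelta
	offset := (1 * time.Hour).Milliseconds()    // OriginalOffset; subtracted from both ends

	selStart := start - evalRange - offset
	selEnd := end - offset
	fmt.Println(time.UnixMilli(selStart).UTC(), "->", time.UnixMilli(selEnd).UTC())
}
```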