Add limit for max range query splits by interval #6458

Draft — wants to merge 4 commits into base: master. Changes from 3 commits.
7 changes: 7 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4193,6 +4193,13 @@ The `query_range_config` configures the query splitting and caching in the Corte
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]

# Maximum number of splits for a range query, 0 disables it. Uses a multiple of
# `split-queries-by-interval` to maintain the number of splits below the limit.
# If vertical sharding is enabled for a query, the combined total number of
# vertical and interval shards is kept below this limit.
# CLI flag: -querier.split-queries-by-interval-max-splits
[split_queries_by_interval_max_splits: <int> | default = 0]
Review comment (Contributor): Should run `make doc`?

# Mutate incoming queries to align their start and end with their step.
# CLI flag: -querier.align-querier-with-step
[align_queries_with_step: <boolean> | default = false]
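For illustration, the two settings might be combined like this in the YAML config (a minimal sketch; the `query_range` block name and the values are assumptions, not recommendations):

```yaml
query_range:
  # Split long range queries into one sub-query per 24h of query range.
  split_queries_by_interval: 24h
  # Cap the number of sub-queries at 30. A query spanning more than 30 days
  # is split at a larger multiple of 24h (48h, 72h, ...) so the split count
  # stays at or below the limit; with vertical sharding enabled, vertical
  # shards count against the same cap.
  split_queries_by_interval_max_splits: 30
```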
2 changes: 2 additions & 0 deletions pkg/cortex/modules.go
@@ -487,6 +487,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheusCodec,
shardedPrometheusCodec,
t.Cfg.Querier.LookbackDelta,
t.Cfg.Querier.QueryStoreAfter,
t.Cfg.Querier.MaxDaysOfDataFetched,
)
if err != nil {
return nil, err
5 changes: 5 additions & 0 deletions pkg/frontend/transport/handler.go
@@ -351,6 +351,7 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
splitQueries := stats.LoadSplitQueries()
dataSelectMaxTime := stats.LoadDataSelectMaxTime()
dataSelectMinTime := stats.LoadDataSelectMinTime()
splitInterval := stats.LoadSplitInterval()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
@@ -425,6 +426,10 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
logMessage = append(logMessage, "query_storage_wall_time_seconds", sws)
}

if splitInterval > 0 {
logMessage = append(logMessage, "split_interval", splitInterval.String())
}

if error != nil {
s, ok := status.FromError(error)
if !ok {
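With the handler change above, the query stats log line gains one more key-value pair whenever splitting occurred; given the `logMessage` append shown here, it would render along the lines of `split_interval=24h0m0s` (illustrative; the value is whatever `time.Duration.String()` produces for the chosen interval).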
4 changes: 4 additions & 0 deletions pkg/querier/querier.go
@@ -62,6 +62,9 @@ type Config struct {
// Limit of number of steps allowed for every subquery expression in a query.
MaxSubQuerySteps int64 `yaml:"max_subquery_steps"`

// Max number of days of data fetched for a query, used to calculate appropriate interval and vertical shard size.
MaxDaysOfDataFetched int `yaml:"max_days_of_data_fetched"`
Review comment (Contributor): Does `MaxDurationOfDataFetchedFromStoragePerQuery` sound better? Should this be part of the QueryRange configuration?

// Directory for ActiveQueryTracker. If empty, ActiveQueryTracker will be disabled and MaxConcurrent will not be applied (!).
// ActiveQueryTracker logs queries that were active during the last crash, but logs them on the next startup.
// However, we need to use active query tracker, otherwise we cannot limit Max Concurrent queries in the PromQL
@@ -131,6 +134,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.")
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
f.IntVar(&cfg.MaxDaysOfDataFetched, "querier.max-days-of-data-fetched", 0, "Max number of days of data fetched for a query. This can be used to calculate appropriate interval and vertical shard size dynamically.")
Review comment (Contributor): Could more details be added to the explanation? Also add "0 to disable".

}

// Validate the config
9 changes: 9 additions & 0 deletions pkg/querier/stats/stats.go
@@ -21,6 +21,7 @@ type QueryStats struct {
Priority int64
DataSelectMaxTime int64
DataSelectMinTime int64
SplitInterval time.Duration
m sync.Mutex
}

@@ -287,6 +288,14 @@ func (s *QueryStats) LoadDataSelectMinTime() int64 {
return atomic.LoadInt64(&s.DataSelectMinTime)
}

func (s *QueryStats) LoadSplitInterval() time.Duration {
if s == nil {
return 0
}

return s.SplitInterval
}

func (s *QueryStats) AddStoreGatewayTouchedPostings(count uint64) {
if s == nil {
return
24 changes: 17 additions & 7 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
@@ -34,11 +34,12 @@ const day = 24 * time.Hour

// Config for query_range middleware chain.
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
SplitQueriesByIntervalMaxSplits int `yaml:"split_queries_by_interval_max_splits"`
Review comment (Contributor): Maybe these both could be nested inside another config called `DynamicQuerySplits`?

AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`

@@ -50,6 +51,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
f.IntVar(&cfg.SplitQueriesByIntervalMaxSplits, "querier.split-queries-by-interval-max-splits", 0, "Maximum number of splits for a range query, 0 disables it. Uses a multiple of `split-queries-by-interval` to maintain the number of splits below the limit. If vertical sharding is enabled for a query, the combined total number of vertical and interval shards is kept below this limit.")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
@@ -66,6 +68,9 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
if cfg.SplitQueriesByIntervalMaxSplits > 0 && cfg.SplitQueriesByInterval <= 0 {
return errors.New("split-queries-by-interval-max-splits requires that a value for split-queries-by-interval is set.")
}
return nil
}

@@ -80,6 +85,8 @@ func Middlewares(
prometheusCodec tripperware.Codec,
shardedPrometheusCodec tripperware.Codec,
lookbackDelta time.Duration,
queryStoreAfter time.Duration,
maxDaysOfDataFetched int,
) ([]tripperware.Middleware, cache.Cache, error) {
// Metric used to keep track of each middleware execution duration.
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
@@ -89,8 +96,11 @@
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
}
if cfg.SplitQueriesByInterval != 0 {
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
intervalFn := staticIntervalFn(cfg)
if cfg.SplitQueriesByIntervalMaxSplits > 0 || maxDaysOfDataFetched > 0 {
intervalFn = dynamicIntervalFn(cfg, limits, queryAnalyzer, queryStoreAfter, lookbackDelta, maxDaysOfDataFetched)
}
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer, queryStoreAfter, lookbackDelta))
}

var c cache.Cache
@@ -64,6 +64,8 @@ func TestRoundTrip(t *testing.T) {
PrometheusCodec,
ShardedPrometheusCodec,
5*time.Minute,
24*time.Hour,
0,
)
require.NoError(t, err)

162 changes: 159 additions & 3 deletions pkg/querier/tripperware/queryrange/split_by_interval.go
@@ -3,20 +3,34 @@ package queryrange
import (
"context"
"net/http"
"sort"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/querysharding"
"github.com/weaveworks/common/httpgrpc"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
)

type IntervalFn func(r tripperware.Request) time.Duration
// dayMillis is the L4 block range in milliseconds.
Review comment (Contributor): The L4 block range is configurable in Cortex. Do we have to tie it to the L4 block range? Could the configuration itself be of type `time.Duration`?

var dayMillis = util.DurationMilliseconds(24 * time.Hour)

type IntervalFn func(ctx context.Context, r tripperware.Request) (time.Duration, error)

type dayRange struct {
startDay int64
endDay int64
}

// SplitByIntervalMiddleware creates a new Middleware that splits requests by a given interval.
func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer) tripperware.Middleware {
func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer, queryStoreAfter time.Duration, lookbackDelta time.Duration) tripperware.Middleware {
return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler {
return splitByInterval{
next: next,
@@ -28,6 +42,8 @@ func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, m
Name: "frontend_split_queries_total",
Help: "Total number of underlying query requests after the split by interval is applied",
}),
queryStoreAfter: queryStoreAfter,
lookbackDelta: lookbackDelta,
}
})
}
@@ -40,17 +56,28 @@ type splitByInterval struct {

// Metrics.
splitByCounter prometheus.Counter

queryStoreAfter time.Duration
lookbackDelta time.Duration
}

func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripperware.Response, error) {
// First we're going to build new requests, one for each day, taking care
// to line up the boundaries with step.
reqs, err := splitQuery(r, s.interval(r))
interval, err := s.interval(ctx, r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
Review comment (Contributor): nit: this should be an InternalServerError.

}
reqs, err := splitQuery(r, interval)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
s.splitByCounter.Add(float64(len(reqs)))

stats := querier_stats.FromContext(ctx)
Review comment (Contributor): What are the stats used for? Are they only used for logging in the query-frontend?

if stats != nil {
stats.SplitInterval = interval
}
reqResps, err := tripperware.DoRequests(ctx, s.next, reqs, s.limits)
if err != nil {
return nil, err
@@ -135,3 +162,132 @@ func nextIntervalBoundary(t, step int64, interval time.Duration) int64 {
}
return target
}

func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
return func(_ context.Context, _ tripperware.Request) (time.Duration, error) {
return cfg.SplitQueriesByInterval, nil
}
}

func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, queryStoreAfter time.Duration, lookbackDelta time.Duration, maxDaysOfDataFetched int) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
Review comment (Contributor): Could all of these be passed through the cfg?

return func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return cfg.SplitQueriesByInterval, err
}

queryDayRange := int((r.GetEnd() / dayMillis) - (r.GetStart() / dayMillis) + 1)
Review comment (Contributor): Could we avoid using day here? Other users of Cortex might choose to split by multiple days or less than a day.

baseInterval := int(cfg.SplitQueriesByInterval / day)
analysis, err := queryAnalyzer.Analyze(r.GetQuery())
if err != nil {
return cfg.SplitQueriesByInterval, err
}

queryVerticalShardSize := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
if queryVerticalShardSize <= 0 || !analysis.IsShardable() {
queryVerticalShardSize = 1
}

queryExpr, err := parser.ParseExpr(r.GetQuery())
if err != nil {
return cfg.SplitQueriesByInterval, err
}
daysFetchedWithoutSharding := getDaysFetchedByQuery(queryExpr, []tripperware.Request{r}, queryStoreAfter, lookbackDelta, time.Now())
extraDaysFetchedPerShard := daysFetchedWithoutSharding - queryDayRange

// if lookbackDelta is configured and the query start time is not 00:00 UTC, we need to account for 1 fetched day of data per split except for the first split
lookbackDeltaCompensation := 0
if lookbackDelta > 0 && (r.GetStart()-util.DurationMilliseconds(lookbackDelta))/dayMillis == r.GetStart()/dayMillis {
lookbackDeltaCompensation = 1
}

var maxSplitsByFetchedDaysOfData int
if maxDaysOfDataFetched > 0 {
maxSplitsByFetchedDaysOfData = ((maxDaysOfDataFetched / queryVerticalShardSize) - queryDayRange - lookbackDeltaCompensation) / (extraDaysFetchedPerShard + lookbackDeltaCompensation)
}

var maxSplitsByConfig int
if cfg.SplitQueriesByIntervalMaxSplits > 0 {
maxSplitsByConfig = cfg.SplitQueriesByIntervalMaxSplits / queryVerticalShardSize
}

var maxSplits int
switch {
case maxSplitsByFetchedDaysOfData <= 0 && maxSplitsByConfig <= 0:
maxSplits = 1
case maxSplitsByFetchedDaysOfData <= 0:
maxSplits = maxSplitsByConfig
case maxSplitsByConfig <= 0:
maxSplits = maxSplitsByFetchedDaysOfData
default:
// Use the more restricting shard limit
maxSplits = min(maxSplitsByConfig, maxSplitsByFetchedDaysOfData)
}

n := (queryDayRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits)
return time.Duration(n) * cfg.SplitQueriesByInterval, nil
}
}
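To make the sizing arithmetic concrete, here is a self-contained sketch of the final steps of `dynamicIntervalFn` with hypothetical inputs (a 90-day query, vertical shard size 3, and `split-queries-by-interval-max-splits` of 30):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const day = 24 * time.Hour

	queryDayRange := 90                      // query spans 90 days
	baseInterval := 1                        // split-queries-by-interval, expressed in days
	verticalShards := 3                      // query vertical shard size
	maxSplitsByConfig := 30 / verticalShards // 10 interval splits allowed

	maxSplits := maxSplitsByConfig

	// Smallest multiple n of the base interval that keeps the number of
	// splits at or below maxSplits: ceil(90 / (1*10)) = 9.
	n := (queryDayRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits)
	interval := time.Duration(n*baseInterval) * day

	// 9 days per split -> 10 splits * 3 vertical shards = 30 total shards.
	fmt.Println(interval) // 216h0m0s
}
```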

// getDaysFetchedByQuery calculates the total number of days of data the query will have to fetch
// during execution, considering the query itself, queryStoreAfter and lookbackDelta.
func getDaysFetchedByQuery(expr parser.Expr, reqs []tripperware.Request, queryStoreAfter, lookbackDelta time.Duration, now time.Time) int {
count := 0
queryStoreMaxT := util.TimeToMillis(now.Add(-queryStoreAfter))
var evalRange time.Duration

for _, req := range reqs {
var ranges []dayRange
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) {
case *parser.VectorSelector:
start, end := util.GetTimeRangesForSelector(req.GetStart(), req.GetEnd(), lookbackDelta, n, path, evalRange)
// Query shouldn't touch Store Gateway.
if start > queryStoreMaxT {
return nil
} else {
// If the query split needs to query store, cap the max time to now - queryStoreAfter.
end = min(end, queryStoreMaxT)
}

startDay := start / dayMillis
endDay := end / dayMillis
ranges = append(ranges, dayRange{startDay: startDay, endDay: endDay})
evalRange = 0
case *parser.MatrixSelector:
evalRange = n.Range
}
return nil
})
nonOverlappingRanges := mergeDayRanges(ranges)
for _, dayRange := range nonOverlappingRanges {
count += int(dayRange.endDay-dayRange.startDay) + 1
}
}
return count
}

func mergeDayRanges(ranges []dayRange) []dayRange {
if len(ranges) == 0 {
return ranges
}

// Sort ranges by their startDay
sort.Slice(ranges, func(i, j int) bool {
return ranges[i].startDay < ranges[j].startDay
})

// Merge overlapping ranges
merged := []dayRange{ranges[0]}
for _, current := range ranges {
last := &merged[len(merged)-1]
if current.startDay <= last.endDay {
if current.endDay > last.endDay {
last.endDay = current.endDay
}
} else {
merged = append(merged, current)
}
}
return merged
}
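And as a quick illustration of the day counting, a test-style snippet using the `dayRange` and `mergeDayRanges` helpers above (hypothetical values):

```go
// Two overlapping selectors covering days [3,7] and [5,10], plus a
// disjoint one at [12,12]. Merging yields [3,10] and [12,12], so the
// query is counted as fetching (10-3+1) + (12-12+1) = 9 days of data.
ranges := []dayRange{
	{startDay: 3, endDay: 7},
	{startDay: 5, endDay: 10},
	{startDay: 12, endDay: 12},
}
merged := mergeDayRanges(ranges)
days := 0
for _, r := range merged {
	days += int(r.endDay-r.startDay) + 1
}
fmt.Println(days) // 9
```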