From 0ddbd96291d4ff05499ec53c5a04a5dda32d36ad Mon Sep 17 00:00:00 2001
From: Ankit Jain
Date: Tue, 28 Nov 2023 23:09:11 +0530
Subject: [PATCH] Improving the performance of date histogram aggregation
 (without any sub-aggregation) (#11083)

* Adding filter based optimization logic to date histogram aggregation

Signed-off-by: Ankit Jain

* Reading the field name for aggregation correctly

Signed-off-by: Ankit Jain

* Adding the limit on number of buckets for filter aggregation

Signed-off-by: Ankit Jain

* Applying the optimizations for match all query as well

Signed-off-by: Ankit Jain

* Handling the unwrapped match all query

Signed-off-by: Ankit Jain

* Adding logic for recursively unwrapping the query

Signed-off-by: Ankit Jain

* Restructuring the code for making it more reusable and unit testable

Signed-off-by: Ankit Jain

* Adding javadocs for fixing build failure

Signed-off-by: Ankit Jain

* Fixing minor bugs in refactoring

Signed-off-by: Ankit Jain

* Adding logic for optimizing auto date histogram

Signed-off-by: Ankit Jain

* Fixing bugs and passing unit tests for date histogram

Signed-off-by: Ankit Jain

* Temporarily reverting auto date histogram changes

Signed-off-by: Ankit Jain

* Fixing spotless check bugs

Signed-off-by: Ankit Jain

* Adding back auto date histogram and passing all unit tests

Signed-off-by: Ankit Jain

* Fixing the integration tests for reduced collector work

Signed-off-by: Ankit Jain

* Fixing the integration test regression

Signed-off-by: Ankit Jain

* Addressing code review comments

Signed-off-by: Ankit Jain

* Fixing hardbound, missing and script test cases

Signed-off-by: Ankit Jain

* Removing collect_count validation to prevent backward compatibility tests from failing

Signed-off-by: Ankit Jain

* Finally fixing hardbounds test case

Signed-off-by: Ankit Jain

* Refactoring code for reusability

Signed-off-by: Ankit Jain

---------

Signed-off-by: Ankit Jain
---
 CHANGELOG.md                                  |   1 +
 .../test/search.aggregation/10_histogram.yml  |   1 -
 .../java/org/opensearch/common/Rounding.java  |  38 ++-
 .../index/mapper/DateFieldMapper.java         |  10 +
 .../AutoDateHistogramAggregator.java          |  89 +++++-
 .../histogram/DateHistogramAggregator.java    |  55 +++-
 .../bucket/histogram/FilterRewriteHelper.java | 281 ++++++++++++++++++
 .../AutoDateHistogramAggregatorTests.java     |   1 +
 8 files changed, 456 insertions(+), 20 deletions(-)
 create mode 100644 server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4b18d64f3d8a9..cda33c4e05dc8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -146,6 +146,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Disallow removing some metadata fields by remove ingest processor ([#10895](https://github.com/opensearch-project/OpenSearch/pull/10895))
 - Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
 - Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
+- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
 - Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
 - Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
 - Improve boolean parsing performance ([#11308](https://github.com/opensearch-project/OpenSearch/pull/11308))
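The change rewrites a date histogram that has no parent or sub-aggregations into one range filter per bucket and answers each filter with Lucene's Weight#count, which can often be served from point-index metadata without visiting documents. A minimal standalone sketch of that idea, assuming an index with a long point field; the field name and bounds here are illustrative, not taken from the patch:

    import java.io.IOException;
    import org.apache.lucene.document.LongPoint;
    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.ScoreMode;
    import org.apache.lucene.search.Weight;

    final class BucketCountSketch {
        // Counts docs in one bucket [lo, hi] via Weight#count, or returns -1
        // when a segment cannot answer cheaply and a real traversal is needed.
        static long countBucket(IndexSearcher searcher, String field, long lo, long hi) throws IOException {
            Query range = LongPoint.newRangeQuery(field, lo, hi);
            Weight weight = searcher.createWeight(searcher.rewrite(range), ScoreMode.COMPLETE_NO_SCORES, 1f);
            long total = 0;
            for (LeafReaderContext leaf : searcher.getIndexReader().leaves()) {
                int count = weight.count(leaf); // -1 means "unknown without iterating"
                if (count == -1) return -1;
                total += count;
            }
            return total;
        }
    }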
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml
index 3b16cdb13a22f..e7da9a0bc454c 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml
@@ -598,7 +598,6 @@ setup:
   - match: { aggregations.histo.buckets.0.doc_count: 2 }
   - match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
   - match: { profile.shards.0.aggregations.0.description: histo }
-  - match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
   - match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }
 
 ---
diff --git a/server/src/main/java/org/opensearch/common/Rounding.java b/server/src/main/java/org/opensearch/common/Rounding.java
index 5a740b8527704..002d3924a6324 100644
--- a/server/src/main/java/org/opensearch/common/Rounding.java
+++ b/server/src/main/java/org/opensearch/common/Rounding.java
@@ -98,7 +98,7 @@ long roundFloor(long utcMillis) {
         }
 
         @Override
-        long extraLocalOffsetLookup() {
+        public long extraLocalOffsetLookup() {
             return extraLocalOffsetLookup;
         }
     },
@@ -109,7 +109,7 @@ long roundFloor(long utcMillis) {
             return DateUtils.roundYear(utcMillis);
         }
 
-        long extraLocalOffsetLookup() {
+        public long extraLocalOffsetLookup() {
             return extraLocalOffsetLookup;
         }
     },
@@ -120,7 +120,7 @@ long roundFloor(long utcMillis) {
             return DateUtils.roundQuarterOfYear(utcMillis);
         }
 
-        long extraLocalOffsetLookup() {
+        public long extraLocalOffsetLookup() {
            return extraLocalOffsetLookup;
         }
     },
@@ -131,7 +131,7 @@ long roundFloor(long utcMillis) {
             return DateUtils.roundMonthOfYear(utcMillis);
         }
 
-        long extraLocalOffsetLookup() {
+        public long extraLocalOffsetLookup() {
             return extraLocalOffsetLookup;
         }
     },
@@ -140,7 +140,7 @@ long roundFloor(long utcMillis) {
             return DateUtils.roundFloor(utcMillis, this.ratio);
         }
 
-        long extraLocalOffsetLookup() {
+        public long extraLocalOffsetLookup() {
             return ratio;
         }
     },
@@ -149,7 +149,7 @@ long roundFloor(long utcMillis) {
             return DateUtils.roundFloor(utcMillis, ratio);
         }
 
-        long extraLocalOffsetLookup() {
+        public long extraLocalOffsetLookup() {
             return ratio;
         }
     },
@@ -164,7 +164,7 @@ long roundFloor(long utcMillis) {
             return DateUtils.roundFloor(utcMillis, ratio);
         }
 
-        long extraLocalOffsetLookup() {
+        public long extraLocalOffsetLookup() {
             return ratio;
         }
     },
@@ -179,7 +179,7 @@ long roundFloor(long utcMillis) {
             return DateUtils.roundFloor(utcMillis, ratio);
         }
 
-        long extraLocalOffsetLookup() {
+        public long extraLocalOffsetLookup() {
             return ratio;
         }
     };
@@ -216,7 +216,7 @@ long extraLocalOffsetLookup() {
          * look up so that we can see transitions that we might have rounded
         * down beyond.
         */
-        abstract long extraLocalOffsetLookup();
+        public abstract long extraLocalOffsetLookup();
 
         public byte getId() {
             return id;
@@ -487,7 +487,7 @@ public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
      *
      * @opensearch.internal
      */
-    static class TimeUnitRounding extends Rounding {
+    public static class TimeUnitRounding extends Rounding {
         static final byte ID = 1;
 
         private final DateTimeUnit unit;
@@ -515,6 +515,14 @@ public byte id() {
             return ID;
         }
 
+        public DateTimeUnit getUnit() {
+            return this.unit;
+        }
+
+        public ZoneId getTimeZone() {
+            return this.timeZone;
+        }
+
         private LocalDateTime truncateLocalDateTime(LocalDateTime localDateTime) {
             switch (unit) {
                 case SECOND_OF_MINUTE:
@@ -945,7 +953,7 @@ public final long nextRoundingValue(long utcMillis) {
      *
      * @opensearch.internal
      */
-    static class TimeIntervalRounding extends Rounding {
+    public static class TimeIntervalRounding extends Rounding {
         static final byte ID = 2;
 
         private final long interval;
@@ -972,6 +980,14 @@ public byte id() {
             return ID;
         }
 
+        public long getInterval() {
+            return this.interval;
+        }
+
+        public ZoneId getTimeZone() {
+            return this.timeZone;
+        }
+
         @Override
         public Prepared prepare(long minUtcMillis, long maxUtcMillis) {
             long minLookup = minUtcMillis - interval;
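These visibility changes and new accessors (getUnit, getTimeZone, getInterval, and the now-public extraLocalOffsetLookup) let code outside Rounding recover an approximate bucket interval in milliseconds and verify that no time-zone shifting is involved. A hedged sketch of the consuming branch; it mirrors the instanceof checks that appear later in this patch in FilterRewriteHelper.createFilterForAggregations:

    import org.opensearch.common.Rounding;

    final class RoundingIntervals {
        // Returns an approximate interval in millis, or -1 for rounding
        // shapes the fast filter path does not support.
        static long intervalMillisOf(Rounding rounding) {
            if (rounding instanceof Rounding.TimeUnitRounding) {
                // Calendar units expose a rough duration via extraLocalOffsetLookup()
                return ((Rounding.TimeUnitRounding) rounding).getUnit().extraLocalOffsetLookup();
            }
            if (rounding instanceof Rounding.TimeIntervalRounding) {
                return ((Rounding.TimeIntervalRounding) rounding).getInterval();
            }
            return -1; // unsupported; callers fall back to the iterating collector
        }
    }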
diff --git a/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java
index 3b832628695fe..d98e6ea6af83d 100644
--- a/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java
+++ b/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java
@@ -409,6 +409,16 @@ public long parse(String value) {
             return resolution.convert(DateFormatters.from(dateTimeFormatter().parse(value), dateTimeFormatter().locale()).toInstant());
         }
 
+        public long convertNanosToMillis(long nanoSecondsSinceEpoch) {
+            if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toMilliSeconds(nanoSecondsSinceEpoch);
+            return nanoSecondsSinceEpoch;
+        }
+
+        public long convertRoundedMillisToNanos(long milliSecondsSinceEpoch) {
+            if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toNanoSeconds(milliSecondsSinceEpoch);
+            return milliSecondsSinceEpoch;
+        }
+
         @Override
         public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) {
             DateFormatter defaultFormatter = dateTimeFormatter();
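The two converters keep rounding in millisecond space even when the field uses nanosecond resolution: values read from a date_nanos index are scaled down before rounding, and rounded bucket bounds are scaled back up before being compared against indexed values. A worked example of the round-trip using plain arithmetic; the real methods delegate to DateUtils.toMilliSeconds and DateUtils.toNanoSeconds:

    long nanos = 1_700_000_000_123_456_789L;           // value from a date_nanos field
    long millis = nanos / 1_000_000L;                  // convertNanosToMillis -> 1_700_000_000_123
    long roundedMillis = (millis / 1000L) * 1000L;     // e.g. rounding down to the second
    long lowerBoundNanos = roundedMillis * 1_000_000L; // convertRoundedMillisToNanos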
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
index b4f1e78f77aaf..a71c15d551927 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
@@ -34,6 +34,7 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.CollectionUtil;
 import org.opensearch.common.Rounding;
 import org.opensearch.common.Rounding.Prepared;
@@ -41,6 +42,7 @@
 import org.opensearch.common.util.IntArray;
 import org.opensearch.common.util.LongArray;
 import org.opensearch.core.common.util.ByteArray;
+import org.opensearch.index.mapper.DateFieldMapper;
 import org.opensearch.search.DocValueFormat;
 import org.opensearch.search.aggregations.Aggregator;
 import org.opensearch.search.aggregations.AggregatorFactories;
@@ -125,9 +127,13 @@ static AutoDateHistogramAggregator build(
      * {@link MergingBucketsDeferringCollector#mergeBuckets(long[])}.
      */
     private MergingBucketsDeferringCollector deferringCollector;
+    private final Weight[] filters;
+    private final DateFieldMapper.DateFieldType fieldType;
 
     protected final RoundingInfo[] roundingInfos;
     protected final int targetBuckets;
+    protected int roundingIdx;
+    protected Rounding.Prepared preparedRounding;
 
     private AutoDateHistogramAggregator(
         String name,
@@ -148,8 +154,51 @@ private AutoDateHistogramAggregator(
         this.formatter = valuesSourceConfig.format();
         this.roundingInfos = roundingInfos;
         this.roundingPreparer = roundingPreparer;
+        this.preparedRounding = prepareRounding(0);
+
+        FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
+            parent(),
+            subAggregators.length,
+            context,
+            b -> getMinimumRounding(b[0], b[1]),
+            // Passing prepared rounding as supplier to ensure the correct prepared
+            // rounding is set as it is done during getMinimumRounding
+            () -> preparedRounding,
+            valuesSourceConfig,
+            fc -> FilterRewriteHelper.getAggregationBounds(context, fc.field())
+        );
+        if (filterContext != null) {
+            fieldType = filterContext.fieldType;
+            filters = filterContext.filters;
+        } else {
+            fieldType = null;
+            filters = null;
+        }
     }
 
+    private Rounding getMinimumRounding(final long low, final long high) {
+        // max - min / targetBuckets = bestDuration
+        // find the right innerInterval this bestDuration belongs to
+        // since we cannot exceed targetBuckets, bestDuration should go up,
+        // so the right innerInterval should be an upper bound
+        long bestDuration = (high - low) / targetBuckets;
+        while (roundingIdx < roundingInfos.length - 1) {
+            final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
+            final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
+            // If the interval duration is covered by the maximum inner interval,
+            // we can start with this outer interval for creating the buckets
+            if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
+                break;
+            }
+            roundingIdx++;
+        }
+
+        preparedRounding = prepareRounding(roundingIdx);
+        return roundingInfos[roundingIdx].rounding;
+    }
+
+    protected abstract LongKeyedBucketOrds getBucketOrds();
+
     @Override
     public final ScoreMode scoreMode() {
         if (valuesSource != null && valuesSource.needsScores()) {
@@ -176,7 +225,32 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
         if (valuesSource == null) {
             return LeafBucketCollector.NO_OP_COLLECTOR;
         }
-        return getLeafCollector(valuesSource.longValues(ctx), sub);
+
+        final SortedNumericDocValues values = valuesSource.longValues(ctx);
+        final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub);
+
+        // Need to be declared as final and array for usage within the
+        // LeafBucketCollectorBase subclass below
+        final boolean[] useOpt = new boolean[1];
+        useOpt[0] = filters != null;
+
+        return new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int doc, long owningBucketOrd) throws IOException {
+                // Try fast filter aggregation if the filters have been created
+                // Skip if tried before and gave incorrect/incomplete results
+                if (useOpt[0]) {
+                    useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
+                        incrementBucketDocCount(
+                            FilterRewriteHelper.getBucketOrd(getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
+                            count
+                        );
+                    });
+                }
+
+                iteratingCollector.collect(doc, owningBucketOrd);
+            }
+        };
     }
 
     protected final InternalAggregation[] buildAggregations(
@@ -247,8 +321,6 @@ protected final void merge(long[] mergeMap, long newNumBuckets) {
      * @opensearch.internal
      */
     private static class FromSingle extends AutoDateHistogramAggregator {
-        private int roundingIdx;
-        private Rounding.Prepared preparedRounding;
         /**
          * Map from value to bucket ordinals.
          *
@@ -286,10 +358,14 @@ private static class FromSingle extends AutoDateHistogramAggregator {
                 metadata
             );
 
-            preparedRounding = prepareRounding(0);
             bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays());
         }
 
+        @Override
+        protected LongKeyedBucketOrds getBucketOrds() {
+            return bucketOrds;
+        }
+
         @Override
         protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
             return new LeafBucketCollectorBase(sub, values) {
@@ -507,6 +583,11 @@ private static class FromMany extends AutoDateHistogramAggregator {
             liveBucketCountUnderestimate = context.bigArrays().newIntArray(1, true);
         }
 
+        @Override
+        protected LongKeyedBucketOrds getBucketOrds() {
+            return bucketOrds;
+        }
+
         @Override
         protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
             return new LeafBucketCollectorBase(sub, values) {
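getMinimumRounding walks up the rounding ladder until it finds the first candidate whose largest innerInterval covers the average bucket width, which keeps the eventual filter count at or below targetBuckets. A worked example with assumed RoundingInfo numbers (the actual tables live in AutoDateHistogramAggregationBuilder and may differ):

    // 30 days of data, targetBuckets = 10; all numbers are illustrative
    long low = 0L;
    long high = 30L * 24 * 60 * 60 * 1000;   // 2_592_000_000 ms
    long bestDuration = (high - low) / 10;   // 259_200_000 ms, i.e. 3 days per bucket
    // hour rounding: roughEstimateDurationMillis = 3_600_000, max innerInterval = 12
    //   12 * 3_600_000 = 43_200_000 < 259_200_000 -> too fine, advance roundingIdx
    // day rounding: roughEstimateDurationMillis = 86_400_000, max innerInterval = 7
    //   7 * 86_400_000 = 604_800_000 >= 259_200_000 -> stop and prepare day rounding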
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
index f602eea7a9b12..8437e1dce9fe0 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -34,10 +34,12 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.CollectionUtil;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.Rounding;
 import org.opensearch.common.lease.Releasables;
+import org.opensearch.index.mapper.DateFieldMapper;
 import org.opensearch.search.DocValueFormat;
 import org.opensearch.search.aggregations.Aggregator;
 import org.opensearch.search.aggregations.AggregatorFactories;
@@ -48,6 +50,7 @@
 import org.opensearch.search.aggregations.LeafBucketCollectorBase;
 import org.opensearch.search.aggregations.bucket.BucketsAggregator;
 import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
+import org.opensearch.search.aggregations.support.FieldContext;
 import org.opensearch.search.aggregations.support.ValuesSource;
 import org.opensearch.search.aggregations.support.ValuesSourceConfig;
 import org.opensearch.search.internal.SearchContext;
@@ -66,7 +69,6 @@
  * @opensearch.internal
  */
 class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {
-
     private final ValuesSource.Numeric valuesSource;
     private final DocValueFormat formatter;
     private final Rounding rounding;
@@ -76,12 +78,12 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
     private final Rounding.Prepared preparedRounding;
     private final BucketOrder order;
     private final boolean keyed;
-
     private final long minDocCount;
     private final LongBounds extendedBounds;
     private final LongBounds hardBounds;
-
+    private final Weight[] filters;
     private final LongKeyedBucketOrds bucketOrds;
+    private final DateFieldMapper.DateFieldType fieldType;
 
     DateHistogramAggregator(
         String name,
@@ -99,7 +101,6 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
         CardinalityUpperBound cardinality,
         Map<String, Object> metadata
     ) throws IOException {
-
         super(name, factories, aggregationContext, parent, CardinalityUpperBound.MANY, metadata);
         this.rounding = rounding;
         this.preparedRounding = preparedRounding;
@@ -114,6 +115,35 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
         this.formatter = valuesSourceConfig.format();
 
         bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);
+
+        FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
+            parent,
+            subAggregators.length,
+            context,
+            x -> rounding,
+            () -> preparedRounding,
+            valuesSourceConfig,
+            this::computeBounds
+        );
+        if (filterContext != null) {
+            fieldType = filterContext.fieldType;
+            filters = filterContext.filters;
+        } else {
+            filters = null;
+            fieldType = null;
+        }
+    }
+
+    private long[] computeBounds(final FieldContext fieldContext) throws IOException {
+        final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.field());
+        if (bounds != null) {
+            // Update min/max limit if user specified any hard bounds
+            if (hardBounds != null) {
+                bounds[0] = Math.max(bounds[0], hardBounds.getMin());
+                bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
+            }
+        }
+        return bounds;
     }
 
     @Override
@@ -129,10 +159,27 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
         if (valuesSource == null) {
             return LeafBucketCollector.NO_OP_COLLECTOR;
         }
+
+        // Need to be declared as final and array for usage within the
+        // LeafBucketCollectorBase subclass below
+        final boolean[] useOpt = new boolean[1];
+        useOpt[0] = filters != null;
+
         SortedNumericDocValues values = valuesSource.longValues(ctx);
         return new LeafBucketCollectorBase(sub, values) {
             @Override
             public void collect(int doc, long owningBucketOrd) throws IOException {
+                // Try fast filter aggregation if the filters have been created
+                // Skip if tried before and gave incorrect/incomplete results
+                if (useOpt[0]) {
+                    useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
+                        incrementBucketDocCount(
+                            FilterRewriteHelper.getBucketOrd(bucketOrds.add(owningBucketOrd, preparedRounding.round(key))),
+                            count
+                        );
+                    });
+                }
+
                 if (values.advanceExact(doc)) {
                     int valuesCount = values.docValueCount();
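Both aggregators use the same one-shot protocol: the first collect() call on a segment tries the filter counts; on success tryFastFilterAggregation applies every bucket count at once and throws CollectionTerminatedException so Lucene stops feeding the leaf, and on failure (any Weight#count of -1) it returns false, disabling the fast path for the rest of the segment. A runnable sketch of that control flow with the helper stubbed out; the stub is illustrative, not the patch's method:

    import org.apache.lucene.search.CollectionTerminatedException;

    final class FastPathFlow {
        // Stand-in for FilterRewriteHelper.tryFastFilterAggregation: either
        // terminates the leaf after applying counts, or opts out with false.
        static boolean tryFast(boolean segmentCountable) {
            if (!segmentCountable) {
                return false; // e.g. some filter reported count == -1 (deleted docs)
            }
            // In the real code, counts were applied via incrementBucketDocCount
            throw new CollectionTerminatedException(); // stop collecting this leaf
        }

        public static void main(String[] args) {
            boolean[] useOpt = { true };
            try {
                for (int doc = 0; doc < 3; doc++) { // simulated collect() calls
                    if (useOpt[0]) {
                        useOpt[0] = tryFast(true);
                    }
                    System.out.println("doc-values path for doc " + doc);
                }
            } catch (CollectionTerminatedException e) {
                System.out.println("leaf terminated early; counts already applied");
            }
        }
    }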
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java
new file mode 100644
index 0000000000000..c6f8296e29dc0
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java
@@ -0,0 +1,281 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.aggregations.bucket.histogram;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.PointValues;
+import org.apache.lucene.search.CollectionTerminatedException;
+import org.apache.lucene.search.ConstantScoreQuery;
+import org.apache.lucene.search.IndexOrDocValuesQuery;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.PointRangeQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.util.NumericUtils;
+import org.opensearch.common.CheckedFunction;
+import org.opensearch.common.Rounding;
+import org.opensearch.common.lucene.search.function.FunctionScoreQuery;
+import org.opensearch.index.mapper.DateFieldMapper;
+import org.opensearch.index.query.DateRangeIncludingNowQuery;
+import org.opensearch.search.aggregations.support.FieldContext;
+import org.opensearch.search.aggregations.support.ValuesSourceConfig;
+import org.opensearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.format.TextStyle;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Helper functions to rewrite and optimize aggregations using
+ * range filter queries
+ *
+ * @opensearch.internal
+ */
+public class FilterRewriteHelper {
+
+    static class FilterContext {
+        final DateFieldMapper.DateFieldType fieldType;
+        final Weight[] filters;
+
+        public FilterContext(DateFieldMapper.DateFieldType fieldType, Weight[] filters) {
+            this.fieldType = fieldType;
+            this.filters = filters;
+        }
+    }
+
+    private static final int MAX_NUM_FILTER_BUCKETS = 1024;
+    private static final Map<Class<?>, Function<Query, Query>> queryWrappers;
+
+    // Initialize the wrappers map for unwrapping the query
+    static {
+        queryWrappers = new HashMap<>();
+        queryWrappers.put(ConstantScoreQuery.class, q -> ((ConstantScoreQuery) q).getQuery());
+        queryWrappers.put(FunctionScoreQuery.class, q -> ((FunctionScoreQuery) q).getSubQuery());
+        queryWrappers.put(DateRangeIncludingNowQuery.class, q -> ((DateRangeIncludingNowQuery) q).getQuery());
+        queryWrappers.put(IndexOrDocValuesQuery.class, q -> ((IndexOrDocValuesQuery) q).getIndexQuery());
+    }
+
+    /**
+     * Recursively unwraps query into the concrete form
+     * for applying the optimization
+     */
+    private static Query unwrapIntoConcreteQuery(Query query) {
+        while (queryWrappers.containsKey(query.getClass())) {
+            query = queryWrappers.get(query.getClass()).apply(query);
+        }
+
+        return query;
+    }
+
+    /**
+     * Finds the min and max bounds for segments within the passed search context
+     */
+    private static long[] getIndexBoundsFromLeaves(final SearchContext context, final String fieldName) throws IOException {
+        final List<LeafReaderContext> leaves = context.searcher().getIndexReader().leaves();
+        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
+        // Since the query does not specify bounds for aggregation, we can
+        // build the global min/max from local min/max within each segment
+        for (LeafReaderContext leaf : leaves) {
+            final PointValues values = leaf.reader().getPointValues(fieldName);
+            if (values != null) {
+                min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0));
+                max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0));
+            }
+        }
+
+        if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null;
+
+        return new long[] { min, max };
+    }
+
+    static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException {
+        final Query cq = unwrapIntoConcreteQuery(context.query());
+        final long[] indexBounds = getIndexBoundsFromLeaves(context, fieldName);
+        if (cq instanceof PointRangeQuery) {
+            final PointRangeQuery prq = (PointRangeQuery) cq;
+            // Ensure that the query and aggregation are on the same field
+            if (prq.getField().equals(fieldName)) {
+                return new long[] {
+                    // Minimum bound for aggregation is the max between query and global
+                    Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]),
+                    // Maximum bound for aggregation is the min between query and global
+                    Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) };
+            }
+        } else if (cq instanceof MatchAllDocsQuery) {
+            return indexBounds;
+        }
+
+        return null;
+    }
+
+    /**
+     * Helper function for checking if the time zone requested for date histogram
+     * aggregation is utc or not
+     */
+    private static boolean isUTCTimeZone(final ZoneId zoneId) {
+        return "Z".equals(zoneId.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
+    }
+
+    /**
+     * Creates the range query filters for aggregations using the interval, min/max
+     * bounds and the rounding values
+     */
+    private static Weight[] createFilterForAggregations(
+        final SearchContext context,
+        final Rounding rounding,
+        final Rounding.Prepared preparedRounding,
+        final String field,
+        final DateFieldMapper.DateFieldType fieldType,
+        final long low,
+        final long high
+    ) throws IOException {
+        long interval;
+        if (rounding instanceof Rounding.TimeUnitRounding) {
+            interval = (((Rounding.TimeUnitRounding) rounding).getUnit()).extraLocalOffsetLookup();
+            if (!isUTCTimeZone(((Rounding.TimeUnitRounding) rounding).getTimeZone())) {
+                // Fast filter aggregation cannot be used if it needs time zone rounding
+                return null;
+            }
+        } else if (rounding instanceof Rounding.TimeIntervalRounding) {
+            interval = ((Rounding.TimeIntervalRounding) rounding).getInterval();
+            if (!isUTCTimeZone(((Rounding.TimeIntervalRounding) rounding).getTimeZone())) {
+                // Fast filter aggregation cannot be used if it needs time zone rounding
+                return null;
+            }
+        } else {
+            // Unexpected scenario, exit and fall back to original
+            return null;
+        }
+
+        // Calculate the number of buckets using range and interval
+        long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low));
+        long prevRounded = roundedLow;
+        int bucketCount = 0;
+        while (roundedLow <= fieldType.convertNanosToMillis(high)) {
+            bucketCount++;
+            // Below rounding is needed as the interval could return in
+            // non-rounded values for something like calendar month
+            roundedLow = preparedRounding.round(roundedLow + interval);
+            if (prevRounded == roundedLow) break;
+            prevRounded = roundedLow;
+        }
+
+        Weight[] filters = null;
+        if (bucketCount > 0 && bucketCount <= MAX_NUM_FILTER_BUCKETS) {
+            int i = 0;
+            filters = new Weight[bucketCount];
+            roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low));
+            while (i < bucketCount) {
+                // Calculate the lower bucket bound
+                final byte[] lower = new byte[8];
+                NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0);
+                // Calculate the upper bucket bound
+                final byte[] upper = new byte[8];
+                roundedLow = preparedRounding.round(roundedLow + interval);
+                // Subtract -1 if the minimum is roundedLow as roundedLow itself
+                // is included in the next bucket
+                NumericUtils.longToSortableBytes(
+                    i + 1 == bucketCount ? high : fieldType.convertRoundedMillisToNanos(roundedLow) - 1,
+                    upper,
+                    0
+                );
+                filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) {
+                    @Override
+                    protected String toString(int dimension, byte[] value) {
+                        return null;
+                    }
+                }, ScoreMode.COMPLETE_NO_SCORES, 1);
+            }
+        }
+
+        return filters;
+    }
+
+    static FilterContext buildFastFilterContext(
+        final Object parent,
+        final int subAggLength,
+        SearchContext context,
+        Function<long[], Rounding> roundingFunction,
+        Supplier<Rounding.Prepared> preparedRoundingSupplier,
+        ValuesSourceConfig valuesSourceConfig,
+        CheckedFunction<FieldContext, long[], IOException> computeBounds
+    ) throws IOException {
+        // Create the filters for fast aggregation only if the query is instance
+        // of point range query and there aren't any parent/sub aggregations
+        if (parent == null && subAggLength == 0 && valuesSourceConfig.missing() == null && valuesSourceConfig.script() == null) {
+            final FieldContext fieldContext = valuesSourceConfig.fieldContext();
+            if (fieldContext != null) {
+                final String fieldName = fieldContext.field();
+                final long[] bounds = computeBounds.apply(fieldContext);
+                if (bounds != null) {
+                    assert fieldContext.fieldType() instanceof DateFieldMapper.DateFieldType;
+                    final DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fieldContext.fieldType();
+                    final Rounding rounding = roundingFunction.apply(bounds);
+                    final Weight[] filters = FilterRewriteHelper.createFilterForAggregations(
+                        context,
+                        rounding,
+                        preparedRoundingSupplier.get(),
+                        fieldName,
+                        fieldType,
+                        bounds[0],
+                        bounds[1]
+                    );
+                    return new FilterContext(fieldType, filters);
+                }
+            }
+        }
+        return null;
+    }
+
+    static long getBucketOrd(long bucketOrd) {
+        if (bucketOrd < 0) { // already seen
+            bucketOrd = -1 - bucketOrd;
+        }
+
+        return bucketOrd;
+    }
+
+    static boolean tryFastFilterAggregation(
+        final LeafReaderContext ctx,
+        final Weight[] filters,
+        final DateFieldMapper.DateFieldType fieldType,
+        final BiConsumer<Long, Integer> incrementDocCount
+    ) throws IOException {
+        final int[] counts = new int[filters.length];
+        int i;
+        for (i = 0; i < filters.length; i++) {
+            counts[i] = filters[i].count(ctx);
+            if (counts[i] == -1) {
+                // Cannot use the optimization if any of the counts
+                // is -1 indicating the segment might have deleted documents
+                return false;
+            }
+        }
+
+        for (i = 0; i < filters.length; i++) {
+            if (counts[i] > 0) {
+                incrementDocCount.accept(
+                    fieldType.convertNanosToMillis(
+                        NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
+                    ),
+                    counts[i]
+                );
+            }
+        }
+        throw new CollectionTerminatedException();
+    }
+}
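createFilterForAggregations only materializes filters when the pre-counted bucket total is positive and at most MAX_NUM_FILTER_BUCKETS (1024); beyond that, building and counting one Weight per bucket would likely cost more than the collection it replaces. A simplified fixed-interval sketch of that pre-count; the real loop rounds with Rounding.Prepared and additionally breaks when rounding stops advancing:

    // Count prospective buckets before committing to the filter rewrite.
    static int countBuckets(long low, long high, long intervalMillis, int maxBuckets) {
        long roundedLow = Math.floorDiv(low, intervalMillis) * intervalMillis;
        int bucketCount = 0;
        while (roundedLow <= high) {
            if (++bucketCount > maxBuckets) {
                return -1; // too many buckets: skip the rewrite and iterate normally
            }
            roundedLow += intervalMillis;
        }
        return bucketCount;
    }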
diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
index 37cd7a42c7cdf..dda053af78b30 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
@@ -969,6 +969,7 @@ private void indexSampleData(List<ZonedDateTime> dataset, RandomIndexWriter inde
         for (final ZonedDateTime date : dataset) {
             final long instant = date.toInstant().toEpochMilli();
             document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
+            document.add(new LongPoint(DATE_FIELD, instant));
             document.add(new LongPoint(INSTANT_FIELD, instant));
             document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i));
             indexWriter.addDocument(document);
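The one-line test change is load-bearing: the fast path requires the date field to be indexed as a point, both so getIndexBoundsFromLeaves can read PointValues min/max and so Weight#count can short-circuit the per-bucket PointRangeQuery. Indexing a date for both access paths looks roughly like this (field name illustrative):

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.LongPoint;
    import org.apache.lucene.document.SortedNumericDocValuesField;

    Document document = new Document();
    long instant = 1_701_196_751_000L; // epoch millis; illustrative
    document.add(new SortedNumericDocValuesField("date_field", instant)); // doc-values path
    document.add(new LongPoint("date_field", instant));                   // points path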