From ed33488aa426bd618685729fc638adad763f6ff7 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 2 May 2024 19:35:32 -0700 Subject: [PATCH] Support multi ranges traversal when doing date histogram rewrite optimization (#13317) --- CHANGELOG.md | 1 + .../test/search.aggregation/10_histogram.yml | 56 ++ .../test/search.aggregation/230_composite.yml | 58 ++ .../330_auto_date_histogram.yml | 26 + .../org/opensearch/search/SearchService.java | 2 +- .../bucket/FastFilterRewriteHelper.java | 502 +++++++++++++----- .../bucket/composite/CompositeAggregator.java | 19 +- .../AutoDateHistogramAggregator.java | 14 +- .../histogram/DateHistogramAggregator.java | 9 +- .../DateHistogramAggregatorTests.java | 274 ++++++++++ .../aggregations/AggregatorTestCase.java | 4 +- 11 files changed, 821 insertions(+), 144 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdaa1904e2522..3f2b92c76689f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386)) - [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276)) - Add support for deep copying SearchRequest ([#12295](https://github.com/opensearch-project/OpenSearch/pull/12295)) +- Support multi ranges traversal when doing date histogram rewrite optimization. ([#13317](https://github.com/opensearch-project/OpenSearch/pull/13317)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml index e7da9a0bc454c..fa71137912a91 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml @@ -644,3 +644,59 @@ setup: - match: { aggregations.histo.buckets.0.doc_count: 1 } - match: { aggregations.histo.buckets.20.key: 20 } - match: { aggregations.histo.buckets.20.doc_count: 1 } + +--- +"date_histogram profiler shows filter rewrite info": + - skip: + version: " - 2.99.99" + reason: debug info for filter rewrite added in 3.0.0 (to be backported to 2.14.0) + + - do: + indices.create: + index: test_2 + body: + settings: + number_of_replicas: 0 + number_of_shards: 1 + mappings: + properties: + date: + type: date + + - do: + bulk: + index: test_2 + refresh: true + body: + - '{"index": {}}' + - '{"date": "2016-01-01"}' + - '{"index": {}}' + - '{"date": "2016-01-02"}' + - '{"index": {}}' + - '{"date": "2016-02-01"}' + - '{"index": {}}' + - '{"date": "2016-03-01"}' + + - do: + search: + index: test_2 + body: + size: 0 + profile: true + aggs: + histo: + date_histogram: + field: date + calendar_interval: month + + - match: { hits.total.value: 4 } + - length: { aggregations.histo.buckets: 3 } + - match: { aggregations.histo.buckets.0.key_as_string: "2016-01-01T00:00:00.000Z" } + - match: { aggregations.histo.buckets.0.doc_count: 2 } + - match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator } + - match: { profile.shards.0.aggregations.0.description: histo } + - match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 } + - match: { profile.shards.0.aggregations.0.debug.optimized_segments: 1 } + - match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 } + - match: { profile.shards.0.aggregations.0.debug.leaf_visited: 1 } + - match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml index 2808be8cd7045..3a0099dae3b33 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml @@ -1069,3 +1069,61 @@ setup: - match: { aggregations.test.buckets.1.doc_count: 2 } - match: { aggregations.test.buckets.2.key.kw: null } - match: { aggregations.test.buckets.2.doc_count: 2 } + +--- +"composite aggregation date_histogram profile shows filter rewrite info": + - skip: + version: " - 2.99.99" + reason: debug info for filter rewrite added in 3.0.0 (to be backported to 2.14.0) + + - do: + indices.create: + index: test_2 + body: + settings: + number_of_replicas: 0 + number_of_shards: 1 + mappings: + properties: + date: + type: date + - do: + bulk: + index: test_2 + refresh: true + body: + - '{"index": {}}' + - '{"date": "2016-01-01"}' + - '{"index": {}}' + - '{"date": "2016-01-02"}' + - '{"index": {}}' + - '{"date": "2016-02-01"}' + - '{"index": {}}' + - '{"date": "2016-03-01"}' + - do: + search: + index: test_2 + body: + size: 0 + profile: true + aggregations: + test: + composite: + sources: [ + { + "date": { + "date_histogram": { + "field": "date", + "calendar_interval": "1d", + "format": "strict_date" + } + } + } + ] + + - match: { hits.total.value: 4 } + - length: { aggregations.test.buckets: 4 } + - match: { profile.shards.0.aggregations.0.debug.optimized_segments: 1 } + - match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 } + - match: { profile.shards.0.aggregations.0.debug.leaf_visited: 1 } + - match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml index 6b5e06a549be3..1356eac41ae79 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml @@ -99,3 +99,29 @@ setup: - length: { aggregations.histo.buckets: 2 } - match: { profile.shards.0.aggregations.0.type: AutoDateHistogramAggregator.FromSingle } - match: { profile.shards.0.aggregations.0.debug.surviving_buckets: 4 } + +--- +"auto_date_histogram profile shows filter rewrite info": + - skip: + version: " - 2.99.99" + reason: debug info for filter rewrite added in 3.0.0 (to be backported to 2.14.0) + + - do: + search: + body: + profile: true + size: 0 + aggs: + histo: + auto_date_histogram: + field: date + buckets: 2 + + - match: { hits.total.value: 4 } + - length: { aggregations.histo.buckets: 2 } + - match: { profile.shards.0.aggregations.0.type: AutoDateHistogramAggregator.FromSingle } + - match: { profile.shards.0.aggregations.0.debug.surviving_buckets: 4 } + - match: { profile.shards.0.aggregations.0.debug.optimized_segments: 1 } + - match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 } + - match: { profile.shards.0.aggregations.0.debug.leaf_visited: 1 } + - match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 6b3620e65a271..744d3a19f1593 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -275,7 +275,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv // value 0 means rewrite filters optimization in aggregations will be disabled public static final Setting MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting( "search.max_aggregation_rewrite_filters", - 72, + 3000, 0, Property.Dynamic, Property.NodeScope diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index dde748bf0dc07..c8ce39a52f869 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -10,12 +10,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.DocValues; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.IndexOrDocValuesQuery; import org.apache.lucene.search.MatchAllDocsQuery; @@ -23,24 +24,30 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; +import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.NumericUtils; +import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Rounding; import org.opensearch.common.lucene.search.function.FunctionScoreQuery; import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.query.DateRangeIncludingNowQuery; +import org.opensearch.search.aggregations.bucket.composite.CompositeAggregator; import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig; import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource; import org.opensearch.search.aggregations.bucket.histogram.LongBounds; import org.opensearch.search.internal.SearchContext; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.OptionalLong; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Function; import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; @@ -128,9 +135,10 @@ private static long[] getSegmentBounds(final LeafReaderContext context, final St } /** - * This method also acts as a pre-condition check for the optimization + * Gets the min and max bounds of the field for the shard search + * Depending on the query part, the bounds are computed differently * - * @return null if the processed query not as expected + * @return null if the processed query not supported by the optimization */ public static long[] getDateHistoAggBounds(final SearchContext context, final String fieldName) throws IOException { final Query cq = unwrapIntoConcreteQuery(context.query()); @@ -167,67 +175,6 @@ private static long[] getBoundsWithRangeQuery(PointRangeQuery prq, String fieldN return null; } - /** - * Creates the date range filters for aggregations using the interval, min/max - * bounds and prepared rounding - */ - private static Weight[] createFilterForAggregations( - final SearchContext context, - final DateFieldMapper.DateFieldType fieldType, - final long interval, - final Rounding.Prepared preparedRounding, - long low, - final long high - ) throws IOException { - // Calculate the number of buckets using range and interval - long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); - long prevRounded = roundedLow; - int bucketCount = 0; - while (roundedLow <= fieldType.convertNanosToMillis(high)) { - bucketCount++; - int maxNumFilterBuckets = context.maxAggRewriteFilters(); - if (bucketCount > maxNumFilterBuckets) { - logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets); - return null; - } - // Below rounding is needed as the interval could return in - // non-rounded values for something like calendar month - roundedLow = preparedRounding.round(roundedLow + interval); - if (prevRounded == roundedLow) break; // prevents getting into an infinite loop - prevRounded = roundedLow; - } - - Weight[] filters = null; - if (bucketCount > 0) { - filters = new Weight[bucketCount]; - roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); - - int i = 0; - while (i < bucketCount) { - // Calculate the lower bucket bound - final byte[] lower = new byte[8]; - NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0); - - // Calculate the upper bucket bound - roundedLow = preparedRounding.round(roundedLow + interval); - final byte[] upper = new byte[8]; - NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high : - // Subtract -1 if the minimum is roundedLow as roundedLow itself - // is included in the next bucket - fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0); - - filters[i++] = context.searcher().createWeight(new PointRangeQuery(fieldType.name(), lower, upper, 1) { - @Override - protected String toString(int dimension, byte[] value) { - return Long.toString(LongPoint.decodeDimension(value, 0)); - } - }, ScoreMode.COMPLETE_NO_SCORES, 1); - } - } - - return filters; - } - /** * Context object for fast filter optimization *

@@ -235,12 +182,24 @@ protected String toString(int dimension, byte[] value) { */ public static class FastFilterContext { private boolean rewriteable = false; - private Weight[] filters = null; - private boolean filtersBuiltAtShardLevel = false; + private boolean rangesBuiltAtShardLevel = false; private AggregationType aggregationType; private final SearchContext context; + private String fieldName; + private long[][] ranges; + + // debug info related fields + public int leaf; + public int inner; + public int segments; + public int optimizedSegments; + + public void setFieldName(String fieldName) { + this.fieldName = fieldName; + } + public FastFilterContext(SearchContext context) { this.context = context; } @@ -262,24 +221,26 @@ public boolean isRewriteable(final Object parent, final int subAggLength) { return rewriteable; } - public void buildFastFilter() throws IOException { - assert filters == null : "Filters should only be built once, but they are already built"; - this.filters = this.aggregationType.buildFastFilter(context); - if (filters != null) { - logger.debug("Fast filter built for shard {}", context.indexShard().shardId()); - filtersBuiltAtShardLevel = true; + public void buildRanges() throws IOException { + assert ranges == null : "Ranges should only be built once at shard level, but they are already built"; + this.ranges = this.aggregationType.buildRanges(context); + if (ranges != null) { + logger.debug("Ranges built for shard {}", context.indexShard().shardId()); + rangesBuiltAtShardLevel = true; } } - /** - * Built filters for a segment - */ - public Weight[] buildFastFilter(LeafReaderContext leaf) throws IOException { - Weight[] filters = this.aggregationType.buildFastFilter(leaf, context); - if (filters != null) { - logger.debug("Fast filter built for shard {} segment {}", context.indexShard().shardId(), leaf.ord); + public long[][] buildRanges(LeafReaderContext leaf) throws IOException { + long[][] ranges = this.aggregationType.buildRanges(leaf, context); + if (ranges != null) { + logger.debug("Ranges built for shard {} segment {}", context.indexShard().shardId(), leaf.ord); } - return filters; + return ranges; + } + + private void consumeDebugInfo(DebugInfo debug) { + leaf += debug.leaf; + inner += debug.inner; } } @@ -287,16 +248,11 @@ public Weight[] buildFastFilter(LeafReaderContext leaf) throws IOException { * Different types have different pre-conditions, filter building logic, etc. */ interface AggregationType { - boolean isRewriteable(Object parent, int subAggLength); - Weight[] buildFastFilter(SearchContext ctx) throws IOException; - - Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext ctx) throws IOException; + long[][] buildRanges(SearchContext ctx) throws IOException; - default int getSize() { - return Integer.MAX_VALUE; - } + long[][] buildRanges(LeafReaderContext leaf, SearchContext ctx) throws IOException; } /** @@ -330,20 +286,13 @@ public boolean isRewriteable(Object parent, int subAggLength) { } @Override - public Weight[] buildFastFilter(SearchContext context) throws IOException { + public long[][] buildRanges(SearchContext context) throws IOException { long[] bounds = getDateHistoAggBounds(context, fieldType.name()); logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId()); - return buildFastFilter(context, bounds); + return buildRanges(context, bounds); } - @Override - public Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext context) throws IOException { - long[] bounds = getSegmentBounds(leaf, fieldType.name()); - logger.debug("Bounds are {} for shard {} segment {}", bounds, context.indexShard().shardId(), leaf.ord); - return buildFastFilter(context, bounds); - } - - private Weight[] buildFastFilter(SearchContext context, long[] bounds) throws IOException { + private long[][] buildRanges(SearchContext context, long[] bounds) throws IOException { bounds = processHardBounds(bounds); if (bounds == null) { return null; @@ -360,7 +309,7 @@ private Weight[] buildFastFilter(SearchContext context, long[] bounds) throws IO // process the after key of composite agg processAfterKey(bounds, interval); - return FastFilterRewriteHelper.createFilterForAggregations( + return FastFilterRewriteHelper.createRangesFromAgg( context, (DateFieldMapper.DateFieldType) fieldType, interval, @@ -370,6 +319,13 @@ private Weight[] buildFastFilter(SearchContext context, long[] bounds) throws IO ); } + @Override + public long[][] buildRanges(LeafReaderContext leaf, SearchContext context) throws IOException { + long[] bounds = getSegmentBounds(leaf, fieldType.name()); + logger.debug("Bounds are {} for shard {} segment {}", bounds, context.indexShard().shardId(), leaf.ord); + return buildRanges(context, bounds); + } + protected abstract Rounding getRounding(final long low, final long high); protected abstract Rounding.Prepared getRoundingPrepared(); @@ -413,7 +369,7 @@ public static long getBucketOrd(long bucketOrd) { } /** - * Try to get the bucket doc counts from the fast filters for the aggregation + * Try to get the bucket doc counts for the date histogram aggregation *

* Usage: invoked at segment level — in getLeafCollector of aggregator * @@ -424,11 +380,18 @@ public static boolean tryFastFilterAggregation( FastFilterContext fastFilterContext, final BiConsumer incrementDocCount ) throws IOException { - if (fastFilterContext == null) return false; + fastFilterContext.segments++; if (!fastFilterContext.rewriteable) { return false; } + if (ctx.reader().hasDeletions()) return false; + + PointValues values = ctx.reader().getPointValues(fastFilterContext.fieldName); + if (values == null) return false; + // only proceed if every document corresponds to exactly one point + if (values.getDocCount() != values.size()) return false; + NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); if (docCountValues.nextDoc() != NO_MORE_DOCS) { logger.debug( @@ -439,61 +402,330 @@ public static boolean tryFastFilterAggregation( return false; } - // if no filters built at shard level (see getDateHistoAggBounds method for possible reasons) - // check if the query is functionally match-all at segment level - if (!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) { + // even if no ranges built at shard level, we can still perform the optimization + // when functionally match-all at segment level + if (!fastFilterContext.rangesBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) { return false; } - Weight[] filters = fastFilterContext.filters; - if (filters == null) { + long[][] ranges = fastFilterContext.ranges; + if (ranges == null) { logger.debug( "Shard {} segment {} functionally match all documents. Build the fast filter", fastFilterContext.context.indexShard().shardId(), ctx.ord ); - filters = fastFilterContext.buildFastFilter(ctx); - if (filters == null) { + ranges = fastFilterContext.buildRanges(ctx); + if (ranges == null) { return false; } } - final int[] counts = new int[filters.length]; - int i; - for (i = 0; i < filters.length; i++) { - counts[i] = filters[i].count(ctx); - if (counts[i] == -1) { - // Cannot use the optimization if any of the counts - // is -1 indicating the segment might have deleted documents - return false; + final AggregationType aggregationType = fastFilterContext.aggregationType; + assert aggregationType instanceof AbstractDateHistogramAggregationType; + final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) aggregationType).getFieldType(); + int size = Integer.MAX_VALUE; + if (aggregationType instanceof CompositeAggregator.CompositeAggregationType) { + size = ((CompositeAggregator.CompositeAggregationType) aggregationType).getSize(); + } + DebugInfo debugInfo = multiRangesTraverse(values.getPointTree(), ranges, incrementDocCount, fieldType, size); + fastFilterContext.consumeDebugInfo(debugInfo); + + fastFilterContext.optimizedSegments++; + logger.debug("Fast filter optimization applied to shard {} segment {}", fastFilterContext.context.indexShard().shardId(), ctx.ord); + logger.debug("crossed leaf nodes: {}, inner nodes: {}", fastFilterContext.leaf, fastFilterContext.inner); + return true; + } + + private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException { + Weight weight = ctx.searcher().createWeight(ctx.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); + return weight != null && weight.count(leafCtx) == leafCtx.reader().numDocs(); + } + + /** + * Creates the date ranges from date histo aggregations using its interval, + * and min/max boundaries + */ + private static long[][] createRangesFromAgg( + final SearchContext context, + final DateFieldMapper.DateFieldType fieldType, + final long interval, + final Rounding.Prepared preparedRounding, + long low, + final long high + ) { + // Calculate the number of buckets using range and interval + long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); + long prevRounded = roundedLow; + int bucketCount = 0; + while (roundedLow <= fieldType.convertNanosToMillis(high)) { + bucketCount++; + int maxNumFilterBuckets = context.maxAggRewriteFilters(); + if (bucketCount > maxNumFilterBuckets) { + logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets); + return null; + } + // Below rounding is needed as the interval could return in + // non-rounded values for something like calendar month + roundedLow = preparedRounding.round(roundedLow + interval); + if (prevRounded == roundedLow) break; // prevents getting into an infinite loop + prevRounded = roundedLow; + } + + long[][] ranges = new long[bucketCount][2]; + if (bucketCount > 0) { + roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); + + int i = 0; + while (i < bucketCount) { + // Calculate the lower bucket bound + long lower = i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow); + roundedLow = preparedRounding.round(roundedLow + interval); + + // Subtract -1 if the minimum is roundedLow as roundedLow itself + // is included in the next bucket + long upper = i + 1 == bucketCount ? high : fieldType.convertRoundedMillisToNanos(roundedLow) - 1; + + ranges[i][0] = lower; + ranges[i][1] = upper; + i++; + } + } + + return ranges; + } + + /** + * @param maxNumNonZeroRanges the number of non-zero ranges to collect + */ + private static DebugInfo multiRangesTraverse( + final PointValues.PointTree tree, + final long[][] ranges, + final BiConsumer incrementDocCount, + final DateFieldMapper.DateFieldType fieldType, + final int maxNumNonZeroRanges + ) throws IOException { + // ranges are connected and in ascending order + Iterator rangeIter = Arrays.stream(ranges).iterator(); + long[] activeRange = rangeIter.next(); + + // make sure the first range at least crosses the min value of the tree + DebugInfo debugInfo = new DebugInfo(); + if (activeRange[0] > NumericUtils.sortableBytesToLong(tree.getMaxPackedValue(), 0)) { + logger.debug("No ranges match the query, skip the fast filter optimization"); + return debugInfo; + } + while (activeRange[1] < NumericUtils.sortableBytesToLong(tree.getMinPackedValue(), 0)) { + if (!rangeIter.hasNext()) { + logger.debug("No ranges match the query, skip the fast filter optimization"); + return debugInfo; } + activeRange = rangeIter.next(); } - int s = 0; - int size = fastFilterContext.aggregationType.getSize(); - for (i = 0; i < filters.length; i++) { - if (counts[i] > 0) { - long bucketKey = i; // the index of filters is the key for filters aggregation - if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) { - final DateFieldMapper.DateFieldType fieldType = - ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).getFieldType(); - bucketKey = fieldType.convertNanosToMillis( - NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) - ); + RangeCollectorForPointTree collector = new RangeCollectorForPointTree( + incrementDocCount, + fieldType, + rangeIter, + maxNumNonZeroRanges, + activeRange + ); + + final ArrayUtil.ByteArrayComparator comparator = ArrayUtil.getUnsignedComparator(8); + PointValues.IntersectVisitor visitor = getIntersectVisitor(collector, comparator); + try { + intersectWithRanges(visitor, tree, collector, debugInfo); + } catch (CollectionTerminatedException e) { + logger.debug("Early terminate since no more range to collect"); + } + collector.finalizePreviousRange(); + + return debugInfo; + } + + private static void intersectWithRanges( + PointValues.IntersectVisitor visitor, + PointValues.PointTree pointTree, + RangeCollectorForPointTree collector, + DebugInfo debug + ) throws IOException { + PointValues.Relation r = visitor.compare(pointTree.getMinPackedValue(), pointTree.getMaxPackedValue()); + + switch (r) { + case CELL_INSIDE_QUERY: + collector.countNode((int) pointTree.size()); + debug.visitInner(); + break; + case CELL_CROSSES_QUERY: + if (pointTree.moveToChild()) { + do { + intersectWithRanges(visitor, pointTree, collector, debug); + } while (pointTree.moveToSibling()); + pointTree.moveToParent(); + } else { + pointTree.visitDocValues(visitor); + debug.visitLeaf(); } - incrementDocCount.accept(bucketKey, counts[i]); - s++; - if (s > size) { - break; + break; + case CELL_OUTSIDE_QUERY: + } + } + + private static PointValues.IntersectVisitor getIntersectVisitor( + RangeCollectorForPointTree collector, + ArrayUtil.ByteArrayComparator comparator + ) { + return new PointValues.IntersectVisitor() { + @Override + public void visit(int docID) throws IOException { + // this branch should be unreachable + throw new UnsupportedOperationException( + "This IntersectVisitor does not perform any actions on a " + "docID=" + docID + " node being visited" + ); + } + + @Override + public void visit(int docID, byte[] packedValue) throws IOException { + visitPoints(packedValue, collector::count); + } + + @Override + public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOException { + visitPoints(packedValue, () -> { + for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) { + collector.count(); + } + }); + } + + private void visitPoints(byte[] packedValue, CheckedRunnable collect) throws IOException { + if (comparator.compare(packedValue, 0, collector.activeRangeAsByteArray[1], 0) > 0) { + // need to move to next range + collector.finalizePreviousRange(); + if (collector.iterateRangeEnd(packedValue, this::compareByteValue)) { + throw new CollectionTerminatedException(); + } + } + + if (pointCompare(collector.activeRangeAsByteArray[0], collector.activeRangeAsByteArray[1], packedValue)) { + collect.run(); + } + } + + private boolean pointCompare(byte[] lower, byte[] upper, byte[] packedValue) { + if (compareByteValue(packedValue, lower) < 0) { + return false; } + return compareByteValue(packedValue, upper) <= 0; + } + + private int compareByteValue(byte[] value1, byte[] value2) { + return comparator.compare(value1, 0, value2, 0); } + + @Override + public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) { + byte[] rangeMin = collector.activeRangeAsByteArray[0]; + byte[] rangeMax = collector.activeRangeAsByteArray[1]; + + if (compareByteValue(rangeMax, minPackedValue) < 0) { + collector.finalizePreviousRange(); + if (collector.iterateRangeEnd(minPackedValue, this::compareByteValue)) { + throw new CollectionTerminatedException(); + } + // compare the next range with this node's min max again + // new rangeMin = previous rangeMax + 1 <= min + rangeMax = collector.activeRangeAsByteArray[1]; + } + + if (compareByteValue(rangeMin, minPackedValue) > 0 || compareByteValue(rangeMax, maxPackedValue) < 0) { + return PointValues.Relation.CELL_CROSSES_QUERY; + } else { + return PointValues.Relation.CELL_INSIDE_QUERY; + } + } + }; + } + + private static class RangeCollectorForPointTree { + private final BiConsumer incrementDocCount; + private final DateFieldMapper.DateFieldType fieldType; + private int counter = 0; + + private long[] activeRange; + private byte[][] activeRangeAsByteArray; + private final Iterator rangeIter; + + private int visitedRange = 0; + private final int maxNumNonZeroRange; + + public RangeCollectorForPointTree( + BiConsumer incrementDocCount, + DateFieldMapper.DateFieldType fieldType, + Iterator rangeIter, + int maxNumNonZeroRange, + long[] activeRange + ) { + this.incrementDocCount = incrementDocCount; + this.fieldType = fieldType; + this.rangeIter = rangeIter; + this.maxNumNonZeroRange = maxNumNonZeroRange; + this.activeRange = activeRange; + this.activeRangeAsByteArray = activeRangeAsByteArray(); } - logger.debug("Fast filter optimization applied to shard {} segment {}", fastFilterContext.context.indexShard().shardId(), ctx.ord); - return true; + private void count() { + counter++; + } + + private void countNode(int count) { + counter += count; + } + + private void finalizePreviousRange() { + if (counter > 0) { + logger.debug("finalize previous range: {}", activeRange[0]); + logger.debug("counter: {}", counter); + incrementDocCount.accept(fieldType.convertNanosToMillis(activeRange[0]), counter); + counter = 0; + } + } + + /** + * @return true when iterator exhausted or collect enough non-zero ranges + */ + private boolean iterateRangeEnd(byte[] value, BiFunction comparator) { + // the new value may not be contiguous to the previous one + // so try to find the first next range that cross the new value + while (comparator.apply(activeRangeAsByteArray[1], value) < 0) { + if (!rangeIter.hasNext()) { + return true; + } + activeRange = rangeIter.next(); + activeRangeAsByteArray = activeRangeAsByteArray(); + } + visitedRange++; + return visitedRange > maxNumNonZeroRange; + } + + private byte[][] activeRangeAsByteArray() { + byte[] lower = new byte[8]; + byte[] upper = new byte[8]; + NumericUtils.longToSortableBytes(activeRange[0], lower, 0); + NumericUtils.longToSortableBytes(activeRange[1], upper, 0); + return new byte[][] { lower, upper }; + } } - private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException { - Weight weight = ctx.searcher().createWeight(ctx.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); - return weight != null && weight.count(leafCtx) == leafCtx.reader().numDocs(); + private static class DebugInfo { + private int leaf = 0; // leaf node visited + private int inner = 0; // inner node visited + + private void visitLeaf() { + leaf++; + } + + private void visitInner() { + inner++; + } } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index b97c814cdf645..3713d8f83990d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -87,6 +87,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.LongUnaryOperator; import java.util.stream.Collectors; @@ -97,7 +98,7 @@ * * @opensearch.internal */ -final class CompositeAggregator extends BucketsAggregator { +public final class CompositeAggregator extends BucketsAggregator { private final int size; private final List sourceNames; private final int[] reverseMuls; @@ -171,14 +172,15 @@ final class CompositeAggregator extends BucketsAggregator { // bucketOrds is used for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared(); - fastFilterContext.buildFastFilter(); + fastFilterContext.setFieldName(sourceConfigs[0].fieldType().name()); + fastFilterContext.buildRanges(); } } /** * Currently the filter rewrite is only supported for date histograms */ - private class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + public class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { private final RoundingValuesSource valuesSource; private long afterKey = -1L; @@ -210,7 +212,6 @@ protected void processAfterKey(long[] bound, long interval) { } } - @Override public int getSize() { return size; } @@ -706,4 +707,14 @@ private static class Entry { this.docIdSet = docIdSet; } } + + @Override + public void collectDebugInfo(BiConsumer add) { + if (fastFilterContext.optimizedSegments > 0) { + add.accept("optimized_segments", fastFilterContext.optimizedSegments); + add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments); + add.accept("leaf_visited", fastFilterContext.leaf); + add.accept("inner_visited", fastFilterContext.inner); + } + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 12aefc540e75c..f326426800909 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -166,7 +166,8 @@ private AutoDateHistogramAggregator( ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(); + fastFilterContext.setFieldName(valuesSourceConfig.fieldType().name()); + fastFilterContext.buildRanges(); } } @@ -307,6 +308,17 @@ protected final void merge(long[] mergeMap, long newNumBuckets) { } } + @Override + public void collectDebugInfo(BiConsumer add) { + super.collectDebugInfo(add); + if (fastFilterContext.optimizedSegments > 0) { + add.accept("optimized_segments", fastFilterContext.optimizedSegments); + add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments); + add.accept("leaf_visited", fastFilterContext.leaf); + add.accept("inner_visited", fastFilterContext.inner); + } + } + /** * Initially it uses the most fine grained rounding configuration possible * but as more data arrives it rebuckets the data until it "fits" in the diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 0e830106c8284..dd4ee9196fd62 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -126,7 +126,8 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(); + fastFilterContext.setFieldName(valuesSourceConfig.fieldType().name()); + fastFilterContext.buildRanges(); } } @@ -255,6 +256,12 @@ public void doClose() { @Override public void collectDebugInfo(BiConsumer add) { add.accept("total_buckets", bucketOrds.size()); + if (fastFilterContext.optimizedSegments > 0) { + add.accept("optimized_segments", fastFilterContext.optimizedSegments); + add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments); + add.accept("leaf_visited", fastFilterContext.leaf); + add.accept("inner_visited", fastFilterContext.inner); + } } /** diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 2a4fbca7a8541..cf95999ec5086 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -38,29 +38,42 @@ import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; +import org.apache.lucene.tests.util.TestUtil; import org.opensearch.common.time.DateFormatters; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.index.mapper.DocCountFieldMapper; +import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.BucketOrder; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.terms.StringTerms; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.support.AggregationInspectionHelper; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import static java.util.stream.Collectors.toList; +import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; import static org.hamcrest.Matchers.equalTo; public class DateHistogramAggregatorTests extends DateHistogramAggregatorTestCase { @@ -1450,6 +1463,267 @@ private void testSearchCase( } } + public void testMultiRangeTraversal() throws IOException { + Map dataset = new HashMap<>(); + dataset.put("2017-02-01T09:02:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T09:59:59.999Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T10:00:00.001Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T13:06:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T14:04:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T14:05:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T15:59:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:06:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:48:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:59:00.000Z", randomIntBetween(100, 2000)); + + testFilterRewriteCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2017-01-01T09:00:00.000Z"), asLong("2017-02-01T16:00:00.000Z")), + dataset, + aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(5, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); + int expected = dataset.get("2017-02-01T09:02:00.000Z") + dataset.get("2017-02-01T09:59:59.999Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T10:00:00.001Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T13:06:00.000Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T14:04:00.000Z") + dataset.get("2017-02-01T14:05:00.000Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T15:59:00.000Z"); + assertEquals(expected, bucket.getDocCount()); + }, + false, + collectorCount -> assertEquals(0, (int) collectorCount), + true + ); + } + + public void testMultiRangeTraversalFixedData() throws IOException { + Map dataset = new HashMap<>(); + dataset.put("2017-02-01T09:02:00.000Z", 512); + dataset.put("2017-02-01T09:59:59.999Z", 256); + dataset.put("2017-02-01T10:00:00.001Z", 256); + dataset.put("2017-02-01T13:06:00.000Z", 512); + dataset.put("2017-02-01T14:04:00.000Z", 256); + dataset.put("2017-02-01T14:05:00.000Z", 256); + dataset.put("2017-02-01T15:59:00.000Z", 768); + + testFilterRewriteCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2017-01-01T09:00:00.000Z"), asLong("2017-02-01T14:04:01.000Z")), + dataset, + aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(4, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); + int expected = dataset.get("2017-02-01T09:02:00.000Z") + dataset.get("2017-02-01T09:59:59.999Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T10:00:00.001Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T13:06:00.000Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T14:04:00.000Z"); + assertEquals(expected, bucket.getDocCount()); + }, + false, + collectorCount -> assertEquals(0, (int) collectorCount), + false + ); + } + + public void testMultiRangeTraversalNotApplicable() throws IOException { + Map dataset = new HashMap<>(); + dataset.put("2017-02-01T09:02:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T09:59:59.999Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T10:00:00.001Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T13:06:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T14:04:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T14:05:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T15:59:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:06:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:48:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:59:00.000Z", randomIntBetween(100, 2000)); + + testFilterRewriteCase( + new MatchAllDocsQuery(), + dataset, + aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(6, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); + int expected = dataset.get("2017-02-01T09:02:00.000Z") + dataset.get("2017-02-01T09:59:59.999Z") + 4; + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T10:00:00.001Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T13:06:00.000Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T14:04:00.000Z") + dataset.get("2017-02-01T14:05:00.000Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T15:59:00.000Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T16:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T16:06:00.000Z") + dataset.get("2017-02-01T16:48:00.000Z") + dataset.get( + "2017-02-01T16:59:00.000Z" + ); + assertEquals(expected, bucket.getDocCount()); + }, + true, + collectCount -> assertTrue(collectCount > 0), + true + ); + } + + private void testFilterRewriteCase( + Query query, + Map dataset, + Consumer configure, + Consumer verify, + boolean useDocCountField, + Consumer verifyCollectCount, + boolean randomWrite + ) throws IOException { + DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(false, true); + + try (Directory directory = newDirectory()) { + if (randomWrite) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + Document document = new Document(); + if (useDocCountField) { + // add the doc count field to the first document + document.add(new NumericDocValuesField(DocCountFieldMapper.NAME, 5)); + } + for (Map.Entry date : dataset.entrySet()) { + for (int i = 0; i < date.getValue(); i++) { + long instant = asLong(date.getKey(), fieldType); + document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); + document.add(new LongPoint(AGGREGABLE_DATE, instant)); + indexWriter.addDocument(document); + document.clear(); + } + } + } + } else { + // use default codec so max points in leaf is fixed to 512, to cover the node level visit and compare logic + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig().setCodec(TestUtil.getDefaultCodec()))) { + List documents = new ArrayList<>(); + for (Map.Entry date : dataset.entrySet()) { + for (int i = 0; i < date.getValue(); i++) { + Document document = new Document(); + if (useDocCountField) { + // add the doc count field once + document.add(new NumericDocValuesField(DocCountFieldMapper.NAME, 5)); + useDocCountField = false; + } + long instant = asLong(date.getKey(), fieldType); + document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); + document.add(new LongPoint(AGGREGABLE_DATE, instant)); + documents.add(document); + } + } + indexWriter.addDocuments(documents); + } + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + DateHistogramAggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder("_name"); + if (configure != null) { + configure.accept(aggregationBuilder); + } + + CountingAggregator aggregator = createCountingAggregator(query, aggregationBuilder, indexSearcher, fieldType); + aggregator.preCollection(); + indexSearcher.search(query, aggregator); + aggregator.postCollection(); + + MultiBucketConsumerService.MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer( + Integer.MAX_VALUE, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + aggregator.context().bigArrays(), + getMockScriptService(), + reduceBucketConsumer, + PipelineAggregator.PipelineTree.EMPTY + ); + InternalDateHistogram topLevel = (InternalDateHistogram) aggregator.buildTopLevel(); + InternalDateHistogram histogram = (InternalDateHistogram) topLevel.reduce(Collections.singletonList(topLevel), context); + doAssertReducedMultiBucketConsumer(histogram, reduceBucketConsumer); + + verify.accept(histogram); + + verifyCollectCount.accept(aggregator.getCollectCount().get()); + } + } + } + + protected CountingAggregator createCountingAggregator( + Query query, + AggregationBuilder builder, + IndexSearcher searcher, + MappedFieldType... fieldTypes + ) throws IOException { + return new CountingAggregator( + new AtomicInteger(), + createAggregator( + query, + builder, + searcher, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + fieldTypes + ) + ); + } + private static long asLong(String dateTime) { return DateFormatters.from(DateFieldMapper.getDefaultDateTimeFormatter().parse(dateTime)).toInstant().toEpochMilli(); } diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index e538dede07fc8..02e5d22e147d5 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -126,7 +126,6 @@ import org.opensearch.search.aggregations.AggregatorFactories.Builder; import org.opensearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer; import org.opensearch.search.aggregations.bucket.nested.NestedAggregationBuilder; -import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.metrics.MetricsAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; @@ -410,6 +409,7 @@ public boolean shouldCache(Query query) { ); fieldNameToType.putAll(getFieldAliases(fieldTypes)); + when(searchContext.maxAggRewriteFilters()).thenReturn(10_000); registerFieldTypes(searchContext, mapperService, fieldNameToType); doAnswer(invocation -> { /* Store the release-ables so we can release them at the end of the test case. This is important because aggregations don't @@ -1123,7 +1123,7 @@ protected static class CountingAggregator extends Aggregator { private final AtomicInteger collectCounter; public final Aggregator delegate; - public CountingAggregator(AtomicInteger collectCounter, TermsAggregator delegate) { + public CountingAggregator(AtomicInteger collectCounter, Aggregator delegate) { this.collectCounter = collectCounter; this.delegate = delegate; }