From c4d9055f16dc68bd0000ccd1004794838c77b9fc Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Fri, 12 Apr 2024 10:15:59 -0700 Subject: [PATCH 01/13] multi range traversal in BKD Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 437 ++++++++++++++++-- .../bucket/composite/CompositeAggregator.java | 2 +- .../AutoDateHistogramAggregator.java | 2 +- .../histogram/DateHistogramAggregator.java | 11 +- .../DateHistogramAggregatorTests.java | 49 ++ 5 files changed, 463 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index dde748bf0dc07..b178d7ede9aa3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -15,7 +15,9 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.IndexOrDocValuesQuery; import org.apache.lucene.search.MatchAllDocsQuery; @@ -36,7 +38,10 @@ import org.opensearch.search.internal.SearchContext; import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -241,6 +246,19 @@ public static class FastFilterContext { private AggregationType aggregationType; private final SearchContext context; + private long[][] ranges; + + public int leaf; + public int inner; + public int segments; + public int optimizedSegments; + + private String fieldName; + + public void setFieldName(String fieldName) { + this.fieldName = fieldName; + } + public FastFilterContext(SearchContext context) { this.context = context; } @@ -281,6 +299,28 @@ public Weight[] buildFastFilter(LeafReaderContext leaf) throws IOException { } return filters; } + + public void buildRanges() throws IOException { + assert ranges == null : "Ranges should only be built once at shard level, but they are already built"; + this.ranges = this.aggregationType.buildRanges(context); + if (ranges != null) { + logger.debug("Ranges built for shard {}", context.indexShard().shardId()); + filtersBuiltAtShardLevel = true; + } + } + + public long[][] buildRanges(LeafReaderContext leaf) throws IOException { + long[][] ranges = this.aggregationType.buildRanges(leaf, context); + if (ranges != null) { + logger.debug("Ranges built for shard {} segment {}", context.indexShard().shardId(), leaf.ord); + } + return ranges; + } + + private void consumeDebugInfo(DebugInfoCollector debug) { + leaf += debug.leaf; + inner += debug.inner; + } } /** @@ -291,9 +331,11 @@ interface AggregationType { boolean isRewriteable(Object parent, int subAggLength); Weight[] buildFastFilter(SearchContext ctx) throws IOException; - Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext ctx) throws IOException; + long[][] buildRanges(SearchContext ctx) throws IOException; + long[][] buildRanges(LeafReaderContext leaf, SearchContext ctx) throws IOException; + default int getSize() { return Integer.MAX_VALUE; } @@ -370,6 +412,47 @@ 
private Weight[] buildFastFilter(SearchContext context, long[] bounds) throws IO ); } + @Override + public long[][] buildRanges(SearchContext context) throws IOException { + long[] bounds = getDateHistoAggBounds(context, fieldType.name()); + logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId()); + return buildRanges(context, bounds); + } + + private long[][] buildRanges(SearchContext context, long[] bounds) throws IOException { + bounds = processHardBounds(bounds); + if (bounds == null) { + return null; + } + assert bounds[0] <= bounds[1] : "Low bound should be less than high bound"; + + final Rounding rounding = getRounding(bounds[0], bounds[1]); + final OptionalLong intervalOpt = Rounding.getInterval(rounding); + if (intervalOpt.isEmpty()) { + return null; + } + final long interval = intervalOpt.getAsLong(); + + // process the after key of composite agg + processAfterKey(bounds, interval); + + return FastFilterRewriteHelper.createRangesFromAgg( + context, + (DateFieldMapper.DateFieldType) fieldType, + interval, + getRoundingPrepared(), + bounds[0], + bounds[1] + ); + } + + @Override + public long[][] buildRanges(LeafReaderContext leaf, SearchContext context) throws IOException { + long[] bounds = getSegmentBounds(leaf, fieldType.name()); + logger.debug("Bounds are {} for shard {} segment {}", bounds, context.indexShard().shardId(), leaf.ord); + return buildRanges(context, bounds); + } + protected abstract Rounding getRounding(final long low, final long high); protected abstract Rounding.Prepared getRoundingPrepared(); @@ -413,7 +496,7 @@ public static long getBucketOrd(long bucketOrd) { } /** - * Try to get the bucket doc counts from the fast filters for the aggregation + * Try to get the bucket doc counts for the date histogram aggregation *
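+ * When the optimization applies, it collects the doc counts directly from the BKD
+ * point tree of the date field, walking all bucket ranges in a single pass (see
+ * multiRangesTraverse below) instead of counting one filter per bucket.
+ * A sketch of the call site, with argument names as in this patch's
+ * DateHistogramAggregator:
+ * <pre>
+ * boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, incrementDocCount);
+ * if (optimized) throw new CollectionTerminatedException(); // skip per-doc collection for this segment
+ * </pre>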

* Usage: invoked at segment level — in getLeafCollector of aggregator * @@ -424,11 +507,18 @@ public static boolean tryFastFilterAggregation( FastFilterContext fastFilterContext, final BiConsumer incrementDocCount ) throws IOException { - if (fastFilterContext == null) return false; + fastFilterContext.segments++; if (!fastFilterContext.rewriteable) { return false; } + if (ctx.reader().hasDeletions()) return false; + + PointValues values = ctx.reader().getPointValues(fastFilterContext.fieldName); + // date field is 1 dimensional + // only proceed if every document has exactly one point for this field + if (values.getDocCount() != values.size()) return false; + NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); if (docCountValues.nextDoc() != NO_MORE_DOCS) { logger.debug( @@ -444,56 +534,333 @@ public static boolean tryFastFilterAggregation( if (!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) { return false; } - Weight[] filters = fastFilterContext.filters; - if (filters == null) { + + long[][] ranges = fastFilterContext.ranges; + if (ranges == null) { logger.debug( "Shard {} segment {} functionally match all documents. Build the fast filter", fastFilterContext.context.indexShard().shardId(), ctx.ord ); - filters = fastFilterContext.buildFastFilter(ctx); - if (filters == null) { + ranges = fastFilterContext.buildRanges(ctx); + if (ranges == null) { return false; } } - final int[] counts = new int[filters.length]; - int i; - for (i = 0; i < filters.length; i++) { - counts[i] = filters[i].count(ctx); - if (counts[i] == -1) { - // Cannot use the optimization if any of the counts - // is -1 indicating the segment might have deleted documents - return false; + final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType) + .getFieldType(); + int size = fastFilterContext.aggregationType.getSize(); + DebugInfoCollector debugInfo = multiRangesTraverse( + values.getPointTree(), + ranges, + incrementDocCount, + fieldType, + size + ); + fastFilterContext.consumeDebugInfo(debugInfo); + + logger.debug("Fast filter optimization applied to shard {} segment {}", fastFilterContext.context.indexShard().shardId(), ctx.ord); + fastFilterContext.optimizedSegments++; + return true; + } + + private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException { + Weight weight = ctx.searcher().createWeight(ctx.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); + return weight != null && weight.count(leafCtx) == leafCtx.reader().numDocs(); + } + + private static long[][] createRangesFromAgg( + final SearchContext context, + final DateFieldMapper.DateFieldType fieldType, + final long interval, + final Rounding.Prepared preparedRounding, + long low, + final long high + ) { + // Calculate the number of buckets using range and interval + long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); + long prevRounded = roundedLow; + int bucketCount = 0; + while (roundedLow <= fieldType.convertNanosToMillis(high)) { + bucketCount++; + int maxNumFilterBuckets = context.maxAggRewriteFilters(); + if (bucketCount > maxNumFilterBuckets) { + logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets); + return null; } + // Below rounding is needed as the interval could return in + // non-rounded values for something like calendar month + roundedLow = preparedRounding.round(roundedLow 
+ interval); + if (prevRounded == roundedLow) break; // prevents getting into an infinite loop + prevRounded = roundedLow; } - int s = 0; - int size = fastFilterContext.aggregationType.getSize(); - for (i = 0; i < filters.length; i++) { - if (counts[i] > 0) { - long bucketKey = i; // the index of filters is the key for filters aggregation - if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) { - final DateFieldMapper.DateFieldType fieldType = - ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).getFieldType(); - bucketKey = fieldType.convertNanosToMillis( - NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) - ); - } - incrementDocCount.accept(bucketKey, counts[i]); - s++; - if (s > size) { + long[][] ranges = new long[bucketCount][2]; + if (bucketCount > 0) { + roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); + + int i = 0; + while (i < bucketCount) { + // Calculate the lower bucket bound + // final byte[] lower = new byte[8]; + // NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0); + long lower = i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow); + roundedLow = preparedRounding.round(roundedLow + interval); + // final byte[] upper = new byte[8]; + // NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high : + // // Subtract -1 if the minimum is roundedLow as roundedLow itself + // // is included in the next bucket + // fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0); + + long upper = i + 1 == bucketCount ? high : fieldType.convertRoundedMillisToNanos(roundedLow) - 1; + + ranges[i][0] = lower; + ranges[i][1] = upper; + i++; + } + } + + return ranges; + } + + private static DebugInfoCollector multiRangesTraverse( + final PointValues.PointTree tree, + final long[][] ranges, + final BiConsumer incrementDocCount, + final DateFieldMapper.DateFieldType fieldType, + final int size + ) throws IOException { + Iterator rangeIter = Arrays.stream(ranges).iterator(); + long[] activeRange = rangeIter.next(); + + // The ranges are connected and in ascending order + // make sure the first range to collect is at least cross the min value of the tree + boolean noRangeMatches = false; + while (activeRange[1] < NumericUtils.sortableBytesToLong(tree.getMinPackedValue(), 0)) { + if (rangeIter.hasNext()) { + activeRange = rangeIter.next(); + } else { + noRangeMatches = true; + break; + } + } + DebugInfoCollector debugInfo = new DebugInfoCollector(); + if (noRangeMatches) { + return debugInfo; + } + + RangeCollectorForPointTree collector = new RangeCollectorForPointTree(incrementDocCount, fieldType, rangeIter, size); + collector.setActiveRange(activeRange); + + PointValues.IntersectVisitor visitor = getIntersectVisitor(collector); + intersectWithRanges(visitor, tree, debugInfo); + collector.finalizePreviousRange(); + + return debugInfo; + } + + private static void intersectWithRanges(PointValues.IntersectVisitor visitor, PointValues.PointTree pointTree, DebugInfoCollector debug) + throws IOException { + // long min = NumericUtils.sortableBytesToLong(pointTree.getMinPackedValue(), 0); + // long max = NumericUtils.sortableBytesToLong(pointTree.getMaxPackedValue(), 0); + // maxPackedValue seems to be the max value + 1 + // System.out.println("==============="); + // System.out.println("current node range min=" + min + " max=" + max); + + PointValues.Relation r = 
visitor.compare(pointTree.getMinPackedValue(), pointTree.getMaxPackedValue()); + // System.out.println("relation=" + r); + + try { + switch (r) { + case CELL_INSIDE_QUERY: + pointTree.visitDocIDs(visitor); + debug.visitInner(); break; + case CELL_CROSSES_QUERY: + if (pointTree.moveToChild()) { + intersectWithRanges(visitor, pointTree, debug); + pointTree.moveToSibling(); + intersectWithRanges(visitor, pointTree, debug); + pointTree.moveToParent(); + } else { + pointTree.visitDocValues(visitor); + debug.visitLeaf(); + } + break; + case CELL_OUTSIDE_QUERY: + } + } catch (CollectionTerminatedException e) { + // ignore + logger.debug("Early terminate since no more range to collect"); + } + } + + /** + * + */ + private static PointValues.IntersectVisitor getIntersectVisitor(RangeCollectorForPointTree collector) { + return new PointValues.IntersectVisitor() { + + /** + * The doc visited is ever-increasing in terms of its value + * The node range visited is every-increasing at any level + * possible next node is sibling or parent sibling, this is the proof of ever-increasing + *
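+ * e.g. with ranges [0,9][10,19][20,29] and leaf values arriving as 3, 7, 15, 27,
+ * activeRange only ever moves forward: [0,9] -> [10,19] -> [20,29]
+ *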

+ * the first range either fully contains a node (inner or leaf), or crosses a leaf
+ * a node fully inside the range won't change activeRange
+ * a crossed leaf will
+ * after the first node, the next node could change activeRange
+ * when a node is outside activeRange, compare its min/max with the next range
+ * ranges are always connected, but the values may not be, so iterate ranges until the
+ * range end >= the node's min (see iterateRangeEnd)
+ *

+ * if a node crosses the activeRange, we visit its children recursively; we can always
+ * stop at a leaf, or earlier once an inner node falls fully inside a range
+ *
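+ * a rough sketch of the per-value bookkeeping this visitor does in visit(), using the
+ * fields of RangeCollectorForPointTree below:
+ * <pre>
+ * if (value > activeRange[1]) {               // value passed the current bucket
+ *     finalizePreviousRange();                // flush the count accumulated so far
+ *     if (iterateRangeEnd(value)) {           // advance ranges; true means nothing left to collect
+ *         throw new CollectionTerminatedException();
+ *     }
+ * }
+ * if (activeRange[0] <= value && value <= activeRange[1]) {
+ *     count();
+ * }
+ * </pre>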

+ */ + + @Override + public void visit(int docID) throws IOException { + // System.out.println("visit docID=" + docID); + collector.count(); + } + + @Override + public void visit(int docID, byte[] packedValue) throws IOException { + long value = NumericUtils.sortableBytesToLong(packedValue, 0); + // System.out.println("value" + value + " count=" + 1); + if (value > collector.activeRange[1]) { + // need to move to next range + collector.finalizePreviousRange(); + + if (collector.iterateRangeEnd(value)) { + throw new CollectionTerminatedException(); + // return; + } + } + if (collector.activeRange[0] <= value && value <= collector.activeRange[1]) { + collector.count(); } } + + @Override + public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOException { + logger.debug("visit iterator with packedValue"); + long value = NumericUtils.sortableBytesToLong(packedValue, 0); + // System.out.println("value" + value + " count=" + count); + if (value > collector.activeRange[1]) { + collector.finalizePreviousRange(); + + if (collector.iterateRangeEnd(value)) { + throw new CollectionTerminatedException(); + // return; + } + } + if (collector.activeRange[0] <= value && value <= collector.activeRange[1]) { + for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) { + collector.count(); + } + } + } + + @Override + public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) { + long min = NumericUtils.sortableBytesToLong(minPackedValue, 0); + long max = NumericUtils.sortableBytesToLong(maxPackedValue, 0); + long queryMin = collector.activeRange[0]; + long queryMax = collector.activeRange[1]; + + boolean crosses = false; + if (collector.activeRange[1] < min) { + // need to move to next range + // finalize the results for the previous range + collector.finalizePreviousRange(); + // go to next range + if (collector.iterateRangeEnd(min)) { + throw new CollectionTerminatedException(); + // return PointValues.Relation.CELL_OUTSIDE_QUERY; + } + + // compare the next range with this node's min max again + // it cannot be outside again, can only be cross or inside + if (collector.activeRange[1] < max) { + crosses = true; + } + } else if (queryMin > min || queryMax < max) { + crosses = true; + } + + if (crosses) { + return PointValues.Relation.CELL_CROSSES_QUERY; + } else { + return PointValues.Relation.CELL_INSIDE_QUERY; + } + } + }; + } + + private static class RangeCollectorForPointTree { + private final BiConsumer incrementDocCount; + private final DateFieldMapper.DateFieldType fieldType; + private int counter = 0; + private long[] activeRange; + private final Iterator rangeIter; + private int visitedRange = 0; + private final int size; // the given size of non-zero buckets used in composite agg + + public RangeCollectorForPointTree( + BiConsumer incrementDocCount, + DateFieldMapper.DateFieldType fieldType, + Iterator rangeIter, + int size + ) { + this.incrementDocCount = incrementDocCount; + this.fieldType = fieldType; + this.rangeIter = rangeIter; + this.size = size; } - logger.debug("Fast filter optimization applied to shard {} segment {}", fastFilterContext.context.indexShard().shardId(), ctx.ord); - return true; + private void count() { + counter++; + } + + private void finalizePreviousRange() { + if (counter > 0) { + incrementDocCount.accept(fieldType.convertNanosToMillis(activeRange[0]), counter); + counter = 0; + } + } + + private void setActiveRange(long[] activeRange) { + this.activeRange = activeRange; + } + + /** + * 
@return true when iterator exhausted or collect enough non-zero ranges + */ + private boolean iterateRangeEnd(long value) { + // the new value may not be contiguous to the previous one + // so try to find the first next range that cross the new value + while (activeRange[1] < value) { + if (!rangeIter.hasNext()) { + return true; + } + activeRange = rangeIter.next(); + } + visitedRange++; + return visitedRange > size; + } } - private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException { - Weight weight = ctx.searcher().createWeight(ctx.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); - return weight != null && weight.count(leafCtx) == leafCtx.reader().numDocs(); + private static class DebugInfoCollector { + private int leaf = 0; // leaf node visited + private int inner = 0; // inner node visited + + private void visitLeaf() { + leaf++; + } + + private void visitInner() { + inner++; + } } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index b97c814cdf645..a8c6ae6ec0c39 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -171,7 +171,7 @@ final class CompositeAggregator extends BucketsAggregator { // bucketOrds is used for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared(); - fastFilterContext.buildFastFilter(); + fastFilterContext.buildRanges(); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 12aefc540e75c..75337a1d11ecf 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -166,7 +166,7 @@ private AutoDateHistogramAggregator( ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(); + fastFilterContext.buildRanges(); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 0e830106c8284..148403728a912 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -126,7 +126,8 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(); + fastFilterContext.setFieldName(valuesSourceConfig.fieldType().name()); + fastFilterContext.buildRanges(); } } @@ -170,6 +171,8 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol ) ); if (optimized) throw new CollectionTerminatedException(); + // we will return the debug info for each segment + // or we should 
just cache it in the fast filter context SortedNumericDocValues values = valuesSource.longValues(ctx); return new LeafBucketCollectorBase(sub, values) { @@ -255,6 +258,12 @@ public void doClose() { @Override public void collectDebugInfo(BiConsumer add) { add.accept("total_buckets", bucketOrds.size()); + if (fastFilterContext.optimizedSegments > 0) { + add.accept("optimized_segments", fastFilterContext.optimizedSegments); + add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments); + add.accept("leaf_visited", fastFilterContext.leaf); + add.accept("inner_visited", fastFilterContext.inner); + } } /** diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 2a4fbca7a8541..50ea783fe204f 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -822,6 +822,7 @@ public void testNanosIntervalSecond() throws IOException { aggregation -> aggregation.calendarInterval(DateHistogramInterval.SECOND).field(AGGREGABLE_DATE).minDocCount(1L), histogram -> { List buckets = histogram.getBuckets(); + System.out.println(buckets); assertEquals(3, buckets.size()); Histogram.Bucket bucket = buckets.get(0); @@ -1332,6 +1333,54 @@ public void testFilterRewriteOptimizationWithRangeQuery() throws IOException { ); } + public void testMultiRangeDebug() throws IOException { + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:00.000Z", + "2017-02-01T09:35:00.000Z", + "2017-02-01T10:15:00.000Z", + "2017-02-01T13:06:00.000Z", + "2017-02-01T14:04:00.000Z", + "2017-02-01T14:05:00.000Z", + "2017-02-01T15:59:00.000Z", + "2017-02-01T16:06:00.000Z", + "2017-02-01T16:48:00.000Z", + "2017-02-01T16:59:00.000Z" + ), + aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(6, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T16:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + }, + false + ); + } + public void testDocCountField() throws IOException { testSearchCase( new MatchAllDocsQuery(), From 02f5c14b8b4f056c1ded235ead55f5b0ebf43447 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Sun, 21 Apr 2024 10:53:59 -0700 Subject: [PATCH 02/13] Add changelog Signed-off-by: bowenlan-amzn --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index dadfcbfbd8b05..2048e8e379418 100644 --- 
a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) +- Support multi ranges traversal when doing date histogram rewrite optimization. ([#13317](https://github.com/opensearch-project/OpenSearch/pull/13317)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) From 1ee1070ee80d13d944a2ce35a0218c49b4b3b87b Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Sun, 21 Apr 2024 10:55:37 -0700 Subject: [PATCH 03/13] clean up Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 14 +++++--------- .../bucket/composite/CompositeAggregator.java | 1 + .../histogram/AutoDateHistogramAggregator.java | 1 + .../histogram/DateHistogramAggregatorTests.java | 1 - 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index b178d7ede9aa3..22f0606276161 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -38,7 +38,6 @@ import org.opensearch.search.internal.SearchContext; import java.io.IOException; -import java.time.Instant; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -272,7 +271,7 @@ public void setAggregationType(AggregationType aggregationType) { } public boolean isRewriteable(final Object parent, final int subAggLength) { - if (context.maxAggRewriteFilters() == 0) return false; + // if (context.maxAggRewriteFilters() == 0) return false; boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength); logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId()); @@ -331,9 +330,11 @@ interface AggregationType { boolean isRewriteable(Object parent, int subAggLength); Weight[] buildFastFilter(SearchContext ctx) throws IOException; + Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext ctx) throws IOException; long[][] buildRanges(SearchContext ctx) throws IOException; + long[][] buildRanges(LeafReaderContext leaf, SearchContext ctx) throws IOException; default int getSize() { @@ -515,6 +516,7 @@ public static boolean tryFastFilterAggregation( if (ctx.reader().hasDeletions()) return false; PointValues values = ctx.reader().getPointValues(fastFilterContext.fieldName); + if (values == null) return false; // date field is 1 dimensional // only proceed if every document has exactly one point for this field if (values.getDocCount() != values.size()) return false; @@ -551,13 +553,7 @@ public static boolean tryFastFilterAggregation( final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType) .getFieldType(); int size = fastFilterContext.aggregationType.getSize(); - 
DebugInfoCollector debugInfo = multiRangesTraverse( - values.getPointTree(), - ranges, - incrementDocCount, - fieldType, - size - ); + DebugInfoCollector debugInfo = multiRangesTraverse(values.getPointTree(), ranges, incrementDocCount, fieldType, size); fastFilterContext.consumeDebugInfo(debugInfo); logger.debug("Fast filter optimization applied to shard {} segment {}", fastFilterContext.context.indexShard().shardId(), ctx.ord); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index a8c6ae6ec0c39..786e7e47537b9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -171,6 +171,7 @@ final class CompositeAggregator extends BucketsAggregator { // bucketOrds is used for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared(); + fastFilterContext.setFieldName(sourceConfigs[0].name()); fastFilterContext.buildRanges(); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 75337a1d11ecf..9e59f33fef104 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -166,6 +166,7 @@ private AutoDateHistogramAggregator( ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { + fastFilterContext.setFieldName(valuesSourceConfig.fieldType().name()); fastFilterContext.buildRanges(); } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 50ea783fe204f..3858bae629ad3 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -822,7 +822,6 @@ public void testNanosIntervalSecond() throws IOException { aggregation -> aggregation.calendarInterval(DateHistogramInterval.SECOND).field(AGGREGABLE_DATE).minDocCount(1L), histogram -> { List buckets = histogram.getBuckets(); - System.out.println(buckets); assertEquals(3, buckets.size()); Histogram.Bucket bucket = buckets.get(0); From 22163c77f2f494d548cfd2e0254322453f8611a5 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 22 Apr 2024 17:36:12 -0700 Subject: [PATCH 04/13] setup assert optimization kicks in unit test Signed-off-by: bowenlan-amzn --- .../test/search.aggregation/10_histogram.yml | 4 + .../bucket/FastFilterRewriteHelper.java | 2 +- .../DateHistogramAggregatorTests.java | 243 ++++++++++++++---- .../aggregations/AggregatorTestCase.java | 3 +- 4 files changed, 202 insertions(+), 50 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml 
b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml index e7da9a0bc454c..0ab36ff470b67 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml @@ -599,6 +599,10 @@ setup: - match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator } - match: { profile.shards.0.aggregations.0.description: histo } - match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 } + - match: { profile.shards.0.aggregations.0.debug.optimized_segments: 1 } + - match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 } + - match: { profile.shards.0.aggregations.0.debug.leaf_visited: 1 } + - match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 } --- "histogram with hard bounds": diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 22f0606276161..7b9fa4488a0a6 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -271,7 +271,7 @@ public void setAggregationType(AggregationType aggregationType) { } public boolean isRewriteable(final Object parent, final int subAggLength) { - // if (context.maxAggRewriteFilters() == 0) return false; + if (context.maxAggRewriteFilters() == 0) return false; boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength); logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId()); diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 3858bae629ad3..4510dab98299a 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -38,6 +38,7 @@ import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; @@ -45,12 +46,18 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.opensearch.common.time.DateFormatters; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.index.mapper.DocCountFieldMapper; +import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.BucketOrder; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.terms.StringTerms; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import 
org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.support.AggregationInspectionHelper; import java.io.IOException; @@ -58,9 +65,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import static java.util.stream.Collectors.toList; +import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; import static org.hamcrest.Matchers.equalTo; public class DateHistogramAggregatorTests extends DateHistogramAggregatorTestCase { @@ -1332,54 +1341,6 @@ public void testFilterRewriteOptimizationWithRangeQuery() throws IOException { ); } - public void testMultiRangeDebug() throws IOException { - testSearchCase( - new MatchAllDocsQuery(), - Arrays.asList( - "2017-02-01T09:02:00.000Z", - "2017-02-01T09:35:00.000Z", - "2017-02-01T10:15:00.000Z", - "2017-02-01T13:06:00.000Z", - "2017-02-01T14:04:00.000Z", - "2017-02-01T14:05:00.000Z", - "2017-02-01T15:59:00.000Z", - "2017-02-01T16:06:00.000Z", - "2017-02-01T16:48:00.000Z", - "2017-02-01T16:59:00.000Z" - ), - aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L), - histogram -> { - List buckets = histogram.getBuckets(); - assertEquals(6, buckets.size()); - - Histogram.Bucket bucket = buckets.get(0); - assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); - assertEquals(2, bucket.getDocCount()); - - bucket = buckets.get(1); - assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); - assertEquals(1, bucket.getDocCount()); - - bucket = buckets.get(2); - assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); - assertEquals(1, bucket.getDocCount()); - - bucket = buckets.get(3); - assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); - assertEquals(2, bucket.getDocCount()); - - bucket = buckets.get(4); - assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString()); - assertEquals(1, bucket.getDocCount()); - - bucket = buckets.get(5); - assertEquals("2017-02-01T16:00:00.000Z", bucket.getKeyAsString()); - assertEquals(3, bucket.getDocCount()); - }, - false - ); - } - public void testDocCountField() throws IOException { testSearchCase( new MatchAllDocsQuery(), @@ -1498,6 +1459,192 @@ private void testSearchCase( } } + public void testMultiRangeDebug() throws IOException { + testFilterRewrite( + new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:00.000Z", + "2017-02-01T09:35:00.000Z", + "2017-02-01T10:15:00.000Z", + "2017-02-01T13:06:00.000Z", + "2017-02-01T14:04:00.000Z", + "2017-02-01T14:05:00.000Z", + "2017-02-01T15:59:00.000Z", + "2017-02-01T16:06:00.000Z", + "2017-02-01T16:48:00.000Z", + "2017-02-01T16:59:00.000Z" + ), + aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(6, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); + 
assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T16:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + }, + false, + 0 + ); + + testFilterRewrite( + new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:00.000Z", + "2017-02-01T09:35:00.000Z", + "2017-02-01T10:15:00.000Z", + "2017-02-01T13:06:00.000Z", + "2017-02-01T14:04:00.000Z", + "2017-02-01T14:05:00.000Z", + "2017-02-01T15:59:00.000Z", + "2017-02-01T16:06:00.000Z", + "2017-02-01T16:48:00.000Z", + "2017-02-01T16:59:00.000Z" + ), + aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(6, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); + assertEquals(6, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T16:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + }, + true, + 10 + ); + } + + private void testFilterRewrite( + Query query, + List dataset, + Consumer configure, + Consumer verify, + boolean useDocCountField, + int actualCollectCount + ) throws IOException { + + DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(randomBoolean(), true); + + try (Directory directory = newDirectory()) { + + try ( + RandomIndexWriter indexWriter = new RandomIndexWriter( + random(), + directory, + newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE) + ) + ) { + Document document = new Document(); + if (useDocCountField) { + // add the doc count field to the first document + document.add(new NumericDocValuesField(DocCountFieldMapper.NAME, 5)); + } + for (String date : dataset) { + long instant = asLong(date, fieldType); + document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); + document.add(new LongPoint(AGGREGABLE_DATE, instant)); + document.add(new LongPoint(SEARCHABLE_DATE, instant)); + indexWriter.addDocument(document); + document.clear(); + } + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + DateHistogramAggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder("_name"); + if (configure != null) { + configure.accept(aggregationBuilder); + } + + CountingAggregator aggregator = createCountingAggregator(query, aggregationBuilder, indexSearcher, fieldType); + aggregator.preCollection(); + indexSearcher.search(query, aggregator); + aggregator.postCollection(); + + MultiBucketConsumerService.MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer( + Integer.MAX_VALUE, + new 
NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + aggregator.context().bigArrays(), + getMockScriptService(), + reduceBucketConsumer, + PipelineAggregator.PipelineTree.EMPTY + ); + InternalDateHistogram topLevel = (InternalDateHistogram) aggregator.buildTopLevel(); + InternalDateHistogram histogram = (InternalDateHistogram) topLevel.reduce(Collections.singletonList(topLevel), context); + doAssertReducedMultiBucketConsumer(histogram, reduceBucketConsumer); + + verify.accept(histogram); + + assertEquals(aggregator.getCollectCount().get(), actualCollectCount); + } + } + } + + protected CountingAggregator createCountingAggregator( + Query query, + AggregationBuilder builder, + IndexSearcher searcher, + MappedFieldType... fieldTypes + ) throws IOException { + return new CountingAggregator( + new AtomicInteger(), + createAggregator( + query, + builder, + searcher, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + fieldTypes + ) + ); + } + private static long asLong(String dateTime) { return DateFormatters.from(DateFieldMapper.getDefaultDateTimeFormatter().parse(dateTime)).toInstant().toEpochMilli(); } diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index e538dede07fc8..22ec532f40e7d 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -410,6 +410,7 @@ public boolean shouldCache(Query query) { ); fieldNameToType.putAll(getFieldAliases(fieldTypes)); + when(searchContext.maxAggRewriteFilters()).thenReturn(10_000); registerFieldTypes(searchContext, mapperService, fieldNameToType); doAnswer(invocation -> { /* Store the release-ables so we can release them at the end of the test case. 
This is important because aggregations don't @@ -1123,7 +1124,7 @@ protected static class CountingAggregator extends Aggregator { private final AtomicInteger collectCounter; public final Aggregator delegate; - public CountingAggregator(AtomicInteger collectCounter, TermsAggregator delegate) { + public CountingAggregator(AtomicInteger collectCounter, Aggregator delegate) { this.collectCounter = collectCounter; this.delegate = delegate; } From ba7c54939190f69d0b35f52fca7a75da9819698b Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 22 Apr 2024 18:08:56 -0700 Subject: [PATCH 05/13] clean up Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 212 ++---------------- 1 file changed, 24 insertions(+), 188 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 7b9fa4488a0a6..2f2d717340158 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.DocValues; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; @@ -132,9 +131,10 @@ private static long[] getSegmentBounds(final LeafReaderContext context, final St } /** - * This method also acts as a pre-condition check for the optimization + * Gets the min and max bounds of the field for the shard search + * Depending on the query part, the bounds are computed differently * - * @return null if the processed query not as expected + * @return null if the processed query not supported by the optimization */ public static long[] getDateHistoAggBounds(final SearchContext context, final String fieldName) throws IOException { final Query cq = unwrapIntoConcreteQuery(context.query()); @@ -171,67 +171,6 @@ private static long[] getBoundsWithRangeQuery(PointRangeQuery prq, String fieldN return null; } - /** - * Creates the date range filters for aggregations using the interval, min/max - * bounds and prepared rounding - */ - private static Weight[] createFilterForAggregations( - final SearchContext context, - final DateFieldMapper.DateFieldType fieldType, - final long interval, - final Rounding.Prepared preparedRounding, - long low, - final long high - ) throws IOException { - // Calculate the number of buckets using range and interval - long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); - long prevRounded = roundedLow; - int bucketCount = 0; - while (roundedLow <= fieldType.convertNanosToMillis(high)) { - bucketCount++; - int maxNumFilterBuckets = context.maxAggRewriteFilters(); - if (bucketCount > maxNumFilterBuckets) { - logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets); - return null; - } - // Below rounding is needed as the interval could return in - // non-rounded values for something like calendar month - roundedLow = preparedRounding.round(roundedLow + interval); - if (prevRounded == roundedLow) break; // prevents getting into an infinite loop - prevRounded = roundedLow; - } - - Weight[] filters = null; - if (bucketCount > 0) { - filters = new Weight[bucketCount]; - roundedLow = 
preparedRounding.round(fieldType.convertNanosToMillis(low)); - - int i = 0; - while (i < bucketCount) { - // Calculate the lower bucket bound - final byte[] lower = new byte[8]; - NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0); - - // Calculate the upper bucket bound - roundedLow = preparedRounding.round(roundedLow + interval); - final byte[] upper = new byte[8]; - NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high : - // Subtract -1 if the minimum is roundedLow as roundedLow itself - // is included in the next bucket - fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0); - - filters[i++] = context.searcher().createWeight(new PointRangeQuery(fieldType.name(), lower, upper, 1) { - @Override - protected String toString(int dimension, byte[] value) { - return Long.toString(LongPoint.decodeDimension(value, 0)); - } - }, ScoreMode.COMPLETE_NO_SCORES, 1); - } - } - - return filters; - } - /** * Context object for fast filter optimization *
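+ * After this patch it carries the precomputed bucket ranges instead of Weight filters,
+ * plus the debug counters (segments, optimizedSegments, leaf, inner) reported through
+ * the aggregation profile output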

@@ -239,21 +178,20 @@ protected String toString(int dimension, byte[] value) { */ public static class FastFilterContext { private boolean rewriteable = false; - private Weight[] filters = null; - private boolean filtersBuiltAtShardLevel = false; + private boolean rangesBuiltAtShardLevel = false; private AggregationType aggregationType; private final SearchContext context; + private String fieldName; private long[][] ranges; + // debug info related fields public int leaf; public int inner; public int segments; public int optimizedSegments; - private String fieldName; - public void setFieldName(String fieldName) { this.fieldName = fieldName; } @@ -279,32 +217,12 @@ public boolean isRewriteable(final Object parent, final int subAggLength) { return rewriteable; } - public void buildFastFilter() throws IOException { - assert filters == null : "Filters should only be built once, but they are already built"; - this.filters = this.aggregationType.buildFastFilter(context); - if (filters != null) { - logger.debug("Fast filter built for shard {}", context.indexShard().shardId()); - filtersBuiltAtShardLevel = true; - } - } - - /** - * Built filters for a segment - */ - public Weight[] buildFastFilter(LeafReaderContext leaf) throws IOException { - Weight[] filters = this.aggregationType.buildFastFilter(leaf, context); - if (filters != null) { - logger.debug("Fast filter built for shard {} segment {}", context.indexShard().shardId(), leaf.ord); - } - return filters; - } - public void buildRanges() throws IOException { assert ranges == null : "Ranges should only be built once at shard level, but they are already built"; this.ranges = this.aggregationType.buildRanges(context); if (ranges != null) { logger.debug("Ranges built for shard {}", context.indexShard().shardId()); - filtersBuiltAtShardLevel = true; + rangesBuiltAtShardLevel = true; } } @@ -316,7 +234,7 @@ public long[][] buildRanges(LeafReaderContext leaf) throws IOException { return ranges; } - private void consumeDebugInfo(DebugInfoCollector debug) { + private void consumeDebugInfo(DebugInfo debug) { leaf += debug.leaf; inner += debug.inner; } @@ -326,13 +244,8 @@ private void consumeDebugInfo(DebugInfoCollector debug) { * Different types have different pre-conditions, filter building logic, etc. 
*/ interface AggregationType { - boolean isRewriteable(Object parent, int subAggLength); - Weight[] buildFastFilter(SearchContext ctx) throws IOException; - - Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext ctx) throws IOException; - long[][] buildRanges(SearchContext ctx) throws IOException; long[][] buildRanges(LeafReaderContext leaf, SearchContext ctx) throws IOException; @@ -372,47 +285,6 @@ public boolean isRewriteable(Object parent, int subAggLength) { return false; } - @Override - public Weight[] buildFastFilter(SearchContext context) throws IOException { - long[] bounds = getDateHistoAggBounds(context, fieldType.name()); - logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId()); - return buildFastFilter(context, bounds); - } - - @Override - public Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext context) throws IOException { - long[] bounds = getSegmentBounds(leaf, fieldType.name()); - logger.debug("Bounds are {} for shard {} segment {}", bounds, context.indexShard().shardId(), leaf.ord); - return buildFastFilter(context, bounds); - } - - private Weight[] buildFastFilter(SearchContext context, long[] bounds) throws IOException { - bounds = processHardBounds(bounds); - if (bounds == null) { - return null; - } - assert bounds[0] <= bounds[1] : "Low bound should be less than high bound"; - - final Rounding rounding = getRounding(bounds[0], bounds[1]); - final OptionalLong intervalOpt = Rounding.getInterval(rounding); - if (intervalOpt.isEmpty()) { - return null; - } - final long interval = intervalOpt.getAsLong(); - - // process the after key of composite agg - processAfterKey(bounds, interval); - - return FastFilterRewriteHelper.createFilterForAggregations( - context, - (DateFieldMapper.DateFieldType) fieldType, - interval, - getRoundingPrepared(), - bounds[0], - bounds[1] - ); - } - @Override public long[][] buildRanges(SearchContext context) throws IOException { long[] bounds = getDateHistoAggBounds(context, fieldType.name()); @@ -517,8 +389,7 @@ public static boolean tryFastFilterAggregation( PointValues values = ctx.reader().getPointValues(fastFilterContext.fieldName); if (values == null) return false; - // date field is 1 dimensional - // only proceed if every document has exactly one point for this field + // only proceed if every document corresponds to exactly one point if (values.getDocCount() != values.size()) return false; NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); @@ -531,12 +402,11 @@ public static boolean tryFastFilterAggregation( return false; } - // if no filters built at shard level (see getDateHistoAggBounds method for possible reasons) - // check if the query is functionally match-all at segment level - if (!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) { + // even if no ranges built at shard level, we can still perform the optimization + // when functionally match-all at segment level + if (!fastFilterContext.rangesBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) { return false; } - long[][] ranges = fastFilterContext.ranges; if (ranges == null) { logger.debug( @@ -553,7 +423,7 @@ public static boolean tryFastFilterAggregation( final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType) .getFieldType(); int size = fastFilterContext.aggregationType.getSize(); - DebugInfoCollector debugInfo = 
multiRangesTraverse(values.getPointTree(), ranges, incrementDocCount, fieldType, size); + DebugInfo debugInfo = multiRangesTraverse(values.getPointTree(), ranges, incrementDocCount, fieldType, size); fastFilterContext.consumeDebugInfo(debugInfo); logger.debug("Fast filter optimization applied to shard {} segment {}", fastFilterContext.context.indexShard().shardId(), ctx.ord); @@ -566,6 +436,10 @@ private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leaf return weight != null && weight.count(leafCtx) == leafCtx.reader().numDocs(); } + /** + * Creates the date ranges from date histo aggregations using its interval, + * and min/max boundaries + */ private static long[][] createRangesFromAgg( final SearchContext context, final DateFieldMapper.DateFieldType fieldType, @@ -599,16 +473,11 @@ private static long[][] createRangesFromAgg( int i = 0; while (i < bucketCount) { // Calculate the lower bucket bound - // final byte[] lower = new byte[8]; - // NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0); long lower = i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow); roundedLow = preparedRounding.round(roundedLow + interval); - // final byte[] upper = new byte[8]; - // NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high : - // // Subtract -1 if the minimum is roundedLow as roundedLow itself - // // is included in the next bucket - // fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0); + // Subtract -1 if the minimum is roundedLow as roundedLow itself + // is included in the next bucket long upper = i + 1 == bucketCount ? high : fieldType.convertRoundedMillisToNanos(roundedLow) - 1; ranges[i][0] = lower; @@ -620,7 +489,7 @@ private static long[][] createRangesFromAgg( return ranges; } - private static DebugInfoCollector multiRangesTraverse( + private static DebugInfo multiRangesTraverse( final PointValues.PointTree tree, final long[][] ranges, final BiConsumer incrementDocCount, @@ -631,7 +500,7 @@ private static DebugInfoCollector multiRangesTraverse( long[] activeRange = rangeIter.next(); // The ranges are connected and in ascending order - // make sure the first range to collect is at least cross the min value of the tree + // make sure the first range at least crosses the min value of the tree boolean noRangeMatches = false; while (activeRange[1] < NumericUtils.sortableBytesToLong(tree.getMinPackedValue(), 0)) { if (rangeIter.hasNext()) { @@ -641,7 +510,7 @@ private static DebugInfoCollector multiRangesTraverse( break; } } - DebugInfoCollector debugInfo = new DebugInfoCollector(); + DebugInfo debugInfo = new DebugInfo(); if (noRangeMatches) { return debugInfo; } @@ -656,16 +525,9 @@ private static DebugInfoCollector multiRangesTraverse( return debugInfo; } - private static void intersectWithRanges(PointValues.IntersectVisitor visitor, PointValues.PointTree pointTree, DebugInfoCollector debug) + private static void intersectWithRanges(PointValues.IntersectVisitor visitor, PointValues.PointTree pointTree, DebugInfo debug) throws IOException { - // long min = NumericUtils.sortableBytesToLong(pointTree.getMinPackedValue(), 0); - // long max = NumericUtils.sortableBytesToLong(pointTree.getMaxPackedValue(), 0); - // maxPackedValue seems to be the max value + 1 - // System.out.println("==============="); - // System.out.println("current node range min=" + min + " max=" + max); - PointValues.Relation r = visitor.compare(pointTree.getMinPackedValue(), pointTree.getMaxPackedValue()); - // 
System.out.println("relation=" + r); try { switch (r) { @@ -687,7 +549,6 @@ private static void intersectWithRanges(PointValues.IntersectVisitor visitor, Po case CELL_OUTSIDE_QUERY: } } catch (CollectionTerminatedException e) { - // ignore logger.debug("Early terminate since no more range to collect"); } } @@ -697,40 +558,20 @@ private static void intersectWithRanges(PointValues.IntersectVisitor visitor, Po */ private static PointValues.IntersectVisitor getIntersectVisitor(RangeCollectorForPointTree collector) { return new PointValues.IntersectVisitor() { - - /** - * The doc visited is ever-increasing in terms of its value - * The node range visited is every-increasing at any level - * possible next node is sibling or parent sibling, this is the proof of ever-increasing - *

- * the first range is either inside inner or leaf node, or cross leaf - * inside node won't change activeRange - * cross leaf will - * after the first node, the next node could change activeRange - * Compare min max of next node with next range, when its outside activeRange - * ranges are always connected, but the values may not, so should iterate ranges until range[0] >= min - *

- * if node cross activeRange, we need to visit children recursively, we will always be able to stop at leaf or when found the inner - *

- */ - @Override public void visit(int docID) throws IOException { - // System.out.println("visit docID=" + docID); collector.count(); } @Override public void visit(int docID, byte[] packedValue) throws IOException { long value = NumericUtils.sortableBytesToLong(packedValue, 0); - // System.out.println("value" + value + " count=" + 1); if (value > collector.activeRange[1]) { // need to move to next range collector.finalizePreviousRange(); if (collector.iterateRangeEnd(value)) { throw new CollectionTerminatedException(); - // return; } } if (collector.activeRange[0] <= value && value <= collector.activeRange[1]) { @@ -740,15 +581,12 @@ public void visit(int docID, byte[] packedValue) throws IOException { @Override public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOException { - logger.debug("visit iterator with packedValue"); long value = NumericUtils.sortableBytesToLong(packedValue, 0); - // System.out.println("value" + value + " count=" + count); if (value > collector.activeRange[1]) { collector.finalizePreviousRange(); if (collector.iterateRangeEnd(value)) { throw new CollectionTerminatedException(); - // return; } } if (collector.activeRange[0] <= value && value <= collector.activeRange[1]) { @@ -768,9 +606,7 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue boolean crosses = false; if (collector.activeRange[1] < min) { // need to move to next range - // finalize the results for the previous range collector.finalizePreviousRange(); - // go to next range if (collector.iterateRangeEnd(min)) { throw new CollectionTerminatedException(); // return PointValues.Relation.CELL_OUTSIDE_QUERY; @@ -847,7 +683,7 @@ private boolean iterateRangeEnd(long value) { } } - private static class DebugInfoCollector { + private static class DebugInfo { private int leaf = 0; // leaf node visited private int inner = 0; // inner node visited From 59a80224f19cfc4ef48af0d96afc09f88e532a07 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 22 Apr 2024 18:36:16 -0700 Subject: [PATCH 06/13] add debug info Signed-off-by: bowenlan-amzn --- .../bucket/composite/CompositeAggregator.java | 13 ++++++++++++- .../histogram/AutoDateHistogramAggregator.java | 11 +++++++++++ .../bucket/histogram/DateHistogramAggregator.java | 2 -- .../search/aggregations/AggregatorTestCase.java | 1 - 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 786e7e47537b9..1a3095b1f494b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -87,6 +87,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.LongUnaryOperator; import java.util.stream.Collectors; @@ -171,7 +172,7 @@ final class CompositeAggregator extends BucketsAggregator { // bucketOrds is used for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared(); - fastFilterContext.setFieldName(sourceConfigs[0].name()); + fastFilterContext.setFieldName(sourceConfigs[0].fieldType().name()); 
fastFilterContext.buildRanges(); } } @@ -707,4 +708,14 @@ private static class Entry { this.docIdSet = docIdSet; } } + + @Override + public void collectDebugInfo(BiConsumer add) { + if (fastFilterContext.optimizedSegments > 0) { + add.accept("optimized_segments", fastFilterContext.optimizedSegments); + add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments); + add.accept("leaf_visited", fastFilterContext.leaf); + add.accept("inner_visited", fastFilterContext.inner); + } + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 9e59f33fef104..f326426800909 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -308,6 +308,17 @@ protected final void merge(long[] mergeMap, long newNumBuckets) { } } + @Override + public void collectDebugInfo(BiConsumer add) { + super.collectDebugInfo(add); + if (fastFilterContext.optimizedSegments > 0) { + add.accept("optimized_segments", fastFilterContext.optimizedSegments); + add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments); + add.accept("leaf_visited", fastFilterContext.leaf); + add.accept("inner_visited", fastFilterContext.inner); + } + } + /** * Initially it uses the most fine grained rounding configuration possible * but as more data arrives it rebuckets the data until it "fits" in the diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 148403728a912..dd4ee9196fd62 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -171,8 +171,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol ) ); if (optimized) throw new CollectionTerminatedException(); - // we will return the debug info for each segment - // or we should just cache it in the fast filter context SortedNumericDocValues values = valuesSource.longValues(ctx); return new LeafBucketCollectorBase(sub, values) { diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 22ec532f40e7d..02e5d22e147d5 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -126,7 +126,6 @@ import org.opensearch.search.aggregations.AggregatorFactories.Builder; import org.opensearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer; import org.opensearch.search.aggregations.bucket.nested.NestedAggregationBuilder; -import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.metrics.MetricsAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; From 
43669259366d2a36463e5941581e707d8ee95522 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 25 Apr 2024 10:35:11 -0700 Subject: [PATCH 07/13] update compare to use byte array Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 188 ++++++++++-------- 1 file changed, 106 insertions(+), 82 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 2f2d717340158..efce51cd06814 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -24,7 +24,9 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; +import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.NumericUtils; +import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Rounding; import org.opensearch.common.lucene.search.function.FunctionScoreQuery; import org.opensearch.index.mapper.DateFieldMapper; @@ -496,132 +498,137 @@ private static DebugInfo multiRangesTraverse( final DateFieldMapper.DateFieldType fieldType, final int size ) throws IOException { + // ranges are connected and in ascending order Iterator rangeIter = Arrays.stream(ranges).iterator(); long[] activeRange = rangeIter.next(); - // The ranges are connected and in ascending order // make sure the first range at least crosses the min value of the tree boolean noRangeMatches = false; - while (activeRange[1] < NumericUtils.sortableBytesToLong(tree.getMinPackedValue(), 0)) { - if (rangeIter.hasNext()) { - activeRange = rangeIter.next(); - } else { - noRangeMatches = true; - break; + if (activeRange[0] > NumericUtils.sortableBytesToLong(tree.getMaxPackedValue(), 0)) { + noRangeMatches = true; + } else { + while (activeRange[1] < NumericUtils.sortableBytesToLong(tree.getMinPackedValue(), 0)) { + if (rangeIter.hasNext()) { + activeRange = rangeIter.next(); + } else { + noRangeMatches = true; + break; + } } } DebugInfo debugInfo = new DebugInfo(); if (noRangeMatches) { + logger.debug("No ranges match the query, skip the fast filter optimization"); return debugInfo; } - RangeCollectorForPointTree collector = new RangeCollectorForPointTree(incrementDocCount, fieldType, rangeIter, size); - collector.setActiveRange(activeRange); + RangeCollectorForPointTree collector = new RangeCollectorForPointTree(incrementDocCount, fieldType, rangeIter, size, activeRange); - PointValues.IntersectVisitor visitor = getIntersectVisitor(collector); - intersectWithRanges(visitor, tree, debugInfo); + final ArrayUtil.ByteArrayComparator comparator = ArrayUtil.getUnsignedComparator(8); + PointValues.IntersectVisitor visitor = getIntersectVisitor(collector, comparator); + try { + intersectWithRanges(visitor, tree, collector, debugInfo); + } catch (CollectionTerminatedException e) { + logger.debug("Early terminate since no more range to collect"); + } collector.finalizePreviousRange(); return debugInfo; } - private static void intersectWithRanges(PointValues.IntersectVisitor visitor, PointValues.PointTree pointTree, DebugInfo debug) - throws IOException { + private static void intersectWithRanges( + PointValues.IntersectVisitor visitor, + PointValues.PointTree pointTree, + RangeCollectorForPointTree collector, + DebugInfo debug + ) throws IOException { PointValues.Relation r = 
visitor.compare(pointTree.getMinPackedValue(), pointTree.getMaxPackedValue()); - try { - switch (r) { - case CELL_INSIDE_QUERY: - pointTree.visitDocIDs(visitor); - debug.visitInner(); - break; - case CELL_CROSSES_QUERY: - if (pointTree.moveToChild()) { - intersectWithRanges(visitor, pointTree, debug); - pointTree.moveToSibling(); - intersectWithRanges(visitor, pointTree, debug); - pointTree.moveToParent(); - } else { - pointTree.visitDocValues(visitor); - debug.visitLeaf(); - } - break; - case CELL_OUTSIDE_QUERY: - } - } catch (CollectionTerminatedException e) { - logger.debug("Early terminate since no more range to collect"); + switch (r) { + case CELL_INSIDE_QUERY: + collector.countNode((int) pointTree.size()); + debug.visitInner(); + break; + case CELL_CROSSES_QUERY: + if (pointTree.moveToChild()) { + intersectWithRanges(visitor, pointTree, collector, debug); + pointTree.moveToSibling(); + intersectWithRanges(visitor, pointTree, collector, debug); + pointTree.moveToParent(); + } else { + pointTree.visitDocValues(visitor); + debug.visitLeaf(); + } + break; + case CELL_OUTSIDE_QUERY: } } - /** - * - */ - private static PointValues.IntersectVisitor getIntersectVisitor(RangeCollectorForPointTree collector) { + private static PointValues.IntersectVisitor getIntersectVisitor( + RangeCollectorForPointTree collector, + ArrayUtil.ByteArrayComparator comparator + ) { return new PointValues.IntersectVisitor() { @Override public void visit(int docID) throws IOException { - collector.count(); + // this branch should be unreachable + throw new UnsupportedOperationException( + "This IntersectVisitor does not perform any actions on a " + "docID=" + docID + " node being visited" + ); } @Override public void visit(int docID, byte[] packedValue) throws IOException { - long value = NumericUtils.sortableBytesToLong(packedValue, 0); - if (value > collector.activeRange[1]) { - // need to move to next range - collector.finalizePreviousRange(); - - if (collector.iterateRangeEnd(value)) { - throw new CollectionTerminatedException(); - } - } - if (collector.activeRange[0] <= value && value <= collector.activeRange[1]) { - collector.count(); - } + visitPoints(packedValue, collector::count); } @Override public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOException { - long value = NumericUtils.sortableBytesToLong(packedValue, 0); - if (value > collector.activeRange[1]) { - collector.finalizePreviousRange(); + visitPoints(packedValue, () -> { + for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) { + collector.count(); + } + }); + } - if (collector.iterateRangeEnd(value)) { + private void visitPoints(byte[] packedValue, CheckedRunnable collect) throws IOException { + if (comparator.compare(packedValue, 0, collector.activeRangeAsByteArray[1], 0) > 0) { + // need to move to next range + collector.finalizePreviousRange(); + if (collector.iterateRangeEnd(packedValue, comparator)) { throw new CollectionTerminatedException(); } } - if (collector.activeRange[0] <= value && value <= collector.activeRange[1]) { - for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) { - collector.count(); - } + + if (pointCompare(collector.activeRangeAsByteArray[0], collector.activeRangeAsByteArray[1], packedValue)) { + collect.run(); } } + private boolean pointCompare(byte[] lower, byte[] upper, byte[] packedValue) { + if (comparator.compare(packedValue, 0, lower, 0) < 0) { + return false; + } + return 
comparator.compare(packedValue, 0, upper, 0) <= 0; + } + @Override public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) { - long min = NumericUtils.sortableBytesToLong(minPackedValue, 0); - long max = NumericUtils.sortableBytesToLong(maxPackedValue, 0); - long queryMin = collector.activeRange[0]; - long queryMax = collector.activeRange[1]; + byte[] rangeMin = collector.activeRangeAsByteArray[0]; + byte[] rangeMax = collector.activeRangeAsByteArray[1]; - boolean crosses = false; - if (collector.activeRange[1] < min) { - // need to move to next range + if (comparator.compare(rangeMax, 0, minPackedValue, 0) < 0) { collector.finalizePreviousRange(); - if (collector.iterateRangeEnd(min)) { + if (collector.iterateRangeEnd(minPackedValue, comparator)) { throw new CollectionTerminatedException(); - // return PointValues.Relation.CELL_OUTSIDE_QUERY; } // compare the next range with this node's min max again - // it cannot be outside again, can only be cross or inside - if (collector.activeRange[1] < max) { - crosses = true; - } - } else if (queryMin > min || queryMax < max) { - crosses = true; + // new rangeMin = previous rangeMax + 1 <= min + rangeMax = collector.activeRangeAsByteArray[1]; } - if (crosses) { + if (comparator.compare(rangeMin, 0, minPackedValue, 0) > 0 || comparator.compare(rangeMax, 0, maxPackedValue, 0) < 0) { return PointValues.Relation.CELL_CROSSES_QUERY; } else { return PointValues.Relation.CELL_INSIDE_QUERY; @@ -634,8 +641,11 @@ private static class RangeCollectorForPointTree { private final BiConsumer incrementDocCount; private final DateFieldMapper.DateFieldType fieldType; private int counter = 0; + private long[] activeRange; + private byte[][] activeRangeAsByteArray; private final Iterator rangeIter; + private int visitedRange = 0; private final int size; // the given size of non-zero buckets used in composite agg @@ -643,44 +653,58 @@ public RangeCollectorForPointTree( BiConsumer incrementDocCount, DateFieldMapper.DateFieldType fieldType, Iterator rangeIter, - int size + int size, + long[] activeRange ) { this.incrementDocCount = incrementDocCount; this.fieldType = fieldType; this.rangeIter = rangeIter; this.size = size; + this.activeRange = activeRange; + this.activeRangeAsByteArray = activeRangeAsByteArray(); } private void count() { counter++; } + private void countNode(int count) { + counter += count; + } + private void finalizePreviousRange() { if (counter > 0) { + logger.debug("finalize previous range: {}", activeRange[0]); + logger.debug("counter: {}", counter); incrementDocCount.accept(fieldType.convertNanosToMillis(activeRange[0]), counter); counter = 0; } } - private void setActiveRange(long[] activeRange) { - this.activeRange = activeRange; - } - /** * @return true when iterator exhausted or collect enough non-zero ranges */ - private boolean iterateRangeEnd(long value) { + private boolean iterateRangeEnd(byte[] value, ArrayUtil.ByteArrayComparator comparator) { // the new value may not be contiguous to the previous one // so try to find the first next range that cross the new value - while (activeRange[1] < value) { + while (comparator.compare(activeRangeAsByteArray[1], 0, value, 0) < 0) { if (!rangeIter.hasNext()) { return true; } activeRange = rangeIter.next(); + activeRangeAsByteArray = activeRangeAsByteArray(); } visitedRange++; return visitedRange > size; } + + private byte[][] activeRangeAsByteArray() { + byte[] lower = new byte[8]; + byte[] upper = new byte[8]; + NumericUtils.longToSortableBytes(activeRange[0], lower, 0); + 
NumericUtils.longToSortableBytes(activeRange[1], upper, 0); + return new byte[][] { lower, upper }; + } } private static class DebugInfo { From 3cc4693d80b58900290ffc335df87778b24f3567 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Fri, 26 Apr 2024 10:40:30 -0700 Subject: [PATCH 08/13] rest level tests Signed-off-by: bowenlan-amzn --- .../test/search.aggregation/10_histogram.yml | 60 +++++++++++++++++-- .../test/search.aggregation/230_composite.yml | 58 ++++++++++++++++++ .../330_auto_date_histogram.yml | 6 +- 3 files changed, 117 insertions(+), 7 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml index 0ab36ff470b67..1de1801d1dbc1 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml @@ -599,10 +599,6 @@ setup: - match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator } - match: { profile.shards.0.aggregations.0.description: histo } - match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 } - - match: { profile.shards.0.aggregations.0.debug.optimized_segments: 1 } - - match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 } - - match: { profile.shards.0.aggregations.0.debug.leaf_visited: 1 } - - match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 } --- "histogram with hard bounds": @@ -648,3 +644,59 @@ setup: - match: { aggregations.histo.buckets.0.doc_count: 1 } - match: { aggregations.histo.buckets.20.key: 20 } - match: { aggregations.histo.buckets.20.doc_count: 1 } + +--- +"date_histogram profiler shows filter rewrite info": + - skip: + version: " - 2.99.99" + reason: debug info for filter rewrite added in 3.0.0 + + - do: + indices.create: + index: test_2 + body: + settings: + number_of_replicas: 0 + number_of_shards: 1 + mappings: + properties: + date: + type: date + + - do: + bulk: + index: test_2 + refresh: true + body: + - '{"index": {}}' + - '{"date": "2016-01-01"}' + - '{"index": {}}' + - '{"date": "2016-01-02"}' + - '{"index": {}}' + - '{"date": "2016-02-01"}' + - '{"index": {}}' + - '{"date": "2016-03-01"}' + + - do: + search: + index: test_2 + body: + size: 0 + profile: true + aggs: + histo: + date_histogram: + field: date + calendar_interval: month + + - match: { hits.total.value: 4 } + - length: { aggregations.histo.buckets: 3 } + - match: { aggregations.histo.buckets.0.key_as_string: "2016-01-01T00:00:00.000Z" } + - match: { aggregations.histo.buckets.0.doc_count: 2 } + - match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator } + - match: { profile.shards.0.aggregations.0.description: histo } + - match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 } + - match: { profile.shards.0.aggregations.0.debug.optimized_segments: 1 } + - match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 } + - match: { profile.shards.0.aggregations.0.debug.leaf_visited: 1 } + - match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml index 2808be8cd7045..310bd11b47466 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml +++ 
b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml @@ -1069,3 +1069,61 @@ setup: - match: { aggregations.test.buckets.1.doc_count: 2 } - match: { aggregations.test.buckets.2.key.kw: null } - match: { aggregations.test.buckets.2.doc_count: 2 } + +--- +"composite aggregation date_histogram profile shows filter rewrite info": + - skip: + version: " - 2.99.99" + reason: debug info for filter rewrite added in 3.0.0 + + - do: + indices.create: + index: test_2 + body: + settings: + number_of_replicas: 0 + number_of_shards: 1 + mappings: + properties: + date: + type: date + - do: + bulk: + index: test_2 + refresh: true + body: + - '{"index": {}}' + - '{"date": "2016-01-01"}' + - '{"index": {}}' + - '{"date": "2016-01-02"}' + - '{"index": {}}' + - '{"date": "2016-02-01"}' + - '{"index": {}}' + - '{"date": "2016-03-01"}' + - do: + search: + index: test_2 + body: + size: 0 + profile: true + aggregations: + test: + composite: + sources: [ + { + "date": { + "date_histogram": { + "field": "date", + "calendar_interval": "1d", + "format": "strict_date" + } + } + } + ] + + - match: { hits.total.value: 4 } + - length: { aggregations.test.buckets: 4 } + - match: { profile.shards.0.aggregations.0.debug.optimized_segments: 1 } + - match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 } + - match: { profile.shards.0.aggregations.0.debug.leaf_visited: 1 } + - match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml index 6b5e06a549be3..83dc3d8150b88 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml @@ -79,10 +79,10 @@ setup: - match: { aggregations.histo_avg_v.value: 5 } --- -"profile at top level": +"auto_date_histogram profile shows filter rewrite info": - skip: - version: " - 7.99.99" - reason: Debug information added in 8.0.0 (to be backported to 7.9.0) + version: " - 2.99.99" + reason: debug info for filter rewrite added in 3.0.0 - do: search: From e13cd4b0ec6c9403eb7481a69e2bfda3116838a0 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Sun, 28 Apr 2024 19:08:53 -0700 Subject: [PATCH 09/13] address comments Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 89 +++++++++++-------- .../bucket/composite/CompositeAggregator.java | 5 +- 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index efce51cd06814..9529d678b0022 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -33,6 +33,7 @@ import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.query.DateRangeIncludingNowQuery; +import org.opensearch.search.aggregations.bucket.composite.CompositeAggregator; import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig; import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource; 
import org.opensearch.search.aggregations.bucket.histogram.LongBounds; @@ -46,6 +47,7 @@ import java.util.Map; import java.util.OptionalLong; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Function; import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; @@ -251,10 +253,6 @@ interface AggregationType { long[][] buildRanges(SearchContext ctx) throws IOException; long[][] buildRanges(LeafReaderContext leaf, SearchContext ctx) throws IOException; - - default int getSize() { - return Integer.MAX_VALUE; - } } /** @@ -422,14 +420,19 @@ public static boolean tryFastFilterAggregation( } } - final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType) - .getFieldType(); - int size = fastFilterContext.aggregationType.getSize(); + final AggregationType aggregationType = fastFilterContext.aggregationType; + assert aggregationType instanceof AbstractDateHistogramAggregationType; + final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) aggregationType).getFieldType(); + int size = Integer.MAX_VALUE; + if (aggregationType instanceof CompositeAggregator.CompositeAggregationType) { + size = ((CompositeAggregator.CompositeAggregationType) aggregationType).getSize(); + } DebugInfo debugInfo = multiRangesTraverse(values.getPointTree(), ranges, incrementDocCount, fieldType, size); fastFilterContext.consumeDebugInfo(debugInfo); - logger.debug("Fast filter optimization applied to shard {} segment {}", fastFilterContext.context.indexShard().shardId(), ctx.ord); fastFilterContext.optimizedSegments++; + logger.debug("Fast filter optimization applied to shard {} segment {}", fastFilterContext.context.indexShard().shardId(), ctx.ord); + logger.debug("crossed leaf nodes: {}, inner nodes: {}", fastFilterContext.leaf, fastFilterContext.inner); return true; } @@ -491,38 +494,41 @@ private static long[][] createRangesFromAgg( return ranges; } + /** + * @param maxNumNonZeroRanges the number of non-zero ranges to collect + */ private static DebugInfo multiRangesTraverse( final PointValues.PointTree tree, final long[][] ranges, final BiConsumer incrementDocCount, final DateFieldMapper.DateFieldType fieldType, - final int size + final int maxNumNonZeroRanges ) throws IOException { // ranges are connected and in ascending order Iterator rangeIter = Arrays.stream(ranges).iterator(); long[] activeRange = rangeIter.next(); // make sure the first range at least crosses the min value of the tree - boolean noRangeMatches = false; - if (activeRange[0] > NumericUtils.sortableBytesToLong(tree.getMaxPackedValue(), 0)) { - noRangeMatches = true; - } else { - while (activeRange[1] < NumericUtils.sortableBytesToLong(tree.getMinPackedValue(), 0)) { - if (rangeIter.hasNext()) { - activeRange = rangeIter.next(); - } else { - noRangeMatches = true; - break; - } - } - } DebugInfo debugInfo = new DebugInfo(); - if (noRangeMatches) { + if (activeRange[0] > NumericUtils.sortableBytesToLong(tree.getMaxPackedValue(), 0)) { logger.debug("No ranges match the query, skip the fast filter optimization"); return debugInfo; } + while (activeRange[1] < NumericUtils.sortableBytesToLong(tree.getMinPackedValue(), 0)) { + if (!rangeIter.hasNext()) { + logger.debug("No ranges match the query, skip the fast filter optimization"); + return debugInfo; + } + activeRange = rangeIter.next(); + } - RangeCollectorForPointTree collector = new RangeCollectorForPointTree(incrementDocCount, fieldType, rangeIter, 
size, activeRange); + RangeCollectorForPointTree collector = new RangeCollectorForPointTree( + incrementDocCount, + fieldType, + rangeIter, + maxNumNonZeroRanges, + activeRange + ); final ArrayUtil.ByteArrayComparator comparator = ArrayUtil.getUnsignedComparator(8); PointValues.IntersectVisitor visitor = getIntersectVisitor(collector, comparator); @@ -551,9 +557,9 @@ private static void intersectWithRanges( break; case CELL_CROSSES_QUERY: if (pointTree.moveToChild()) { - intersectWithRanges(visitor, pointTree, collector, debug); - pointTree.moveToSibling(); - intersectWithRanges(visitor, pointTree, collector, debug); + do { + intersectWithRanges(visitor, pointTree, collector, debug); + } while (pointTree.moveToSibling()); pointTree.moveToParent(); } else { pointTree.visitDocValues(visitor); @@ -595,7 +601,7 @@ private void visitPoints(byte[] packedValue, CheckedRunnable collec if (comparator.compare(packedValue, 0, collector.activeRangeAsByteArray[1], 0) > 0) { // need to move to next range collector.finalizePreviousRange(); - if (collector.iterateRangeEnd(packedValue, comparator)) { + if (collector.iterateRangeEnd(packedValue, this::compareByteValue)) { throw new CollectionTerminatedException(); } } @@ -606,10 +612,15 @@ private void visitPoints(byte[] packedValue, CheckedRunnable collec } private boolean pointCompare(byte[] lower, byte[] upper, byte[] packedValue) { - if (comparator.compare(packedValue, 0, lower, 0) < 0) { + if (compareByteValue(packedValue, lower) < 0) { + return false; } - return comparator.compare(packedValue, 0, upper, 0) <= 0; + return compareByteValue(packedValue, upper) <= 0; + } + + private int compareByteValue(byte[] value1, byte[] value2) { + return comparator.compare(value1, 0, value2, 0); } @Override @@ -617,9 +628,9 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue byte[] rangeMin = collector.activeRangeAsByteArray[0]; byte[] rangeMax = collector.activeRangeAsByteArray[1]; - if (comparator.compare(rangeMax, 0, minPackedValue, 0) < 0) { + if (compareByteValue(rangeMax, minPackedValue) < 0) { collector.finalizePreviousRange(); - if (collector.iterateRangeEnd(minPackedValue, comparator)) { + if (collector.iterateRangeEnd(minPackedValue, this::compareByteValue)) { throw new CollectionTerminatedException(); } @@ -628,7 +639,7 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue rangeMax = collector.activeRangeAsByteArray[1]; } - if (comparator.compare(rangeMin, 0, minPackedValue, 0) > 0 || comparator.compare(rangeMax, 0, maxPackedValue, 0) < 0) { + if (compareByteValue(rangeMin, minPackedValue) > 0 || compareByteValue(rangeMax, maxPackedValue) < 0) { return PointValues.Relation.CELL_CROSSES_QUERY; } else { return PointValues.Relation.CELL_INSIDE_QUERY; @@ -647,19 +658,19 @@ private static class RangeCollectorForPointTree { private final Iterator rangeIter; private int visitedRange = 0; - private final int size; // the given size of non-zero buckets used in composite agg + private final int maxNumNonZeroRange; public RangeCollectorForPointTree( BiConsumer incrementDocCount, DateFieldMapper.DateFieldType fieldType, Iterator rangeIter, - int size, + int maxNumNonZeroRange, long[] activeRange ) { this.incrementDocCount = incrementDocCount; this.fieldType = fieldType; this.rangeIter = rangeIter; - this.size = size; + this.maxNumNonZeroRange = maxNumNonZeroRange; this.activeRange = activeRange; this.activeRangeAsByteArray = activeRangeAsByteArray(); } @@ -684,10 +695,10 @@ private void 
finalizePreviousRange() {
     /**
      * @return true when iterator exhausted or collect enough non-zero ranges
      */
-    private boolean iterateRangeEnd(byte[] value, ArrayUtil.ByteArrayComparator comparator) {
+    private boolean iterateRangeEnd(byte[] value, BiFunction<byte[], byte[], Integer> comparator) {
         // the new value may not be contiguous to the previous one
         // so try to find the first next range that cross the new value
-        while (comparator.compare(activeRangeAsByteArray[1], 0, value, 0) < 0) {
+        while (comparator.apply(activeRangeAsByteArray[1], value) < 0) {
             if (!rangeIter.hasNext()) {
                 return true;
             }
             activeRange = rangeIter.next();
             activeRangeAsByteArray = activeRangeAsByteArray();
         }
         visitedRange++;
-        return visitedRange > size;
+        return visitedRange > maxNumNonZeroRange;
     }
 
     private byte[][] activeRangeAsByteArray() {
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
index 1a3095b1f494b..3713d8f83990d 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -98,7 +98,7 @@
  *
  * @opensearch.internal
  */
-final class CompositeAggregator extends BucketsAggregator {
+public final class CompositeAggregator extends BucketsAggregator {
     private final int size;
     private final List<String> sourceNames;
     private final int[] reverseMuls;
@@ -180,7 +180,7 @@ final class CompositeAggregator extends BucketsAggregator {
     /**
      * Currently the filter rewrite is only supported for date histograms
      */
-    private class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
+    public class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
         private final RoundingValuesSource valuesSource;
         private long afterKey = -1L;
@@ -212,7 +212,6 @@ protected void processAfterKey(long[] bound, long interval) {
         }
     }
 
-    @Override
     public int getSize() {
         return size;
     }

From 24fac6b1ee4a97f94016cfb367fc6b400ab26c19 Mon Sep 17 00:00:00 2001
From: bowenlan-amzn
Date: Wed, 1 May 2024 14:37:24 -0700
Subject: [PATCH 10/13] unit test: increase coverage

local execution time 2s330ms

Signed-off-by: bowenlan-amzn
---
 .../DateHistogramAggregatorTests.java         | 130 +++++++++---------
 1 file changed, 62 insertions(+), 68 deletions(-)

diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
index 4510dab98299a..dbc951b97ff0a 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
@@ -38,7 +38,6 @@
 import org.apache.lucene.document.SortedNumericDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.MatchNoDocsQuery;
@@ -64,7 +63,9 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import 
java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -1459,68 +1460,60 @@ private void testSearchCase( } } - public void testMultiRangeDebug() throws IOException { - testFilterRewrite( - new MatchAllDocsQuery(), - Arrays.asList( - "2017-02-01T09:02:00.000Z", - "2017-02-01T09:35:00.000Z", - "2017-02-01T10:15:00.000Z", - "2017-02-01T13:06:00.000Z", - "2017-02-01T14:04:00.000Z", - "2017-02-01T14:05:00.000Z", - "2017-02-01T15:59:00.000Z", - "2017-02-01T16:06:00.000Z", - "2017-02-01T16:48:00.000Z", - "2017-02-01T16:59:00.000Z" - ), + public void testMultiRangeTraversal() throws IOException { + + Map dataset = new HashMap<>(); + dataset.put("2017-02-01T09:02:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T09:59:59.999Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T10:00:00.001Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T13:06:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T14:04:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T14:05:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T15:59:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:06:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:48:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:59:00.000Z", randomIntBetween(100, 2000)); + + testFilterRewriteCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2017-01-01T09:00:00.000Z"), asLong("2017-02-01T16:00:00.000Z")), + dataset, aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L), histogram -> { List buckets = histogram.getBuckets(); - assertEquals(6, buckets.size()); + assertEquals(5, buckets.size()); Histogram.Bucket bucket = buckets.get(0); assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); - assertEquals(2, bucket.getDocCount()); + int expected = dataset.get("2017-02-01T09:02:00.000Z") + dataset.get("2017-02-01T09:59:59.999Z"); + assertEquals(expected, bucket.getDocCount()); bucket = buckets.get(1); assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); - assertEquals(1, bucket.getDocCount()); + expected = dataset.get("2017-02-01T10:00:00.001Z"); + assertEquals(expected, bucket.getDocCount()); bucket = buckets.get(2); assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); - assertEquals(1, bucket.getDocCount()); + expected = dataset.get("2017-02-01T13:06:00.000Z"); + assertEquals(expected, bucket.getDocCount()); bucket = buckets.get(3); assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); - assertEquals(2, bucket.getDocCount()); + expected = dataset.get("2017-02-01T14:04:00.000Z") + dataset.get("2017-02-01T14:05:00.000Z"); + assertEquals(expected, bucket.getDocCount()); bucket = buckets.get(4); assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString()); - assertEquals(1, bucket.getDocCount()); - - bucket = buckets.get(5); - assertEquals("2017-02-01T16:00:00.000Z", bucket.getKeyAsString()); - assertEquals(3, bucket.getDocCount()); + expected = dataset.get("2017-02-01T15:59:00.000Z"); + assertEquals(expected, bucket.getDocCount()); }, false, - 0 + collectorCount -> assertEquals(0, (int) collectorCount) ); - testFilterRewrite( + testFilterRewriteCase( new MatchAllDocsQuery(), - Arrays.asList( - "2017-02-01T09:02:00.000Z", - "2017-02-01T09:35:00.000Z", - "2017-02-01T10:15:00.000Z", - "2017-02-01T13:06:00.000Z", - "2017-02-01T14:04:00.000Z", - 
"2017-02-01T14:05:00.000Z", - "2017-02-01T15:59:00.000Z", - "2017-02-01T16:06:00.000Z", - "2017-02-01T16:48:00.000Z", - "2017-02-01T16:59:00.000Z" - ), + dataset, aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L), histogram -> { List buckets = histogram.getBuckets(); @@ -1528,65 +1521,66 @@ public void testMultiRangeDebug() throws IOException { Histogram.Bucket bucket = buckets.get(0); assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); - assertEquals(6, bucket.getDocCount()); + int expected = dataset.get("2017-02-01T09:02:00.000Z") + dataset.get("2017-02-01T09:59:59.999Z") + 4; + assertEquals(expected, bucket.getDocCount()); bucket = buckets.get(1); assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); - assertEquals(1, bucket.getDocCount()); + expected = dataset.get("2017-02-01T10:00:00.001Z"); + assertEquals(expected, bucket.getDocCount()); bucket = buckets.get(2); assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); - assertEquals(1, bucket.getDocCount()); + expected = dataset.get("2017-02-01T13:06:00.000Z"); + assertEquals(expected, bucket.getDocCount()); bucket = buckets.get(3); assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); - assertEquals(2, bucket.getDocCount()); + expected = dataset.get("2017-02-01T14:04:00.000Z") + dataset.get("2017-02-01T14:05:00.000Z"); + assertEquals(expected, bucket.getDocCount()); bucket = buckets.get(4); assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString()); - assertEquals(1, bucket.getDocCount()); + expected = dataset.get("2017-02-01T15:59:00.000Z"); + assertEquals(expected, bucket.getDocCount()); bucket = buckets.get(5); assertEquals("2017-02-01T16:00:00.000Z", bucket.getKeyAsString()); - assertEquals(3, bucket.getDocCount()); + expected = dataset.get("2017-02-01T16:06:00.000Z") + dataset.get("2017-02-01T16:48:00.000Z") + dataset.get( + "2017-02-01T16:59:00.000Z" + ); + assertEquals(expected, bucket.getDocCount()); }, true, - 10 + collectCount -> assertTrue(collectCount > 0) ); } - private void testFilterRewrite( + private void testFilterRewriteCase( Query query, - List dataset, + Map dataset, Consumer configure, Consumer verify, boolean useDocCountField, - int actualCollectCount + Consumer verifyCollectCount ) throws IOException { - - DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(randomBoolean(), true); + DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(false, true); try (Directory directory = newDirectory()) { - - try ( - RandomIndexWriter indexWriter = new RandomIndexWriter( - random(), - directory, - newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE) - ) - ) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { Document document = new Document(); if (useDocCountField) { // add the doc count field to the first document document.add(new NumericDocValuesField(DocCountFieldMapper.NAME, 5)); } - for (String date : dataset) { - long instant = asLong(date, fieldType); - document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); - document.add(new LongPoint(AGGREGABLE_DATE, instant)); - document.add(new LongPoint(SEARCHABLE_DATE, instant)); - indexWriter.addDocument(document); - document.clear(); + for (Map.Entry date : dataset.entrySet()) { + for (int i = 0; i < date.getValue(); i++) { + long instant = asLong(date.getKey(), fieldType); + document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); + document.add(new 
LongPoint(AGGREGABLE_DATE, instant)); + indexWriter.addDocument(document); + document.clear(); + } } } @@ -1619,7 +1613,7 @@ private void testFilterRewrite( verify.accept(histogram); - assertEquals(aggregator.getCollectCount().get(), actualCollectCount); + verifyCollectCount.accept(aggregator.getCollectCount().get()); } } } From 64daab840b954d3dc86cef5d963f53e5301986ee Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 1 May 2024 15:05:33 -0700 Subject: [PATCH 11/13] update rest tests Signed-off-by: bowenlan-amzn --- .../test/search.aggregation/10_histogram.yml | 2 +- .../test/search.aggregation/230_composite.yml | 2 +- .../330_auto_date_histogram.yml | 28 ++++++++++++++++++- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml index 1de1801d1dbc1..fa71137912a91 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml @@ -649,7 +649,7 @@ setup: "date_histogram profiler shows filter rewrite info": - skip: version: " - 2.99.99" - reason: debug info for filter rewrite added in 3.0.0 + reason: debug info for filter rewrite added in 3.0.0 (to be backported to 2.14.0) - do: indices.create: diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml index 310bd11b47466..3a0099dae3b33 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml @@ -1074,7 +1074,7 @@ setup: "composite aggregation date_histogram profile shows filter rewrite info": - skip: version: " - 2.99.99" - reason: debug info for filter rewrite added in 3.0.0 + reason: debug info for filter rewrite added in 3.0.0 (to be backported to 2.14.0) - do: indices.create: diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml index 83dc3d8150b88..1356eac41ae79 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml @@ -78,11 +78,33 @@ setup: - match: { aggregations.histo.buckets.1.v.value: 7 } - match: { aggregations.histo_avg_v.value: 5 } +--- +"profile at top level": + - skip: + version: " - 7.99.99" + reason: Debug information added in 8.0.0 (to be backported to 7.9.0) + + - do: + search: + body: + profile: true + size: 0 + aggs: + histo: + auto_date_histogram: + field: date + buckets: 2 + + - match: { hits.total.value: 4 } + - length: { aggregations.histo.buckets: 2 } + - match: { profile.shards.0.aggregations.0.type: AutoDateHistogramAggregator.FromSingle } + - match: { profile.shards.0.aggregations.0.debug.surviving_buckets: 4 } + --- "auto_date_histogram profile shows filter rewrite info": - skip: version: " - 2.99.99" - reason: debug info for filter rewrite added in 3.0.0 + reason: debug info for filter rewrite added in 3.0.0 (to be backported to 2.14.0) - do: search: @@ -99,3 +121,7 @@ setup: - length: { 
aggregations.histo.buckets: 2 } - match: { profile.shards.0.aggregations.0.type: AutoDateHistogramAggregator.FromSingle } - match: { profile.shards.0.aggregations.0.debug.surviving_buckets: 4 } + - match: { profile.shards.0.aggregations.0.debug.optimized_segments: 1 } + - match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 } + - match: { profile.shards.0.aggregations.0.debug.leaf_visited: 1 } + - match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 } From d53a3789be230e86bf591fa10b92e41f0fcfa06f Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 2 May 2024 01:04:52 -0700 Subject: [PATCH 12/13] Update the default value of max_aggregation_rewrite_filters to 3000 This threshold should enable the optimization for these 4 benchmark workloads: nyc_taxis, big5, pmc, http_logs Signed-off-by: bowenlan-amzn --- server/src/main/java/org/opensearch/search/SearchService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 6b3620e65a271..744d3a19f1593 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -275,7 +275,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv // value 0 means rewrite filters optimization in aggregations will be disabled public static final Setting MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting( "search.max_aggregation_rewrite_filters", - 72, + 3000, 0, Property.Dynamic, Property.NodeScope From c605a03ac791dee55e0a50611ca75704ef9d18a7 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 2 May 2024 13:23:26 -0700 Subject: [PATCH 13/13] Increase code cov Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 2 - .../DateHistogramAggregatorTests.java | 117 +++++++++++++++--- 2 files changed, 101 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 9529d678b0022..c8ce39a52f869 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -613,7 +613,6 @@ private void visitPoints(byte[] packedValue, CheckedRunnable collec private boolean pointCompare(byte[] lower, byte[] upper, byte[] packedValue) { if (compareByteValue(packedValue, lower) < 0) { - return false; } return compareByteValue(packedValue, upper) <= 0; @@ -633,7 +632,6 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue if (collector.iterateRangeEnd(minPackedValue, this::compareByteValue)) { throw new CollectionTerminatedException(); } - // compare the next range with this node's min max again // new rangeMin = previous rangeMax + 1 <= min rangeMax = collector.activeRangeAsByteArray[1]; diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index dbc951b97ff0a..cf95999ec5086 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ 
b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -38,12 +38,15 @@ import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; +import org.apache.lucene.tests.util.TestUtil; import org.opensearch.common.time.DateFormatters; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; @@ -1461,7 +1464,6 @@ private void testSearchCase( } public void testMultiRangeTraversal() throws IOException { - Map dataset = new HashMap<>(); dataset.put("2017-02-01T09:02:00.000Z", randomIntBetween(100, 2000)); dataset.put("2017-02-01T09:59:59.999Z", randomIntBetween(100, 2000)); @@ -1508,8 +1510,67 @@ public void testMultiRangeTraversal() throws IOException { assertEquals(expected, bucket.getDocCount()); }, false, - collectorCount -> assertEquals(0, (int) collectorCount) + collectorCount -> assertEquals(0, (int) collectorCount), + true ); + } + + public void testMultiRangeTraversalFixedData() throws IOException { + Map dataset = new HashMap<>(); + dataset.put("2017-02-01T09:02:00.000Z", 512); + dataset.put("2017-02-01T09:59:59.999Z", 256); + dataset.put("2017-02-01T10:00:00.001Z", 256); + dataset.put("2017-02-01T13:06:00.000Z", 512); + dataset.put("2017-02-01T14:04:00.000Z", 256); + dataset.put("2017-02-01T14:05:00.000Z", 256); + dataset.put("2017-02-01T15:59:00.000Z", 768); + + testFilterRewriteCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2017-01-01T09:00:00.000Z"), asLong("2017-02-01T14:04:01.000Z")), + dataset, + aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(4, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); + int expected = dataset.get("2017-02-01T09:02:00.000Z") + dataset.get("2017-02-01T09:59:59.999Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T10:00:00.001Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T13:06:00.000Z"); + assertEquals(expected, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); + expected = dataset.get("2017-02-01T14:04:00.000Z"); + assertEquals(expected, bucket.getDocCount()); + }, + false, + collectorCount -> assertEquals(0, (int) collectorCount), + false + ); + } + + public void testMultiRangeTraversalNotApplicable() throws IOException { + Map dataset = new HashMap<>(); + dataset.put("2017-02-01T09:02:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T09:59:59.999Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T10:00:00.001Z", randomIntBetween(100, 2000)); + 
dataset.put("2017-02-01T13:06:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T14:04:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T14:05:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T15:59:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:06:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:48:00.000Z", randomIntBetween(100, 2000)); + dataset.put("2017-02-01T16:59:00.000Z", randomIntBetween(100, 2000)); testFilterRewriteCase( new MatchAllDocsQuery(), @@ -1552,7 +1613,8 @@ public void testMultiRangeTraversal() throws IOException { assertEquals(expected, bucket.getDocCount()); }, true, - collectCount -> assertTrue(collectCount > 0) + collectCount -> assertTrue(collectCount > 0), + true ); } @@ -1562,25 +1624,48 @@ private void testFilterRewriteCase( Consumer configure, Consumer verify, boolean useDocCountField, - Consumer verifyCollectCount + Consumer verifyCollectCount, + boolean randomWrite ) throws IOException { DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(false, true); try (Directory directory = newDirectory()) { - try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { - Document document = new Document(); - if (useDocCountField) { - // add the doc count field to the first document - document.add(new NumericDocValuesField(DocCountFieldMapper.NAME, 5)); + if (randomWrite) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + Document document = new Document(); + if (useDocCountField) { + // add the doc count field to the first document + document.add(new NumericDocValuesField(DocCountFieldMapper.NAME, 5)); + } + for (Map.Entry date : dataset.entrySet()) { + for (int i = 0; i < date.getValue(); i++) { + long instant = asLong(date.getKey(), fieldType); + document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); + document.add(new LongPoint(AGGREGABLE_DATE, instant)); + indexWriter.addDocument(document); + document.clear(); + } + } } - for (Map.Entry date : dataset.entrySet()) { - for (int i = 0; i < date.getValue(); i++) { - long instant = asLong(date.getKey(), fieldType); - document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); - document.add(new LongPoint(AGGREGABLE_DATE, instant)); - indexWriter.addDocument(document); - document.clear(); + } else { + // use default codec so max points in leaf is fixed to 512, to cover the node level visit and compare logic + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig().setCodec(TestUtil.getDefaultCodec()))) { + List documents = new ArrayList<>(); + for (Map.Entry date : dataset.entrySet()) { + for (int i = 0; i < date.getValue(); i++) { + Document document = new Document(); + if (useDocCountField) { + // add the doc count field once + document.add(new NumericDocValuesField(DocCountFieldMapper.NAME, 5)); + useDocCountField = false; + } + long instant = asLong(date.getKey(), fieldType); + document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); + document.add(new LongPoint(AGGREGABLE_DATE, instant)); + documents.add(document); + } } + indexWriter.addDocuments(documents); } }