From 85bad70480e2aef432d667c9ad8a9da86f7ea2b1 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Tue, 23 Jan 2024 21:19:06 -0800 Subject: [PATCH 01/14] Refactoring Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 215 +++++++++++------- .../bucket/composite/CompositeAggregator.java | 49 +++- .../AutoDateHistogramAggregator.java | 77 +++++-- .../histogram/DateHistogramAggregator.java | 45 +++- .../DateHistogramAggregatorTests.java | 58 ++--- 5 files changed, 291 insertions(+), 153 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index f377287d0b3bd..5db9a0bc21b64 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -18,16 +18,15 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.apache.lucene.util.NumericUtils; -import org.opensearch.common.CheckedFunction; import org.opensearch.common.Rounding; import org.opensearch.common.lucene.search.function.FunctionScoreQuery; import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.query.DateRangeIncludingNowQuery; -import org.opensearch.search.DocValueFormat; -import org.opensearch.search.aggregations.bucket.composite.CompositeKey; +import org.opensearch.search.aggregations.bucket.composite.CompositeAggregator; import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig; import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource; +import org.opensearch.search.aggregations.bucket.histogram.LongBounds; import org.opensearch.search.internal.SearchContext; import java.io.IOException; @@ -37,7 +36,6 @@ import 
java.util.OptionalLong; import java.util.function.BiConsumer; import java.util.function.Function; -import java.util.function.Supplier; /** * Utility class to help rewrite aggregations into filters. @@ -195,8 +193,6 @@ public static class FastFilterContext { private Weight[] filters = null; public AggregationType aggregationType; - public FastFilterContext() {} - private void setFilters(Weight[] filters) { this.filters = filters; } @@ -212,45 +208,50 @@ public boolean isRewriteable(final Object parent, final int subAggLength) { /** * This filter build method is for date histogram aggregation type * - * @param computeBounds get the lower and upper bound of the field in a shard search - * @param roundingFunction produce Rounding that contains interval of date range. - * Rounding is computed dynamically using the bounds in AutoDateHistogram - * @param preparedRoundingSupplier produce PreparedRounding to round values at call-time */ - public void buildFastFilter( - SearchContext context, - CheckedFunction computeBounds, - Function roundingFunction, - Supplier preparedRoundingSupplier - ) throws IOException { - assert this.aggregationType instanceof DateHistogramAggregationType; - DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType; - DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType(); - final long[] bounds = computeBounds.apply(aggregationType); - if (bounds == null) return; - - final Rounding rounding = roundingFunction.apply(bounds); - final OptionalLong intervalOpt = Rounding.getInterval(rounding); - if (intervalOpt.isEmpty()) return; - final long interval = intervalOpt.getAsLong(); - - // afterKey is the last bucket key in previous response, while the bucket key - // is the start of the bucket values, so add the interval - if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { - bounds[0] = ((CompositeAggregationType) 
aggregationType).afterKey + interval; - } - - final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( - context, - interval, - preparedRoundingSupplier.get(), - fieldType.name(), - fieldType, - bounds[0], - bounds[1] - ); + // public void buildFastFilter( + // SearchContext context, + // CheckedFunction computeBounds, + // Function roundingFunction, + // Supplier preparedRoundingSupplier + // ) throws IOException { + // assert this.aggregationType instanceof DateHistogramAggregationType; + // DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType; + // DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType(); + // final long[] bounds = computeBounds.apply(aggregationType); + // if (bounds == null) return; + // + // final Rounding rounding = roundingFunction.apply(bounds); + // final OptionalLong intervalOpt = Rounding.getInterval(rounding); + // if (intervalOpt.isEmpty()) return; + // final long interval = intervalOpt.getAsLong(); + // + // // afterKey is the last bucket key in previous response, while the bucket key + // // is the start of the bucket values, so add the interval + // if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { + // bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval; + // } + // + // final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( + // context, + // interval, + // preparedRoundingSupplier.get(), + // fieldType.name(), + // fieldType, + // bounds[0], + // bounds[1] + // ); + // this.setFilters(filters); + // } + + public void buildFastFilter(SearchContext context) throws IOException { + Weight[] filters = this.aggregationType.buildFastFilter(context); this.setFilters(filters); } + + // this method should delegate to specific aggregation type + + // can this all be triggered in aggregation type? 
} /** @@ -258,22 +259,29 @@ public void buildFastFilter( */ public interface AggregationType { boolean isRewriteable(Object parent, int subAggLength); + Weight[] buildFastFilter(SearchContext context) throws IOException; } /** * For date histogram aggregation */ - public static class DateHistogramAggregationType implements AggregationType { + public static abstract class AbstractDateHistogramAggregationType implements AggregationType { private final MappedFieldType fieldType; private final boolean missing; private final boolean hasScript; + private LongBounds hardBounds = null; - public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { + public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { this.fieldType = fieldType; this.missing = missing; this.hasScript = hasScript; } + public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) { + this(fieldType, missing, hasScript); + this.hardBounds = hardBounds; + } + @Override public boolean isRewriteable(Object parent, int subAggLength) { if (parent == null && subAggLength == 0 && !missing && !hasScript) { @@ -282,6 +290,57 @@ public boolean isRewriteable(Object parent, int subAggLength) { return false; } + @Override + public Weight[] buildFastFilter(SearchContext context) throws IOException { + // the procedure in this method can be separated + // 1. compute bound + // 2. get rounding, interval + // 3. 
get prepared rounding, better combined with step 2 + Weight[] result = {}; + + long[] bounds = computeBounds(context); + if (bounds == null) return result; + + final Rounding rounding = getRounding(bounds[0], bounds[1]); + final OptionalLong intervalOpt = Rounding.getInterval(rounding); + if (intervalOpt.isEmpty()) return result; + final long interval = intervalOpt.getAsLong(); + + // afterKey is the last bucket key in previous response, while the bucket key + // is the start of the bucket values, so add the interval + // if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { + // bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval; + // } + processAfterKey(bounds, interval); + + return FastFilterRewriteHelper.createFilterForAggregations( + context, + interval, + getRoundingPrepared(), + fieldType.name(), + (DateFieldMapper.DateFieldType) fieldType, + bounds[0], + bounds[1] + ); + } + + protected long[] computeBounds(SearchContext context) throws IOException { + final long[] bounds = getAggregationBounds(context, fieldType.name()); + if (bounds != null) { + // Update min/max limit if user specified any hard bounds + if (hardBounds != null) { + bounds[0] = Math.max(bounds[0], hardBounds.getMin()); + bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive + } + } + return bounds; + } + + protected abstract Rounding getRounding(final long low, final long high); + protected abstract Rounding.Prepared getRoundingPrepared(); + + protected void processAfterKey(long[] bound, long interval) {}; + public DateFieldMapper.DateFieldType getFieldType() { assert fieldType instanceof DateFieldMapper.DateFieldType; return (DateFieldMapper.DateFieldType) fieldType; @@ -291,36 +350,36 @@ public DateFieldMapper.DateFieldType getFieldType() { /** * For composite aggregation with date histogram as a source */ - public static class CompositeAggregationType extends 
DateHistogramAggregationType { - private final RoundingValuesSource valuesSource; - private long afterKey = -1L; - private final int size; - - public CompositeAggregationType( - CompositeValuesSourceConfig[] sourceConfigs, - CompositeKey rawAfterKey, - List formats, - int size - ) { - super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); - this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); - this.size = size; - if (rawAfterKey != null) { - assert rawAfterKey.size() == 1 && formats.size() == 1; - this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { - throw new IllegalArgumentException("now() is not supported in [after] key"); - }); - } - } - - public Rounding getRounding() { - return valuesSource.getRounding(); - } - - public Rounding.Prepared getRoundingPreparer() { - return valuesSource.getPreparedRounding(); - } - } + // public static class CompositeAggregationType extends DateHistogramAggregationType { + // private final RoundingValuesSource valuesSource; + // private long afterKey = -1L; + // private final int size; + // + // public CompositeAggregationType( + // CompositeValuesSourceConfig[] sourceConfigs, + // CompositeKey rawAfterKey, + // List formats, + // int size + // ) { + // super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); + // this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); + // this.size = size; + // if (rawAfterKey != null) { + // assert rawAfterKey.size() == 1 && formats.size() == 1; + // this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { + // throw new IllegalArgumentException("now() is not supported in [after] key"); + // }); + // } + // } + // + // public Rounding getRounding() { + // return valuesSource.getRounding(); + // } + // + // public Rounding.Prepared getRoundingPreparer() { + // return 
valuesSource.getPreparedRounding(); + // } + // } public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) { return sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource; @@ -364,14 +423,14 @@ public static boolean tryFastFilterAggregation( for (i = 0; i < filters.length; i++) { if (counts[i] > 0) { long bucketKey = i; // the index of filters is the key for filters aggregation - if (fastFilterContext.aggregationType instanceof DateHistogramAggregationType) { - final DateFieldMapper.DateFieldType fieldType = ((DateHistogramAggregationType) fastFilterContext.aggregationType) + if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) { + final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType) .getFieldType(); bucketKey = fieldType.convertNanosToMillis( NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) ); - if (fastFilterContext.aggregationType instanceof CompositeAggregationType) { - size = ((CompositeAggregationType) fastFilterContext.aggregationType).size; + if (fastFilterContext.aggregationType instanceof CompositeAggregator.CompositeAggregationType) { + size = ((CompositeAggregator.CompositeAggregationType) fastFilterContext.aggregationType).getSize(); } } incrementDocCount.accept(bucketKey, counts[i]); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 822b8a6c4b118..ab8641eaf8a90 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -97,7 +97,7 @@ * * @opensearch.internal */ -final class CompositeAggregator extends 
BucketsAggregator { +public final class CompositeAggregator extends BucketsAggregator { private final int size; private final List sourceNames; private final int[] reverseMuls; @@ -167,24 +167,55 @@ final class CompositeAggregator extends BucketsAggregator { fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return; fastFilterContext.setAggregationType( - new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size) + new CompositeAggregationType() ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { // bucketOrds is the data structure for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); // Currently the filter rewrite is only supported for date histograms - FastFilterRewriteHelper.CompositeAggregationType aggregationType = - (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType; - preparedRounding = aggregationType.getRoundingPreparer(); + // FastFilterRewriteHelper.CompositeAggregationType aggregationType = + // (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType; + // preparedRounding = aggregationType.getRoundingPreparer(); fastFilterContext.buildFastFilter( - context, - fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), - x -> aggregationType.getRounding(), - () -> preparedRounding + context ); } } + public class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + private final RoundingValuesSource valuesSource; + private long afterKey = -1L; + + public CompositeAggregationType() { + super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); + this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); + if (rawAfterKey != null) { + assert 
rawAfterKey.size() == 1 && formats.size() == 1; + this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { + throw new IllegalArgumentException("now() is not supported in [after] key"); + }); + } + } + + @Override + public Rounding getRounding(final long low, final long high) { + return valuesSource.getRounding(); + } + + @Override + protected void processAfterKey(long[] bound, long interval) { + bound[0] = afterKey + interval; + } + + public Rounding.Prepared getRoundingPrepared() { + return valuesSource.getPreparedRounding(); + } + + public int getSize() { + return size; + } + } + @Override protected void doClose() { try { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 0ea820abbedf4..8a2eea9a0417f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -42,6 +42,7 @@ import org.opensearch.common.util.IntArray; import org.opensearch.common.util.LongArray; import org.opensearch.core.common.util.ByteArray; +import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -158,43 +159,69 @@ private AutoDateHistogramAggregator( fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); fastFilterContext.setAggregationType( - new FastFilterRewriteHelper.DateHistogramAggregationType( + new AutoHistogramAggregationType( valuesSourceConfig.fieldType(), valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - 
fastFilterContext.buildFastFilter( - context, - fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), - b -> getMinimumRounding(b[0], b[1]), - // Passing prepared rounding as supplier to ensure the correct prepared - // rounding is set as it is done during getMinimumRounding - () -> preparedRounding - ); + fastFilterContext.buildFastFilter(context); } } - private Rounding getMinimumRounding(final long low, final long high) { - // max - min / targetBuckets = bestDuration - // find the right innerInterval this bestDuration belongs to - // since we cannot exceed targetBuckets, bestDuration should go up, - // so the right innerInterval should be an upper bound - long bestDuration = (high - low) / targetBuckets; - while (roundingIdx < roundingInfos.length - 1) { - final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; - final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1]; - // If the interval duration is covered by the maximum inner interval, - // we can start with this outer interval for creating the buckets - if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) { - break; + // private Rounding getMinimumRounding(final long low, final long high) { + // // max - min / targetBuckets = bestDuration + // // find the right innerInterval this bestDuration belongs to + // // since we cannot exceed targetBuckets, bestDuration should go up, + // // so the right innerInterval should be an upper bound + // long bestDuration = (high - low) / targetBuckets; + // while (roundingIdx < roundingInfos.length - 1) { + // final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; + // final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1]; + // // If the interval duration is covered by the maximum inner interval, + // // we can start with this outer interval for creating the buckets + // if (bestDuration <= temp * 
curRoundingInfo.roughEstimateDurationMillis) { + // break; + // } + // roundingIdx++; + // } + // + // preparedRounding = prepareRounding(roundingIdx); + // return roundingInfos[roundingIdx].rounding; + // } + + private class AutoHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + public AutoHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { + super(fieldType, missing, hasScript); + } + + @Override + protected Rounding getRounding(final long low, final long high) { + // max - min / targetBuckets = bestDuration + // find the right innerInterval this bestDuration belongs to + // since we cannot exceed targetBuckets, bestDuration should go up, + // so the right innerInterval should be an upper bound + long bestDuration = (high - low) / targetBuckets; + while (roundingIdx < roundingInfos.length - 1) { + final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; + final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1]; + // If the interval duration is covered by the maximum inner interval, + // we can start with this outer interval for creating the buckets + if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) { + break; + } + roundingIdx++; } - roundingIdx++; + + preparedRounding = prepareRounding(roundingIdx); + return roundingInfos[roundingIdx].rounding; } - preparedRounding = prepareRounding(roundingIdx); - return roundingInfos[roundingIdx].rounding; + @Override + protected Prepared getRoundingPrepared() { + return preparedRounding; + } } protected abstract LongKeyedBucketOrds getBucketOrds(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index b95bd093b82a6..fd0cf1ab3f8b1 100644 --- 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -39,6 +39,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.Rounding; import org.opensearch.common.lease.Releasables; +import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -117,27 +118,45 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); fastFilterContext.setAggregationType( - new FastFilterRewriteHelper.DateHistogramAggregationType( + new DateHistogramAggregationType( valuesSourceConfig.fieldType(), valuesSourceConfig.missing() != null, - valuesSourceConfig.script() != null + valuesSourceConfig.script() != null, + hardBounds ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding); + fastFilterContext.buildFastFilter(context); } } - private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException { - final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name()); - if (bounds != null) { - // Update min/max limit if user specified any hard bounds - if (hardBounds != null) { - bounds[0] = Math.max(bounds[0], hardBounds.getMin()); - bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive - } + // private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException { + // final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, 
fieldContext.getFieldType().name()); + // if (bounds != null) { + // // Update min/max limit if user specified any hard bounds + // if (hardBounds != null) { + // bounds[0] = Math.max(bounds[0], hardBounds.getMin()); + // bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive + // } + // } + // return bounds; + // } + + private class DateHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + + public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) { + super(fieldType, missing, hasScript, hardBounds); + } + + @Override + protected Rounding getRounding(long low, long high) { + return rounding; + } + + @Override + protected Rounding.Prepared getRoundingPrepared() { + return preparedRounding; } - return bounds; } @Override @@ -154,6 +173,8 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return LeafBucketCollector.NO_OP_COLLECTOR; } + // assume we found out at segment level, it's effectively a match all + // boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation( ctx, fastFilterContext, diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index bca6623e66104..95dd1687d62bd 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -154,35 +154,35 @@ public void testMatchAllDocs() throws IOException { false ); - testSearchCase( - query, - DATASET, - aggregation -> aggregation.calendarInterval(DateHistogramInterval.YEAR).field(AGGREGABLE_DATE), - histogram -> assertEquals(8, histogram.getBuckets().size()), - false - ); - 
testSearchCase( - query, - DATASET, - aggregation -> aggregation.calendarInterval(DateHistogramInterval.YEAR).field(AGGREGABLE_DATE).minDocCount(1L), - histogram -> assertEquals(6, histogram.getBuckets().size()), - false - ); - - testSearchCase( - query, - DATASET, - aggregation -> aggregation.fixedInterval(new DateHistogramInterval("365d")).field(AGGREGABLE_DATE), - histogram -> assertEquals(8, histogram.getBuckets().size()), - false - ); - testSearchCase( - query, - DATASET, - aggregation -> aggregation.fixedInterval(new DateHistogramInterval("365d")).field(AGGREGABLE_DATE).minDocCount(1L), - histogram -> assertEquals(6, histogram.getBuckets().size()), - false - ); + // testSearchCase( + // query, + // DATASET, + // aggregation -> aggregation.calendarInterval(DateHistogramInterval.YEAR).field(AGGREGABLE_DATE), + // histogram -> assertEquals(8, histogram.getBuckets().size()), + // false + // ); + // testSearchCase( + // query, + // DATASET, + // aggregation -> aggregation.calendarInterval(DateHistogramInterval.YEAR).field(AGGREGABLE_DATE).minDocCount(1L), + // histogram -> assertEquals(6, histogram.getBuckets().size()), + // false + // ); + // + // testSearchCase( + // query, + // DATASET, + // aggregation -> aggregation.fixedInterval(new DateHistogramInterval("365d")).field(AGGREGABLE_DATE), + // histogram -> assertEquals(8, histogram.getBuckets().size()), + // false + // ); + // testSearchCase( + // query, + // DATASET, + // aggregation -> aggregation.fixedInterval(new DateHistogramInterval("365d")).field(AGGREGABLE_DATE).minDocCount(1L), + // histogram -> assertEquals(6, histogram.getBuckets().size()), + // false + // ); } public void testAsSubAgg() throws IOException { From 486b8b8208fd92168556a8ee6b60e457f7cc664e Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 29 Jan 2024 10:49:35 -0800 Subject: [PATCH 02/14] segment level match all case Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 165 +++++++----------- 
.../bucket/composite/CompositeAggregator.java | 31 ++-- .../AutoDateHistogramAggregator.java | 26 +-- .../histogram/DateHistogramAggregator.java | 25 +-- 4 files changed, 88 insertions(+), 159 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 5db9a0bc21b64..7527ecefa4198 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -8,6 +8,8 @@ package org.opensearch.search.aggregations.bucket; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; import org.apache.lucene.search.ConstantScoreQuery; @@ -53,6 +55,8 @@ public final class FastFilterRewriteHelper { private FastFilterRewriteHelper() {} + private static final Logger logger = LogManager.getLogger(FastFilterRewriteHelper.class); + private static final int MAX_NUM_FILTER_BUCKETS = 1024; private static final Map, Function> queryWrappers; @@ -86,6 +90,7 @@ private static long[] getIndexBounds(final SearchContext context, final String f // Since the query does not specify bounds for aggregation, we can // build the global min/max from local min/max within each segment for (LeafReaderContext leaf : leaves) { + // return null if the field is not indexed final PointValues values = leaf.reader().getPointValues(fieldName); if (values != null) { min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0)); @@ -187,14 +192,18 @@ protected String toString(int dimension, byte[] value) { } /** - * Context object to do fast filter optimization + * Context object for fast filter optimization + * + * filters equals to null if the optimization cannot be applied */ public 
static class FastFilterContext { + private boolean rewriteable = false; private Weight[] filters = null; public AggregationType aggregationType; + protected final SearchContext context; - private void setFilters(Weight[] filters) { - this.filters = filters; + public FastFilterContext(SearchContext context) { + this.context = context; } public void setAggregationType(AggregationType aggregationType) { @@ -202,64 +211,39 @@ public void setAggregationType(AggregationType aggregationType) { } public boolean isRewriteable(final Object parent, final int subAggLength) { - return aggregationType.isRewriteable(parent, subAggLength); + boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength); + logger.debug("Fast filter rewriteable: {}", rewriteable); + this.rewriteable = rewriteable; + return rewriteable; } - /** - * This filter build method is for date histogram aggregation type - * - */ - // public void buildFastFilter( - // SearchContext context, - // CheckedFunction computeBounds, - // Function roundingFunction, - // Supplier preparedRoundingSupplier - // ) throws IOException { - // assert this.aggregationType instanceof DateHistogramAggregationType; - // DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType; - // DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType(); - // final long[] bounds = computeBounds.apply(aggregationType); - // if (bounds == null) return; - // - // final Rounding rounding = roundingFunction.apply(bounds); - // final OptionalLong intervalOpt = Rounding.getInterval(rounding); - // if (intervalOpt.isEmpty()) return; - // final long interval = intervalOpt.getAsLong(); - // - // // afterKey is the last bucket key in previous response, while the bucket key - // // is the start of the bucket values, so add the interval - // if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { - // bounds[0] = 
((CompositeAggregationType) aggregationType).afterKey + interval; - // } - // - // final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( - // context, - // interval, - // preparedRoundingSupplier.get(), - // fieldType.name(), - // fieldType, - // bounds[0], - // bounds[1] - // ); - // this.setFilters(filters); - // } - - public void buildFastFilter(SearchContext context) throws IOException { - Weight[] filters = this.aggregationType.buildFastFilter(context); - this.setFilters(filters); + public void buildFastFilter() throws IOException { + Weight[] filters = this.buildFastFilter(null); + logger.debug("Fast filter built: {}", filters == null ? "no" : "yes"); + this.filters = filters; } - // this method should delegate to specific aggregation type + public Weight[] buildFastFilter(LeafReaderContext ctx) throws IOException { + Weight[] filters = this.aggregationType.buildFastFilter(context, ctx); + logger.debug("Fast filter built: {}", filters == null ? "no" : "yes"); + this.filters = filters; + return filters; + } - // can this all be triggered in aggregation type? + public boolean hasfilterBuilt() { + return filters != null; + } } /** * Different types have different pre-conditions, filter building logic, etc. 
*/ public interface AggregationType { + boolean isRewriteable(Object parent, int subAggLength); - Weight[] buildFastFilter(SearchContext context) throws IOException; + + // Weight[] buildFastFilter(SearchContext context) throws IOException; + Weight[] buildFastFilter(SearchContext context, LeafReaderContext ctx) throws IOException; } /** @@ -269,7 +253,7 @@ public static abstract class AbstractDateHistogramAggregationType implements Agg private final MappedFieldType fieldType; private final boolean missing; private final boolean hasScript; - private LongBounds hardBounds = null; + private LongBounds hardBounds; public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { this.fieldType = fieldType; @@ -284,6 +268,9 @@ public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean m @Override public boolean isRewriteable(Object parent, int subAggLength) { + // at shard level these are the pre-conditions + // these still need to apply at segment level + // while the effectively segment level match all, should be plugged in within compute bound if (parent == null && subAggLength == 0 && !missing && !hasScript) { return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType; } @@ -291,26 +278,21 @@ public boolean isRewriteable(Object parent, int subAggLength) { } @Override - public Weight[] buildFastFilter(SearchContext context) throws IOException { - // the procedure in this method can be separated - // 1. compute bound - // 2. get rounding, interval - // 3. 
get prepared rounding, better combined with step 2 - Weight[] result = {}; - - long[] bounds = computeBounds(context); - if (bounds == null) return result; + public Weight[] buildFastFilter(SearchContext context, LeafReaderContext ctx) throws IOException { + long[] bounds; + if (ctx != null) { + bounds = getIndexBounds(context, fieldType.name()); + } else { + bounds = computeBounds(context); + } + if (bounds == null) return null; final Rounding rounding = getRounding(bounds[0], bounds[1]); final OptionalLong intervalOpt = Rounding.getInterval(rounding); - if (intervalOpt.isEmpty()) return result; + if (intervalOpt.isEmpty()) return null; final long interval = intervalOpt.getAsLong(); - // afterKey is the last bucket key in previous response, while the bucket key - // is the start of the bucket values, so add the interval - // if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { - // bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval; - // } + // process the after key from composite agg processAfterKey(bounds, interval); return FastFilterRewriteHelper.createFilterForAggregations( @@ -337,6 +319,7 @@ protected long[] computeBounds(SearchContext context) throws IOException { } protected abstract Rounding getRounding(final long low, final long high); + protected abstract Rounding.Prepared getRoundingPrepared(); protected void processAfterKey(long[] bound, long interval) {}; @@ -347,40 +330,6 @@ public DateFieldMapper.DateFieldType getFieldType() { } } - /** - * For composite aggregation with date histogram as a source - */ - // public static class CompositeAggregationType extends DateHistogramAggregationType { - // private final RoundingValuesSource valuesSource; - // private long afterKey = -1L; - // private final int size; - // - // public CompositeAggregationType( - // CompositeValuesSourceConfig[] sourceConfigs, - // CompositeKey rawAfterKey, - // List formats, - // int 
size - // ) { - // super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); - // this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); - // this.size = size; - // if (rawAfterKey != null) { - // assert rawAfterKey.size() == 1 && formats.size() == 1; - // this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { - // throw new IllegalArgumentException("now() is not supported in [after] key"); - // }); - // } - // } - // - // public Rounding getRounding() { - // return valuesSource.getRounding(); - // } - // - // public Rounding.Prepared getRoundingPreparer() { - // return valuesSource.getPreparedRounding(); - // } - // } - public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) { return sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource; } @@ -394,7 +343,7 @@ public static long getBucketOrd(long bucketOrd) { } /** - * This is executed for each segment by passing the leaf reader context + * Try to get the bucket doc counts from the fast filters for the aggregation * * @param incrementDocCount takes in the bucket key value and the bucket count */ @@ -404,7 +353,17 @@ public static boolean tryFastFilterAggregation( final BiConsumer incrementDocCount ) throws IOException { if (fastFilterContext == null) return false; - if (fastFilterContext.filters == null) return false; + if (!fastFilterContext.hasfilterBuilt() && fastFilterContext.rewriteable) { + // Check if the query is functionally match-all at segment level + // if so, we still compute the index bounds and use it to build the filters + SearchContext context = fastFilterContext.context; + Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); + boolean segmentRewriteable = weight != null && weight.count(ctx) == ctx.reader().numDocs(); + if (segmentRewriteable) { + 
fastFilterContext.buildFastFilter(ctx); + } + } + if (!fastFilterContext.hasfilterBuilt()) return false; final Weight[] filters = fastFilterContext.filters; final int[] counts = new int[filters.length]; @@ -424,8 +383,8 @@ public static boolean tryFastFilterAggregation( if (counts[i] > 0) { long bucketKey = i; // the index of filters is the key for filters aggregation if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) { - final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType) - .getFieldType(); + final DateFieldMapper.DateFieldType fieldType = + ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).getFieldType(); bucketKey = fieldType.convertNanosToMillis( NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) ); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index ab8641eaf8a90..d8e7229e0d450 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -164,24 +164,20 @@ public final class CompositeAggregator extends BucketsAggregator { this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); this.rawAfterKey = rawAfterKey; - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return; - fastFilterContext.setAggregationType( - new CompositeAggregationType() - ); + fastFilterContext.setAggregationType(new CompositeAggregationType()); if (fastFilterContext.isRewriteable(parent, 
subAggregators.length)) { - // bucketOrds is the data structure for saving date histogram results + // bucketOrds is used for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); - // Currently the filter rewrite is only supported for date histograms - // FastFilterRewriteHelper.CompositeAggregationType aggregationType = - // (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType; - // preparedRounding = aggregationType.getRoundingPreparer(); - fastFilterContext.buildFastFilter( - context - ); + preparedRounding = ((CompositeAggregationType) fastFilterContext.aggregationType).getRoundingPrepared(); + fastFilterContext.buildFastFilter(); } } + /** + * Currently the filter rewrite is only supported for date histograms + */ public class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { private final RoundingValuesSource valuesSource; private long afterKey = -1L; @@ -197,20 +193,21 @@ public CompositeAggregationType() { } } - @Override public Rounding getRounding(final long low, final long high) { return valuesSource.getRounding(); } + public Rounding.Prepared getRoundingPrepared() { + return valuesSource.getPreparedRounding(); + } + @Override protected void processAfterKey(long[] bound, long interval) { + // afterKey is the last bucket key in previous response, and the bucket key + // is the minimum of all values in the bucket, so need to add the interval bound[0] = afterKey + interval; } - public Rounding.Prepared getRoundingPrepared() { - return valuesSource.getPreparedRounding(); - } - public int getSize() { return size; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 8a2eea9a0417f..a76a05fb3985c 100644 --- 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -157,7 +157,7 @@ private AutoDateHistogramAggregator( this.roundingPreparer = roundingPreparer; this.preparedRounding = prepareRounding(0); - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); fastFilterContext.setAggregationType( new AutoHistogramAggregationType( valuesSourceConfig.fieldType(), @@ -166,32 +166,12 @@ private AutoDateHistogramAggregator( ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(context); + fastFilterContext.buildFastFilter(); } } - // private Rounding getMinimumRounding(final long low, final long high) { - // // max - min / targetBuckets = bestDuration - // // find the right innerInterval this bestDuration belongs to - // // since we cannot exceed targetBuckets, bestDuration should go up, - // // so the right innerInterval should be an upper bound - // long bestDuration = (high - low) / targetBuckets; - // while (roundingIdx < roundingInfos.length - 1) { - // final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; - // final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1]; - // // If the interval duration is covered by the maximum inner interval, - // // we can start with this outer interval for creating the buckets - // if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) { - // break; - // } - // roundingIdx++; - // } - // - // preparedRounding = prepareRounding(roundingIdx); - // return roundingInfos[roundingIdx].rounding; - // } - private class AutoHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + public AutoHistogramAggregationType(MappedFieldType 
fieldType, boolean missing, boolean hasScript) { super(fieldType, missing, hasScript); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index fd0cf1ab3f8b1..5b896cb95cf25 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -31,10 +31,13 @@ package org.opensearch.search.aggregations.bucket.histogram; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; import org.apache.lucene.util.CollectionUtil; import org.opensearch.common.Nullable; import org.opensearch.common.Rounding; @@ -84,6 +87,9 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg private final LongKeyedBucketOrds bucketOrds; private final FastFilterRewriteHelper.FastFilterContext fastFilterContext; + private Weight weight; + + private static final Logger logger = LogManager.getLogger(DateHistogramAggregator.class); DateHistogramAggregator( String name, @@ -116,7 +122,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality); - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); fastFilterContext.setAggregationType( new DateHistogramAggregationType( valuesSourceConfig.fieldType(), @@ -126,22 +132,10 @@ class DateHistogramAggregator extends BucketsAggregator 
implements SizedBucketAg ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(context); + fastFilterContext.buildFastFilter(); } } - // private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException { - // final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name()); - // if (bounds != null) { - // // Update min/max limit if user specified any hard bounds - // if (hardBounds != null) { - // bounds[0] = Math.max(bounds[0], hardBounds.getMin()); - // bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive - // } - // } - // return bounds; - // } - private class DateHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) { @@ -173,8 +167,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return LeafBucketCollector.NO_OP_COLLECTOR; } - // assume we found out at segment level, it's effectively a match all - // boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation( ctx, fastFilterContext, @@ -183,6 +175,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol count ) ); + logger.info("optimized = " + optimized); if (optimized) throw new CollectionTerminatedException(); SortedNumericDocValues values = valuesSource.longValues(ctx); From 93c70de78453e33d7f9962319c754a213cde0737 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 29 Jan 2024 10:52:03 -0800 Subject: [PATCH 03/14] remove segment level match check at shard level Signed-off-by: bowenlan-amzn --- .../aggregations/bucket/FastFilterRewriteHelper.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 7527ecefa4198..382914c496713 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -123,14 +123,8 @@ public static long[] getAggregationBounds(final SearchContext context, final Str } else if (cq instanceof MatchAllDocsQuery) { return indexBounds; } - // Check if the top-level query (which may be a PRQ on another field) is functionally match-all - Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); - for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) { - if (weight.count(ctx) != ctx.reader().numDocs()) { - return null; - } - } - return indexBounds; + + return null; } /** From e9639810f773868ee6db45db7ea145b81f5a23cd Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 29 Jan 2024 11:13:31 -0800 Subject: [PATCH 04/14] small refactor Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 50 ++++++++++------ .../histogram/DateHistogramAggregator.java | 7 --- .../DateHistogramAggregatorTests.java | 58 +++++++++---------- 3 files changed, 60 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 382914c496713..06732c9a41442 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; import 
org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.IndexOrDocValuesQuery; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.PointRangeQuery; @@ -122,6 +123,11 @@ public static long[] getAggregationBounds(final SearchContext context, final Str } } else if (cq instanceof MatchAllDocsQuery) { return indexBounds; + } else if (cq instanceof FieldExistsQuery) { + // when a range query covers all values of a shard, it will be rewritten to a field exists query + if (((FieldExistsQuery) cq).getField().equals(fieldName)) { + return indexBounds; + } } return null; @@ -187,14 +193,12 @@ protected String toString(int dimension, byte[] value) { /** * Context object for fast filter optimization - * - * filters equals to null if the optimization cannot be applied */ public static class FastFilterContext { private boolean rewriteable = false; private Weight[] filters = null; public AggregationType aggregationType; - protected final SearchContext context; + private final SearchContext context; public FastFilterContext(SearchContext context) { this.context = context; @@ -217,14 +221,14 @@ public void buildFastFilter() throws IOException { this.filters = filters; } - public Weight[] buildFastFilter(LeafReaderContext ctx) throws IOException { + private Weight[] buildFastFilter(LeafReaderContext ctx) throws IOException { Weight[] filters = this.aggregationType.buildFastFilter(context, ctx); logger.debug("Fast filter built: {}", filters == null ?
"no" : "yes"); this.filters = filters; return filters; } - public boolean hasfilterBuilt() { + public boolean filterBuilt() { return filters != null; } } @@ -236,7 +240,6 @@ public interface AggregationType { boolean isRewriteable(Object parent, int subAggLength); - // Weight[] buildFastFilter(SearchContext context) throws IOException; Weight[] buildFastFilter(SearchContext context, LeafReaderContext ctx) throws IOException; } @@ -262,9 +265,6 @@ public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean m @Override public boolean isRewriteable(Object parent, int subAggLength) { - // at shard level these are the pre-conditions - // these still need to apply at segment level - // while the effectively segment level match all, should be plugged in within compute bound if (parent == null && subAggLength == 0 && !missing && !hasScript) { return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType; } @@ -347,17 +347,20 @@ public static boolean tryFastFilterAggregation( final BiConsumer incrementDocCount ) throws IOException { if (fastFilterContext == null) return false; - if (!fastFilterContext.hasfilterBuilt() && fastFilterContext.rewriteable) { - // Check if the query is functionally match-all at segment level - // if so, we still compute the index bounds and use it to build the filters - SearchContext context = fastFilterContext.context; - Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); - boolean segmentRewriteable = weight != null && weight.count(ctx) == ctx.reader().numDocs(); - if (segmentRewriteable) { - fastFilterContext.buildFastFilter(ctx); + + // check if the query is functionally match-all at segment level + if (segmentMatchAll(fastFilterContext.context, ctx)) { + logger.debug("Functionally match all for the segment {}", ctx); + if (fastFilterContext.rewriteable) { + if (!fastFilterContext.filterBuilt()) { + fastFilterContext.buildFastFilter(ctx); + } + } else { + 
return false; } } - if (!fastFilterContext.hasfilterBuilt()) return false; + + if (!fastFilterContext.filterBuilt()) return false; final Weight[] filters = fastFilterContext.filters; final int[] counts = new int[filters.length]; @@ -388,10 +391,19 @@ public static boolean tryFastFilterAggregation( } incrementDocCount.accept(bucketKey, counts[i]); s++; - if (s > size) return true; + if (s > size) { + logger.debug("Fast filter optimization applied with size {}", size); + return true; + } } } + logger.debug("Fast filter optimization applied"); return true; } + + private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException { + Weight weight = ctx.searcher().createWeight(ctx.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); + return weight != null && weight.count(leafCtx) == leafCtx.reader().numDocs(); + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 5b896cb95cf25..0e830106c8284 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -31,13 +31,10 @@ package org.opensearch.search.aggregations.bucket.histogram; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Weight; import org.apache.lucene.util.CollectionUtil; import org.opensearch.common.Nullable; import org.opensearch.common.Rounding; @@ -87,9 +84,6 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg private final 
LongKeyedBucketOrds bucketOrds; private final FastFilterRewriteHelper.FastFilterContext fastFilterContext; - private Weight weight; - - private static final Logger logger = LogManager.getLogger(DateHistogramAggregator.class); DateHistogramAggregator( String name, @@ -175,7 +169,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol count ) ); - logger.info("optimized = " + optimized); if (optimized) throw new CollectionTerminatedException(); SortedNumericDocValues values = valuesSource.longValues(ctx); diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 95dd1687d62bd..bca6623e66104 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -154,35 +154,35 @@ public void testMatchAllDocs() throws IOException { false ); - // testSearchCase( - // query, - // DATASET, - // aggregation -> aggregation.calendarInterval(DateHistogramInterval.YEAR).field(AGGREGABLE_DATE), - // histogram -> assertEquals(8, histogram.getBuckets().size()), - // false - // ); - // testSearchCase( - // query, - // DATASET, - // aggregation -> aggregation.calendarInterval(DateHistogramInterval.YEAR).field(AGGREGABLE_DATE).minDocCount(1L), - // histogram -> assertEquals(6, histogram.getBuckets().size()), - // false - // ); - // - // testSearchCase( - // query, - // DATASET, - // aggregation -> aggregation.fixedInterval(new DateHistogramInterval("365d")).field(AGGREGABLE_DATE), - // histogram -> assertEquals(8, histogram.getBuckets().size()), - // false - // ); - // testSearchCase( - // query, - // DATASET, - // aggregation -> aggregation.fixedInterval(new 
DateHistogramInterval("365d")).field(AGGREGABLE_DATE).minDocCount(1L), - // histogram -> assertEquals(6, histogram.getBuckets().size()), - // false - // ); + testSearchCase( + query, + DATASET, + aggregation -> aggregation.calendarInterval(DateHistogramInterval.YEAR).field(AGGREGABLE_DATE), + histogram -> assertEquals(8, histogram.getBuckets().size()), + false + ); + testSearchCase( + query, + DATASET, + aggregation -> aggregation.calendarInterval(DateHistogramInterval.YEAR).field(AGGREGABLE_DATE).minDocCount(1L), + histogram -> assertEquals(6, histogram.getBuckets().size()), + false + ); + + testSearchCase( + query, + DATASET, + aggregation -> aggregation.fixedInterval(new DateHistogramInterval("365d")).field(AGGREGABLE_DATE), + histogram -> assertEquals(8, histogram.getBuckets().size()), + false + ); + testSearchCase( + query, + DATASET, + aggregation -> aggregation.fixedInterval(new DateHistogramInterval("365d")).field(AGGREGABLE_DATE).minDocCount(1L), + histogram -> assertEquals(6, histogram.getBuckets().size()), + false + ); } public void testAsSubAgg() throws IOException { From eb090bb66360f3ae70e932a510fe781f0e77d38f Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 29 Jan 2024 11:41:25 -0800 Subject: [PATCH 05/14] the optimization cannot handle the documents with _doc_count field Signed-off-by: bowenlan-amzn --- .../aggregations/bucket/FastFilterRewriteHelper.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 06732c9a41442..ddff864899fb9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import 
org.apache.lucene.index.DocValues; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.FieldExistsQuery; @@ -24,6 +26,7 @@ import org.opensearch.common.Rounding; import org.opensearch.common.lucene.search.function.FunctionScoreQuery; import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.query.DateRangeIncludingNowQuery; import org.opensearch.search.aggregations.bucket.composite.CompositeAggregator; @@ -40,6 +43,8 @@ import java.util.function.BiConsumer; import java.util.function.Function; +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + /** * Utility class to help rewrite aggregations into filters. * Instead of aggregation collects documents one by one, filter may count all documents that match in one pass. 
@@ -348,6 +353,12 @@ public static boolean tryFastFilterAggregation( ) throws IOException { if (fastFilterContext == null) return false; + NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); + if (docCountValues.nextDoc() != NO_MORE_DOCS) { + logger.debug("Segment {} has at least one document with _doc_count field", ctx); + return false; + } + // check if the query is functionally match-all at segment level if (segmentMatchAll(fastFilterContext.context, ctx)) { logger.debug("Functionally match all for the segment {}", ctx); From 3738a4982201e216932e9e9a746038eb9fe4f73d Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Tue, 30 Jan 2024 10:42:49 -0800 Subject: [PATCH 06/14] add tests to cover hard bounds logic Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 12 ++- .../composite/CompositeAggregatorTests.java | 2 +- .../DateHistogramAggregatorTests.java | 76 ++++++++++++++++++- 3 files changed, 85 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 90f34b02858dc..87816d169e1b8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -106,6 +106,8 @@ private static long[] getIndexBounds(final SearchContext context, final String f } } + // TODO b remove + logger.info("get index bounds [{}-{}]", min, max); if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) { return null; } @@ -234,7 +236,7 @@ private void buildFastFilter(GetBounds getBounds) assert filters == null : "Filters should only be built once, but they are already built"; Weight[] filters = this.aggregationType.buildFastFilter(context, getBounds); if (filters != null) { - logger.debug("Fast filter built for shard 
{}", context.indexShard().shardId()); + logger.info("Fast filter built for shard {}", context.indexShard().shardId()); filtersBuiltAtShardLevel = true; this.filters = filters; } @@ -294,11 +296,13 @@ public boolean isRewriteable(Object parent, int subAggLength) { @Override public Weight[] buildFastFilter(SearchContext context, GetBounds getBounds) throws IOException { long[] bounds = getBounds.apply(context, fieldType.name()); + logger.info("before process: Bounds is {} for shard {}", bounds, context.indexShard().shardId()); bounds = processHardBounds(bounds); - logger.trace("Bounds is {} for shard {}", bounds, context.indexShard().shardId()); + logger.info("Bounds is {} for shard {}", bounds, context.indexShard().shardId()); if (bounds == null) { return null; } + assert bounds[0] <= bounds[1] : "Low bound should be less than high bound"; final Rounding rounding = getRounding(bounds[0], bounds[1]); final OptionalLong intervalOpt = Rounding.getInterval(rounding); @@ -339,6 +343,8 @@ protected long[] processHardBounds(long[] bounds) { if (bounds[0] > bounds[1]) { return null; } + // bounds[0] = Math.max(bounds[0], hardBounds.getMin()); + // bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive } } return bounds; @@ -385,7 +391,7 @@ public static boolean tryFastFilterAggregation( // check if the query is functionally match-all at segment level if (!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) return false; if (!fastFilterContext.filterBuilt()) { - logger.debug( + logger.info( "Shard {} segment {} functionally match all documents. 
Build the fast filter", fastFilterContext.context.indexShard().shardId(), ctx.ord diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index b581e552fec4f..425432d1ba72d 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -1275,7 +1275,7 @@ public void testWithDateHistogram() throws IOException { () -> { DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") .calendarInterval(DateHistogramInterval.days(1)); - return new CompositeAggregationBuilder("name", Collections.singletonList(histo)); + return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).size(3); }, (result) -> { assertEquals(3, result.getBuckets().size()); diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index bca6623e66104..d33683b0444de 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -1178,6 +1178,80 @@ public void testOverlappingBounds() { ); } + public void testHardBoundsNotOverlapping() throws IOException { + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2018-01-01", "2020-01-01")) + .field(AGGREGABLE_DATE), + histogram -> 
{ + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + false + ); + + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2016-01-01", "2017-01-01")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + false + ); + + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2016-01-01", "2017-02-03")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(2, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + }, + false + ); + + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2017-02-03", "2020-01-01")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(3, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-03T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-04T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(2); + 
assertEquals("2017-02-05T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + }, + false + ); + } + public void testIllegalInterval() throws IOException { IllegalArgumentException e = expectThrows( IllegalArgumentException.class, @@ -1211,7 +1285,7 @@ private void testSearchCase( int maxBucket, boolean useNanosecondResolution ) throws IOException { - boolean aggregableDateIsSearchable = randomBoolean(); + boolean aggregableDateIsSearchable = true; DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(useNanosecondResolution, aggregableDateIsSearchable); try (Directory directory = newDirectory()) { From d933f6162cf044371d42cdbde99ec09aed145806 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Tue, 30 Jan 2024 15:35:25 -0800 Subject: [PATCH 07/14] add tests for doc count field Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 27 ++++---- .../bucket/composite/CompositeAggregator.java | 4 +- .../composite/CompositeAggregatorTests.java | 61 +++++++++++++++++++ .../DateHistogramAggregatorTests.java | 44 ++++++++++++- .../BaseCompositeAggregatorTestCase.java | 8 +++ 5 files changed, 129 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 87816d169e1b8..446180651bfcf 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -106,8 +106,6 @@ private static long[] getIndexBounds(final SearchContext context, final String f } } - // TODO b remove - logger.info("get index bounds [{}-{}]", min, max); if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) { return null; } @@ -162,7 +160,10 @@ private static Weight[] createFilterForAggregations( int bucketCount = 0; while (roundedLow <= 
fieldType.convertNanosToMillis(high)) { bucketCount++; - if (bucketCount > MAX_NUM_FILTER_BUCKETS) return null; + if (bucketCount > MAX_NUM_FILTER_BUCKETS) { + logger.info("2.2.2 max number of filters reached [{}]", MAX_NUM_FILTER_BUCKETS); + return null; + } // Below rounding is needed as the interval could return in // non-rounded values for something like calendar month roundedLow = preparedRounding.round(roundedLow + interval); @@ -210,6 +211,7 @@ public static class FastFilterContext { private boolean rewriteable = false; private Weight[] filters = null; private boolean filtersBuiltAtShardLevel = false; + public AggregationType aggregationType; private final SearchContext context; @@ -223,12 +225,13 @@ public void setAggregationType(AggregationType aggregationType) { public boolean isRewriteable(final Object parent, final int subAggLength) { boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength); - logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId()); + logger.debug("1. Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId()); this.rewriteable = rewriteable; return rewriteable; } public void buildFastFilter() throws IOException { + logger.info("2. 
Build filters at shard level"); this.buildFastFilter(FastFilterRewriteHelper::getDateHistoAggBounds); } @@ -236,7 +239,7 @@ private void buildFastFilter(GetBounds getBounds) assert filters == null : "Filters should only be built once, but they are already built"; Weight[] filters = this.aggregationType.buildFastFilter(context, getBounds); if (filters != null) { - logger.info("Fast filter built for shard {}", context.indexShard().shardId()); + logger.info("2.e Fast filter built for shard {}", context.indexShard().shardId()); filtersBuiltAtShardLevel = true; this.filters = filters; } @@ -296,9 +299,8 @@ public boolean isRewriteable(Object parent, int subAggLength) { @Override public Weight[] buildFastFilter(SearchContext context, GetBounds getBounds) throws IOException { long[] bounds = getBounds.apply(context, fieldType.name()); - logger.info("before process: Bounds is {} for shard {}", bounds, context.indexShard().shardId()); bounds = processHardBounds(bounds); - logger.info("Bounds is {} for shard {}", bounds, context.indexShard().shardId()); + logger.info("2.1 Bounds is {} for shard {}", bounds, context.indexShard().shardId()); if (bounds == null) { return null; } @@ -343,8 +345,6 @@ protected long[] processHardBounds(long[] bounds) { if (bounds[0] > bounds[1]) { return null; } - // bounds[0] = Math.max(bounds[0], hardBounds.getMin()); - // bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive } } return bounds; @@ -383,7 +383,8 @@ public static boolean tryFastFilterAggregation( NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); if (docCountValues.nextDoc() != NO_MORE_DOCS) { - logger.debug("Segment {} has at least one document with _doc_count field", ctx); + logger.info("Shard {} segment {} has at least one document with _doc_count field, skip fast filter optimization", + fastFilterContext.context.indexShard().shardId(),ctx.ord); return false; } @@ -392,7 +393,7 @@ public static 
boolean tryFastFilterAggregation( if (!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) return false; if (!fastFilterContext.filterBuilt()) { logger.info( - "Shard {} segment {} functionally match all documents. Build the fast filter", + "3.1 Shard {} segment {} functionally match all documents. Build the fast filter", fastFilterContext.context.indexShard().shardId(), ctx.ord ); @@ -430,13 +431,13 @@ public static boolean tryFastFilterAggregation( incrementDocCount.accept(bucketKey, counts[i]); s++; if (s > size) { - logger.debug("Fast filter optimization applied with size {}", size); + logger.info("3.e1 Fast filter optimization applied with size {}", size); return true; } } } - logger.debug("Fast filter optimization applied"); + logger.info("3.e Fast filter optimization applied"); return true; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index d8e7229e0d450..77f1f0af6c42e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -205,7 +205,9 @@ public Rounding.Prepared getRoundingPrepared() { protected void processAfterKey(long[] bound, long interval) { // afterKey is the last bucket key in previous response, and the bucket key // is the minimum of all values in the bucket, so need to add the interval - bound[0] = afterKey + interval; + if (afterKey != -1L) { + bound[0] = afterKey + interval; + } } public int getSize() { diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index 425432d1ba72d..df29a92c168bf 100644 --- 
a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -35,6 +35,7 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.Term; import org.apache.lucene.search.DocValuesFieldExistsQuery; +import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TermQuery; import org.opensearch.OpenSearchParseException; @@ -1253,6 +1254,66 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception { ); } + public void testDateHistogramSourceWithSize() throws IOException { + final List>> dataset = new ArrayList<>(Arrays.asList( + createDocument("date", asLong("2017-10-20T03:08:45")), + createDocument("date", asLong("2016-09-20T09:00:34")), + createDocument("date", asLong("2016-09-20T11:34:00")), + createDocument("date", asLong("2017-10-20T06:09:24")), + createDocument("date", asLong("2017-10-19T06:09:24")), + createDocument("long", 4L) + )); + testSearchCase( + Arrays.asList( + new MatchAllDocsQuery(), + new FieldExistsQuery("date"), + LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24")) + ), + dataset, + () -> { + DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") + .calendarInterval(DateHistogramInterval.days(1)); + return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).size(1); + }, + (result) -> { + assertEquals(1, result.getBuckets().size()); + assertEquals("{date=1474329600000}", result.afterKey().toString()); // 2017-10-20T00:00:00 + assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(0).getDocCount()); + } + ); + } + + public void testDateHistogramSourceWithDocCountField() throws IOException { + final List>> dataset = 
new ArrayList<>(Arrays.asList( + createDocument("date", asLong("2017-10-20T03:08:45"), "_doc_count", 5), + createDocument("date", asLong("2016-09-20T09:00:34")), + createDocument("date", asLong("2016-09-20T11:34:00")), + createDocument("date", asLong("2017-10-20T06:09:24")), + createDocument("date", asLong("2017-10-19T06:09:24")), + createDocument("long", 4L) + )); + testSearchCase( + Arrays.asList( + new MatchAllDocsQuery(), + new FieldExistsQuery("date"), + LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24")) + ), + dataset, + () -> { + DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") + .calendarInterval(DateHistogramInterval.days(1)); + return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).size(1); + }, + (result) -> { + assertEquals(1, result.getBuckets().size()); + assertEquals("{date=1474329600000}", result.afterKey().toString()); // 2017-10-20T00:00:00 + assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(0).getDocCount()); + } + ); + } + public void testWithDateHistogram() throws IOException { final List>> dataset = new ArrayList<>(); dataset.addAll( diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index d33683b0444de..a77da70a65a5f 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import 
org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; @@ -45,6 +46,7 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.opensearch.common.time.DateFormatters; import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.bucket.terms.StringTerms; @@ -1252,6 +1254,30 @@ public void testHardBoundsNotOverlapping() throws IOException { ); } + public void testDocCountField() throws IOException { + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(2, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(5, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + }, + 10000, + false, + true + ); + } + public void testIllegalInterval() throws IOException { IllegalArgumentException e = expectThrows( IllegalArgumentException.class, @@ -1285,13 +1311,29 @@ private void testSearchCase( int maxBucket, boolean useNanosecondResolution ) throws IOException { - boolean aggregableDateIsSearchable = true; + testSearchCase(query, dataset, configure, verify, maxBucket, useNanosecondResolution, false); + } + + private void testSearchCase( + Query query, + List dataset, + Consumer configure, + Consumer verify, + int maxBucket, + boolean useNanosecondResolution, + boolean useDocCountField + ) throws IOException { + boolean aggregableDateIsSearchable = randomBoolean(); DateFieldMapper.DateFieldType fieldType = 
aggregableDateFieldType(useNanosecondResolution, aggregableDateIsSearchable); try (Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { Document document = new Document(); + if (useDocCountField) { + // add the doc count field to the first document + document.add(new NumericDocValuesField(DocCountFieldMapper.NAME, 5)); + } for (String date : dataset) { long instant = asLong(date, fieldType); document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java index 6b5ec838f401d..31125127f8399 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java @@ -14,6 +14,7 @@ import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.document.IntPoint; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.document.StringField; @@ -40,6 +41,7 @@ import org.opensearch.core.index.Index; import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.index.mapper.DocumentMapper; import org.opensearch.index.mapper.IpFieldMapper; import org.opensearch.index.mapper.KeywordFieldMapper; @@ -196,6 +198,12 @@ protected void addToDocument(int id, Document doc, Map> key doc.add(new StringField("id", Integer.toString(id), Field.Store.NO)); for (Map.Entry> entry : keys.entrySet()) { 
final String name = entry.getKey(); + if (name.equals(DocCountFieldMapper.NAME)) { + doc.add(new IntPoint(name, (int) entry.getValue().get(0))); + // doc count field should be DocValuesType.NUMERIC + doc.add(new NumericDocValuesField(name, (int) entry.getValue().get(0))); + continue; + } for (Object value : entry.getValue()) { if (value instanceof Integer) { doc.add(new SortedNumericDocValuesField(name, (int) value)); From 3f2f1efcaa604a0323069727240a4021349dbaea Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Tue, 30 Jan 2024 15:39:21 -0800 Subject: [PATCH 08/14] clean up Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 24 ++++++++++--------- .../composite/CompositeAggregatorTests.java | 24 +++++++++++-------- .../DateHistogramAggregatorTests.java | 3 +-- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 446180651bfcf..947d3a783798a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -161,7 +161,7 @@ private static Weight[] createFilterForAggregations( while (roundedLow <= fieldType.convertNanosToMillis(high)) { bucketCount++; if (bucketCount > MAX_NUM_FILTER_BUCKETS) { - logger.info("2.2.2 max number of filters reached [{}]", MAX_NUM_FILTER_BUCKETS); + logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS); return null; } // Below rounding is needed as the interval could return in @@ -225,13 +225,12 @@ public void setAggregationType(AggregationType aggregationType) { public boolean isRewriteable(final Object parent, final int subAggLength) { boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength); - 
logger.debug("1. Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId()); + logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId()); this.rewriteable = rewriteable; return rewriteable; } public void buildFastFilter() throws IOException { - logger.info("2. Build filters at shard level"); this.buildFastFilter(FastFilterRewriteHelper::getDateHistoAggBounds); } @@ -239,7 +238,7 @@ private void buildFastFilter(GetBounds getBounds) assert filters == null : "Filters should only be built once, but they are already built"; Weight[] filters = this.aggregationType.buildFastFilter(context, getBounds); if (filters != null) { - logger.info("2.e Fast filter built for shard {}", context.indexShard().shardId()); + logger.debug("Fast filter built for shard {}", context.indexShard().shardId()); filtersBuiltAtShardLevel = true; this.filters = filters; } @@ -300,7 +299,7 @@ public boolean isRewriteable(Object parent, int subAggLength) { public Weight[] buildFastFilter(SearchContext context, GetBounds getBounds) throws IOException { long[] bounds = getBounds.apply(context, fieldType.name()); bounds = processHardBounds(bounds); - logger.info("2.1 Bounds is {} for shard {}", bounds, context.indexShard().shardId()); + logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId()); if (bounds == null) { return null; } @@ -383,8 +382,11 @@ public static boolean tryFastFilterAggregation( NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); if (docCountValues.nextDoc() != NO_MORE_DOCS) { - logger.info("Shard {} segment {} has at least one document with _doc_count field, skip fast filter optimization", - fastFilterContext.context.indexShard().shardId(),ctx.ord); + logger.debug( + "Shard {} segment {} has at least one document with _doc_count field, skip fast filter optimization", + fastFilterContext.context.indexShard().shardId(), + ctx.ord + ); return 
false; } @@ -392,8 +394,8 @@ public static boolean tryFastFilterAggregation( // check if the query is functionally match-all at segment level if (!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) return false; if (!fastFilterContext.filterBuilt()) { - logger.info( - "3.1 Shard {} segment {} functionally match all documents. Build the fast filter", + logger.debug( + "Shard {} segment {} functionally match all documents. Build the fast filter", fastFilterContext.context.indexShard().shardId(), ctx.ord ); @@ -431,13 +433,13 @@ public static boolean tryFastFilterAggregation( incrementDocCount.accept(bucketKey, counts[i]); s++; if (s > size) { - logger.info("3.e1 Fast filter optimization applied with size {}", size); + logger.debug("Fast filter optimization applied to composite aggregation with size {}", size); return true; } } } - logger.info("3.e Fast filter optimization applied"); + logger.debug("Fast filter optimization applied"); return true; } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index df29a92c168bf..dabe83af0efa0 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -1255,14 +1255,16 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception { } public void testDateHistogramSourceWithSize() throws IOException { - final List>> dataset = new ArrayList<>(Arrays.asList( + final List>> dataset = new ArrayList<>( + Arrays.asList( createDocument("date", asLong("2017-10-20T03:08:45")), createDocument("date", asLong("2016-09-20T09:00:34")), createDocument("date", asLong("2016-09-20T11:34:00")), createDocument("date", asLong("2017-10-20T06:09:24")), 
createDocument("date", asLong("2017-10-19T06:09:24")), createDocument("long", 4L) - )); + ) + ); testSearchCase( Arrays.asList( new MatchAllDocsQuery(), @@ -1285,14 +1287,16 @@ public void testDateHistogramSourceWithSize() throws IOException { } public void testDateHistogramSourceWithDocCountField() throws IOException { - final List>> dataset = new ArrayList<>(Arrays.asList( - createDocument("date", asLong("2017-10-20T03:08:45"), "_doc_count", 5), - createDocument("date", asLong("2016-09-20T09:00:34")), - createDocument("date", asLong("2016-09-20T11:34:00")), - createDocument("date", asLong("2017-10-20T06:09:24")), - createDocument("date", asLong("2017-10-19T06:09:24")), - createDocument("long", 4L) - )); + final List>> dataset = new ArrayList<>( + Arrays.asList( + createDocument("date", asLong("2017-10-20T03:08:45"), "_doc_count", 5), + createDocument("date", asLong("2016-09-20T09:00:34")), + createDocument("date", asLong("2016-09-20T11:34:00")), + createDocument("date", asLong("2017-10-20T06:09:24")), + createDocument("date", asLong("2017-10-19T06:09:24")), + createDocument("long", 4L) + ) + ); testSearchCase( Arrays.asList( new MatchAllDocsQuery(), diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index a77da70a65a5f..31e3a62df28f1 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -1258,8 +1258,7 @@ public void testDocCountField() throws IOException { testSearchCase( new MatchAllDocsQuery(), Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02"), - aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) - .field(AGGREGABLE_DATE), + aggregation -> 
aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), histogram -> { List buckets = histogram.getBuckets(); assertEquals(2, buckets.size()); From 3da288f511b99577d9d18ca16ab5edcae3a0c381 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Tue, 30 Jan 2024 16:07:26 -0800 Subject: [PATCH 09/14] clean up Signed-off-by: bowenlan-amzn --- .../composite/CompositeAggregatorTests.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index dabe83af0efa0..9227f622345d8 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -1291,9 +1291,9 @@ public void testDateHistogramSourceWithDocCountField() throws IOException { Arrays.asList( createDocument("date", asLong("2017-10-20T03:08:45"), "_doc_count", 5), createDocument("date", asLong("2016-09-20T09:00:34")), - createDocument("date", asLong("2016-09-20T11:34:00")), + createDocument("date", asLong("2016-09-20T11:34:00"), "_doc_count", 2), createDocument("date", asLong("2017-10-20T06:09:24")), - createDocument("date", asLong("2017-10-19T06:09:24")), + createDocument("date", asLong("2017-10-19T06:09:24"), "_doc_count", 3), createDocument("long", 4L) ) ); @@ -1307,13 +1307,17 @@ public void testDateHistogramSourceWithDocCountField() throws IOException { () -> { DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") .calendarInterval(DateHistogramInterval.days(1)); - return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).size(1); + return new CompositeAggregationBuilder("name", Collections.singletonList(histo)); }, (result) -> { 
- assertEquals(1, result.getBuckets().size()); - assertEquals("{date=1474329600000}", result.afterKey().toString()); // 2017-10-20T00:00:00 + assertEquals(3, result.getBuckets().size()); + assertEquals("{date=1508457600000}", result.afterKey().toString()); assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString()); - assertEquals(2L, result.getBuckets().get(0).getDocCount()); + assertEquals(3L, result.getBuckets().get(0).getDocCount()); + assertEquals("{date=1508371200000}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(3L, result.getBuckets().get(1).getDocCount()); + assertEquals("{date=1508457600000}", result.getBuckets().get(2).getKeyAsString()); + assertEquals(6L, result.getBuckets().get(2).getDocCount()); } ); } @@ -1340,7 +1344,7 @@ public void testWithDateHistogram() throws IOException { () -> { DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") .calendarInterval(DateHistogramInterval.days(1)); - return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).size(3); + return new CompositeAggregationBuilder("name", Collections.singletonList(histo)); }, (result) -> { assertEquals(3, result.getBuckets().size()); From b6ec7567b0fdc8414a38afc88aed82b336d283c9 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 1 Feb 2024 09:37:41 -0800 Subject: [PATCH 10/14] check if field is indexed directly Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 19 ++++++++++++------- .../bucket/composite/CompositeAggregator.java | 2 +- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 947d3a783798a..5ead3abe77cb7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -97,12 +97,10 @@ private static long[] getIndexBounds(final SearchContext context, final String f long min = Long.MAX_VALUE, max = Long.MIN_VALUE; for (LeafReaderContext leaf : leaves) { final PointValues values = leaf.reader().getPointValues(fieldName); + // "values" is null here means this segment doesn't have any values for this field if (values != null) { min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0)); max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0)); - } else { - // "values" is null if the field is not indexed - return null; } } @@ -124,6 +122,7 @@ public static long[] getDateHistoAggBounds(final SearchContext context, final St // Ensure that the query and aggregation are on the same field if (prq.getField().equals(fieldName)) { final long[] indexBounds = getIndexBounds(context, fieldName); + if (indexBounds == null) return null; return new long[] { // Minimum bound for aggregation is the max between query and global Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]), @@ -212,13 +211,17 @@ public static class FastFilterContext { private Weight[] filters = null; private boolean filtersBuiltAtShardLevel = false; - public AggregationType aggregationType; + private AggregationType aggregationType; private final SearchContext context; public FastFilterContext(SearchContext context) { this.context = context; } + public AggregationType getAggregationType() { + return aggregationType; + } + public void setAggregationType(AggregationType aggregationType) { this.aggregationType = aggregationType; } @@ -252,7 +255,7 @@ private boolean filterBuilt() { /** * Different types have different pre-conditions, filter building logic, etc. 
*/ - public interface AggregationType { + interface AggregationType { boolean isRewriteable(Object parent, int subAggLength); @@ -263,7 +266,7 @@ public interface AggregationType { * Functional interface for getting bounds for date histogram aggregation */ @FunctionalInterface - public interface GetBounds { + interface GetBounds { R apply(T t, U u) throws IOException; } @@ -290,7 +293,9 @@ public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean m @Override public boolean isRewriteable(Object parent, int subAggLength) { if (parent == null && subAggLength == 0 && !missing && !hasScript) { - return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType; + if (fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType) { + return fieldType.isSearchable(); + } } return false; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 77f1f0af6c42e..7034a00ed14f8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -170,7 +170,7 @@ public final class CompositeAggregator extends BucketsAggregator { if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { // bucketOrds is used for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); - preparedRounding = ((CompositeAggregationType) fastFilterContext.aggregationType).getRoundingPrepared(); + preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared(); fastFilterContext.buildFastFilter(); } } From 8573fe57066d6a0e1a4c68e12a94a5fef87d4700 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Fri, 2 Feb 2024 10:55:45 -0800 Subject: 
[PATCH 11/14] Only build filter once at segment level Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 94 +++++++++++-------- .../DateHistogramAggregatorTests.java | 71 ++++++++++++++ 2 files changed, 128 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 5ead3abe77cb7..09fde5d19892b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.DocValues; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; @@ -23,6 +24,7 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.apache.lucene.util.NumericUtils; +import org.opensearch.common.CheckedBiFunction; import org.opensearch.common.Rounding; import org.opensearch.common.lucene.search.function.FunctionScoreQuery; import org.opensearch.index.mapper.DateFieldMapper; @@ -97,7 +99,6 @@ private static long[] getIndexBounds(final SearchContext context, final String f long min = Long.MAX_VALUE, max = Long.MIN_VALUE; for (LeafReaderContext leaf : leaves) { final PointValues values = leaf.reader().getPointValues(fieldName); - // "values" is null here means this segment doesn't have any values for this field if (values != null) { min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0)); max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0)); @@ -119,16 +120,9 @@ public static long[] getDateHistoAggBounds(final SearchContext context, final St final Query cq = 
unwrapIntoConcreteQuery(context.query()); if (cq instanceof PointRangeQuery) { final PointRangeQuery prq = (PointRangeQuery) cq; - // Ensure that the query and aggregation are on the same field - if (prq.getField().equals(fieldName)) { - final long[] indexBounds = getIndexBounds(context, fieldName); - if (indexBounds == null) return null; - return new long[] { - // Minimum bound for aggregation is the max between query and global - Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]), - // Maximum bound for aggregation is the min between query and global - Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) }; - } + final long[] indexBounds = getIndexBounds(context, fieldName); + if (indexBounds == null) return null; + return getBoundsWithRangeQuery(prq, fieldName, indexBounds); } else if (cq instanceof MatchAllDocsQuery) { return getIndexBounds(context, fieldName); } else if (cq instanceof FieldExistsQuery) { @@ -141,6 +135,32 @@ public static long[] getDateHistoAggBounds(final SearchContext context, final St return null; } + private static long[] getDateHistoAggBoundsSegLevel(final SearchContext context, final String fieldName) throws IOException { + final long[] indexBounds = getIndexBounds(context, fieldName); + if (indexBounds == null) return null; + final Query cq = unwrapIntoConcreteQuery(context.query()); + if (cq instanceof PointRangeQuery) { + final PointRangeQuery prq = (PointRangeQuery) cq; + return getBoundsWithRangeQuery(prq, fieldName, indexBounds); + } + return indexBounds; + } + + private static long[] getBoundsWithRangeQuery(PointRangeQuery prq, String fieldName, long[] indexBounds) { + // Ensure that the query and aggregation are on the same field + if (prq.getField().equals(fieldName)) { + // Minimum bound for aggregation is the max between query and global + long lower = Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]); + // Maximum bound for 
aggregation is the min between query and global + long upper = Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]); + if (lower > upper) { + return null; + } + return new long[]{lower, upper}; + } + return null; + } + /** * Creates the date range filters for aggregations using the interval, min/max * bounds and prepared rounding @@ -192,7 +212,7 @@ private static Weight[] createFilterForAggregations( filters[i++] = context.searcher().createWeight(new PointRangeQuery(fieldType.name(), lower, upper, 1) { @Override protected String toString(int dimension, byte[] value) { - return null; + return Long.toString(LongPoint.decodeDimension(value, 0)); } }, ScoreMode.COMPLETE_NO_SCORES, 1); } @@ -210,6 +230,7 @@ public static class FastFilterContext { private boolean rewriteable = false; private Weight[] filters = null; private boolean filtersBuiltAtShardLevel = false; + private boolean shouldBuildFiltersAtSegmentLevel = true; private AggregationType aggregationType; private final SearchContext context; @@ -234,21 +255,17 @@ public boolean isRewriteable(final Object parent, final int subAggLength) { } public void buildFastFilter() throws IOException { - this.buildFastFilter(FastFilterRewriteHelper::getDateHistoAggBounds); - } - - private void buildFastFilter(GetBounds getBounds) throws IOException { - assert filters == null : "Filters should only be built once, but they are already built"; - Weight[] filters = this.aggregationType.buildFastFilter(context, getBounds); + this.filters = this.buildFastFilter(FastFilterRewriteHelper::getDateHistoAggBounds); if (filters != null) { logger.debug("Fast filter built for shard {}", context.indexShard().shardId()); filtersBuiltAtShardLevel = true; - this.filters = filters; } } - private boolean filterBuilt() { - return filters != null; + // This method can also be used at segment level + private Weight[] buildFastFilter(CheckedBiFunction getBounds) throws IOException { + assert filters == null : "Filters 
should only be built once, but they are already built"; + return this.aggregationType.buildFastFilter(context, getBounds); } } @@ -259,15 +276,8 @@ interface AggregationType { boolean isRewriteable(Object parent, int subAggLength); - Weight[] buildFastFilter(SearchContext ctx, GetBounds getBounds) throws IOException; - } - - /** - * Functional interface for getting bounds for date histogram aggregation - */ - @FunctionalInterface - interface GetBounds { - R apply(T t, U u) throws IOException; + Weight[] buildFastFilter(SearchContext ctx, CheckedBiFunction getBounds) + throws IOException; } /** @@ -301,7 +311,8 @@ public boolean isRewriteable(Object parent, int subAggLength) { } @Override - public Weight[] buildFastFilter(SearchContext context, GetBounds getBounds) throws IOException { + public Weight[] buildFastFilter(SearchContext context, CheckedBiFunction getBounds) + throws IOException { long[] bounds = getBounds.apply(context, fieldType.name()); bounds = processHardBounds(bounds); logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId()); @@ -383,7 +394,9 @@ public static boolean tryFastFilterAggregation( final BiConsumer incrementDocCount ) throws IOException { if (fastFilterContext == null) return false; - if (!fastFilterContext.rewriteable) return false; + if (!fastFilterContext.rewriteable || !fastFilterContext.shouldBuildFiltersAtSegmentLevel) { + return false; + } NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); if (docCountValues.nextDoc() != NO_MORE_DOCS) { @@ -397,18 +410,25 @@ public static boolean tryFastFilterAggregation( // if no filters built at shard level (see getDateHistoAggBounds method for possible reasons) // check if the query is functionally match-all at segment level - if (!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) return false; - if (!fastFilterContext.filterBuilt()) { + if 
(!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) { + return false; + } + Weight[] filters = fastFilterContext.filters; + if (filters == null) { logger.debug( "Shard {} segment {} functionally match all documents. Build the fast filter", fastFilterContext.context.indexShard().shardId(), ctx.ord ); - fastFilterContext.buildFastFilter(FastFilterRewriteHelper::getIndexBounds); + filters = fastFilterContext.buildFastFilter(FastFilterRewriteHelper::getDateHistoAggBoundsSegLevel); + if (filters == null) { + // At segment level, build filter should only be called once + // since the conditions for build filter won't change for other segments + fastFilterContext.shouldBuildFiltersAtSegmentLevel = false; + return false; + } } - if (!fastFilterContext.filterBuilt()) return false; - final Weight[] filters = fastFilterContext.filters; final int[] counts = new int[filters.length]; int i; for (i = 0; i < filters.length; i++) { diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 31e3a62df28f1..2a4616da90d5e 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -1254,6 +1254,76 @@ public void testHardBoundsNotOverlapping() throws IOException { ); } + public void testRangeQuery() throws IOException { + testSearchCase( + LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2018-01-01"), asLong("2020-01-01")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + 
assertEquals(0, buckets.size()); + }, + false + ); + + testSearchCase( + LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2016-01-01"), asLong("2017-01-01")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + false + ); + + testSearchCase( + LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2016-01-01"), asLong("2017-02-02")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(2, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + }, + false + ); + + testSearchCase( + LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2017-02-03"), asLong("2020-01-01")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(3, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-03T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-04T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-05T00:00:00.000Z", 
bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + }, + false + ); + } + public void testDocCountField() throws IOException { testSearchCase( new MatchAllDocsQuery(), @@ -1323,6 +1393,7 @@ private void testSearchCase( boolean useDocCountField ) throws IOException { boolean aggregableDateIsSearchable = randomBoolean(); + logger.debug("Aggregable date is searchable {}", aggregableDateIsSearchable); DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(useNanosecondResolution, aggregableDateIsSearchable); try (Directory directory = newDirectory()) { From 710688aea3dddd1d5375f36c1e4cd1887cbeeb08 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Fri, 2 Feb 2024 11:27:28 -0800 Subject: [PATCH 12/14] auto date histo get rounding method should be idempotent Signed-off-by: bowenlan-amzn --- .../aggregations/bucket/FastFilterRewriteHelper.java | 2 +- .../histogram/AutoDateHistogramAggregator.java | 2 ++ .../histogram/DateHistogramAggregatorTests.java | 12 ++++-------- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 09fde5d19892b..2d823f155ebc7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -156,7 +156,7 @@ private static long[] getBoundsWithRangeQuery(PointRangeQuery prq, String fieldN if (lower > upper) { return null; } - return new long[]{lower, upper}; + return new long[] { lower, upper }; } return null; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index a76a05fb3985c..12aefc540e75c 100644 --- 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -183,6 +183,8 @@ protected Rounding getRounding(final long low, final long high) { // since we cannot exceed targetBuckets, bestDuration should go up, // so the right innerInterval should be an upper bound long bestDuration = (high - low) / targetBuckets; + // reset so this function is idempotent + roundingIdx = 0; while (roundingIdx < roundingInfos.length - 1) { final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1]; diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 2a4616da90d5e..05e448cec5fcf 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -1258,8 +1258,7 @@ public void testRangeQuery() throws IOException { testSearchCase( LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2018-01-01"), asLong("2020-01-01")), Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), - aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) - .field(AGGREGABLE_DATE), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), histogram -> { List buckets = histogram.getBuckets(); assertEquals(0, buckets.size()); @@ -1270,8 +1269,7 @@ public void testRangeQuery() throws IOException { testSearchCase( LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2016-01-01"), asLong("2017-01-01")), 
Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), - aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) - .field(AGGREGABLE_DATE), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), histogram -> { List buckets = histogram.getBuckets(); assertEquals(0, buckets.size()); @@ -1282,8 +1280,7 @@ public void testRangeQuery() throws IOException { testSearchCase( LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2016-01-01"), asLong("2017-02-02")), Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), - aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) - .field(AGGREGABLE_DATE), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), histogram -> { List buckets = histogram.getBuckets(); assertEquals(2, buckets.size()); @@ -1302,8 +1299,7 @@ public void testRangeQuery() throws IOException { testSearchCase( LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2017-02-03"), asLong("2020-01-01")), Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), - aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) - .field(AGGREGABLE_DATE), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), histogram -> { List buckets = histogram.getBuckets(); assertEquals(3, buckets.size()); From 7aeee3d3df95e5ea15490c8bc312c6c8e805eca8 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Tue, 6 Feb 2024 10:24:01 -0800 Subject: [PATCH 13/14] handle segment level bounds separately Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 83 ++++++++++++------- .../bucket/composite/CompositeAggregator.java | 5 +- .../DateHistogramAggregatorTests.java | 44 +++++++--- 3 files changed, 92 insertions(+), 40 deletions(-) diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 2d823f155ebc7..56c4e8ad45e28 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -31,7 +31,6 @@ import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.query.DateRangeIncludingNowQuery; -import org.opensearch.search.aggregations.bucket.composite.CompositeAggregator; import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig; import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource; import org.opensearch.search.aggregations.bucket.histogram.LongBounds; @@ -90,11 +89,11 @@ private static Query unwrapIntoConcreteQuery(Query query) { } /** - * Finds the global min and max bounds of the field for the shard from each segment + * Finds the global min and max bounds of the field for the shard across all segments * * @return null if the field is empty or not indexed */ - private static long[] getIndexBounds(final SearchContext context, final String fieldName) throws IOException { + private static long[] getShardBounds(final SearchContext context, final String fieldName) throws IOException { final List leaves = context.searcher().getIndexReader().leaves(); long min = Long.MAX_VALUE, max = Long.MIN_VALUE; for (LeafReaderContext leaf : leaves) { @@ -111,6 +110,25 @@ private static long[] getIndexBounds(final SearchContext context, final String f return new long[] { min, max }; } + /** + * Finds the min and max bounds of the field for the segment + * + * @return null if the field is empty or not indexed + */ + private static long[] getSegmentBounds(final LeafReaderContext context, final String fieldName) 
throws IOException { + long min = Long.MAX_VALUE, max = Long.MIN_VALUE; + final PointValues values = context.reader().getPointValues(fieldName); + if (values != null) { + min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0)); + max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0)); + } + + if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) { + return null; + } + return new long[] { min, max }; + } + /** * This method also acts as a pre-condition check for the optimization * @@ -120,32 +138,21 @@ public static long[] getDateHistoAggBounds(final SearchContext context, final St final Query cq = unwrapIntoConcreteQuery(context.query()); if (cq instanceof PointRangeQuery) { final PointRangeQuery prq = (PointRangeQuery) cq; - final long[] indexBounds = getIndexBounds(context, fieldName); + final long[] indexBounds = getShardBounds(context, fieldName); if (indexBounds == null) return null; return getBoundsWithRangeQuery(prq, fieldName, indexBounds); } else if (cq instanceof MatchAllDocsQuery) { - return getIndexBounds(context, fieldName); + return getShardBounds(context, fieldName); } else if (cq instanceof FieldExistsQuery) { // when a range query covers all values of a shard, it will be rewrite field exists query if (((FieldExistsQuery) cq).getField().equals(fieldName)) { - return getIndexBounds(context, fieldName); + return getShardBounds(context, fieldName); } } return null; } - private static long[] getDateHistoAggBoundsSegLevel(final SearchContext context, final String fieldName) throws IOException { - final long[] indexBounds = getIndexBounds(context, fieldName); - if (indexBounds == null) return null; - final Query cq = unwrapIntoConcreteQuery(context.query()); - if (cq instanceof PointRangeQuery) { - final PointRangeQuery prq = (PointRangeQuery) cq; - return getBoundsWithRangeQuery(prq, fieldName, indexBounds); - } - return indexBounds; - } - private static long[] 
getBoundsWithRangeQuery(PointRangeQuery prq, String fieldName, long[] indexBounds) { // Ensure that the query and aggregation are on the same field if (prq.getField().equals(fieldName)) { @@ -158,6 +165,7 @@ private static long[] getBoundsWithRangeQuery(PointRangeQuery prq, String fieldN } return new long[] { lower, upper }; } + return null; } @@ -230,7 +238,6 @@ public static class FastFilterContext { private boolean rewriteable = false; private Weight[] filters = null; private boolean filtersBuiltAtShardLevel = false; - private boolean shouldBuildFiltersAtSegmentLevel = true; private AggregationType aggregationType; private final SearchContext context; @@ -278,6 +285,10 @@ interface AggregationType { Weight[] buildFastFilter(SearchContext ctx, CheckedBiFunction getBounds) throws IOException; + + default int getSize() { + return Integer.MAX_VALUE; + } } /** @@ -314,8 +325,23 @@ public boolean isRewriteable(Object parent, int subAggLength) { public Weight[] buildFastFilter(SearchContext context, CheckedBiFunction getBounds) throws IOException { long[] bounds = getBounds.apply(context, fieldType.name()); - bounds = processHardBounds(bounds); logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId()); + return buildFastFilter(context, bounds); + } + + private Weight[] buildFastFilterWithSegBounds( + SearchContext context, + CheckedBiFunction getBounds, + LeafReaderContext leaf + ) throws IOException { + long[] bounds = getBounds.apply(leaf, fieldType.name()); + logger.debug("Bounds are {} for shard {} segment {}", bounds, context.indexShard().shardId(), leaf.ord); + return buildFastFilter(context, bounds); + } + + private Weight[] buildFastFilter(SearchContext context, long[] bounds) throws IOException { + bounds = processHardBounds(bounds); + logger.debug("Bounds are {} for shard {} with hard bound", bounds, context.indexShard().shardId()); if (bounds == null) { return null; } @@ -394,7 +420,7 @@ public static boolean 
tryFastFilterAggregation( final BiConsumer incrementDocCount ) throws IOException { if (fastFilterContext == null) return false; - if (!fastFilterContext.rewriteable || !fastFilterContext.shouldBuildFiltersAtSegmentLevel) { + if (!fastFilterContext.rewriteable) { return false; } @@ -420,11 +446,15 @@ public static boolean tryFastFilterAggregation( fastFilterContext.context.indexShard().shardId(), ctx.ord ); - filters = fastFilterContext.buildFastFilter(FastFilterRewriteHelper::getDateHistoAggBoundsSegLevel); + if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) { + filters = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).buildFastFilterWithSegBounds( + fastFilterContext.context, + FastFilterRewriteHelper::getSegmentBounds, + ctx + ); + } if (filters == null) { - // At segment level, build filter should only be called once - // since the conditions for build filter won't change for other segments - fastFilterContext.shouldBuildFiltersAtSegmentLevel = false; + return false; } } @@ -441,7 +471,7 @@ public static boolean tryFastFilterAggregation( } int s = 0; - int size = Integer.MAX_VALUE; + int size = fastFilterContext.aggregationType.getSize(); for (i = 0; i < filters.length; i++) { if (counts[i] > 0) { long bucketKey = i; // the index of filters is the key for filters aggregation @@ -451,9 +481,6 @@ public static boolean tryFastFilterAggregation( bucketKey = fieldType.convertNanosToMillis( NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) ); - if (fastFilterContext.aggregationType instanceof CompositeAggregator.CompositeAggregationType) { - size = ((CompositeAggregator.CompositeAggregationType) fastFilterContext.aggregationType).getSize(); - } } incrementDocCount.accept(bucketKey, counts[i]); s++; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 7034a00ed14f8..b97c814cdf645 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -97,7 +97,7 @@ * * @opensearch.internal */ -public final class CompositeAggregator extends BucketsAggregator { +final class CompositeAggregator extends BucketsAggregator { private final int size; private final List sourceNames; private final int[] reverseMuls; @@ -178,7 +178,7 @@ public final class CompositeAggregator extends BucketsAggregator { /** * Currently the filter rewrite is only supported for date histograms */ - public class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + private class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { private final RoundingValuesSource valuesSource; private long afterKey = -1L; @@ -210,6 +210,7 @@ protected void processAfterKey(long[] bound, long interval) { } } + @Override public int getSize() { return size; } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 05e448cec5fcf..2a4fbca7a8541 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -1254,31 +1254,37 @@ public void testHardBoundsNotOverlapping() throws IOException { ); } - public void testRangeQuery() throws IOException { + public void testFilterRewriteOptimizationWithRangeQuery() throws IOException { testSearchCase( - LongPoint.newRangeQuery(SEARCHABLE_DATE, 
asLong("2018-01-01"), asLong("2020-01-01")), + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2018-01-01"), asLong("2020-01-01")), Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), histogram -> { List buckets = histogram.getBuckets(); assertEquals(0, buckets.size()); }, - false + 10000, + false, + false, + true // force AGGREGABLE_DATE field to be searchable to test the filter rewrite optimization path ); testSearchCase( - LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2016-01-01"), asLong("2017-01-01")), + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2016-01-01"), asLong("2017-01-01")), Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), histogram -> { List buckets = histogram.getBuckets(); assertEquals(0, buckets.size()); }, - false + 10000, + false, + false, + true ); testSearchCase( - LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2016-01-01"), asLong("2017-02-02")), + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2016-01-01"), asLong("2017-02-02")), Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), histogram -> { @@ -1293,11 +1299,14 @@ public void testRangeQuery() throws IOException { assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); assertEquals(2, bucket.getDocCount()); }, - false + 10000, + false, + false, + true ); testSearchCase( - LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2017-02-03"), asLong("2020-01-01")), + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2017-02-03"), asLong("2020-01-01")), Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", 
"2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), histogram -> { @@ -1316,7 +1325,10 @@ public void testRangeQuery() throws IOException { assertEquals("2017-02-05T00:00:00.000Z", bucket.getKeyAsString()); assertEquals(1, bucket.getDocCount()); }, - false + 10000, + false, + false, + true ); } @@ -1388,7 +1400,19 @@ private void testSearchCase( boolean useNanosecondResolution, boolean useDocCountField ) throws IOException { - boolean aggregableDateIsSearchable = randomBoolean(); + testSearchCase(query, dataset, configure, verify, maxBucket, useNanosecondResolution, useDocCountField, randomBoolean()); + } + + private void testSearchCase( + Query query, + List dataset, + Consumer configure, + Consumer verify, + int maxBucket, + boolean useNanosecondResolution, + boolean useDocCountField, + boolean aggregableDateIsSearchable + ) throws IOException { logger.debug("Aggregable date is searchable {}", aggregableDateIsSearchable); DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(useNanosecondResolution, aggregableDateIsSearchable); From 5f8255bc1ab13ec1e30d1250ba9fb1586bc4198e Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Tue, 6 Feb 2024 10:59:27 -0800 Subject: [PATCH 14/14] small refactor Signed-off-by: bowenlan-amzn --- .../bucket/FastFilterRewriteHelper.java | 43 ++++++++----------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 56c4e8ad45e28..6f1cc901e2d82 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -24,7 +24,6 @@ import org.apache.lucene.search.ScoreMode; import 
org.apache.lucene.search.Weight; import org.apache.lucene.util.NumericUtils; -import org.opensearch.common.CheckedBiFunction; import org.opensearch.common.Rounding; import org.opensearch.common.lucene.search.function.FunctionScoreQuery; import org.opensearch.index.mapper.DateFieldMapper; @@ -262,17 +261,20 @@ public boolean isRewriteable(final Object parent, final int subAggLength) { } public void buildFastFilter() throws IOException { - this.filters = this.buildFastFilter(FastFilterRewriteHelper::getDateHistoAggBounds); + assert filters == null : "Filters should only be built once, but they are already built"; + this.filters = this.aggregationType.buildFastFilter(context); if (filters != null) { logger.debug("Fast filter built for shard {}", context.indexShard().shardId()); filtersBuiltAtShardLevel = true; } } - // This method can also be used at segment level - private Weight[] buildFastFilter(CheckedBiFunction getBounds) throws IOException { + public void buildFastFilter(LeafReaderContext leaf) throws IOException { assert filters == null : "Filters should only be built once, but they are already built"; - return this.aggregationType.buildFastFilter(context, getBounds); + this.filters = this.aggregationType.buildFastFilter(leaf, context); + if (filters != null) { + logger.debug("Fast filter built for shard {} segment {}", context.indexShard().shardId(), leaf.ord); + } } } @@ -283,8 +285,9 @@ interface AggregationType { boolean isRewriteable(Object parent, int subAggLength); - Weight[] buildFastFilter(SearchContext ctx, CheckedBiFunction getBounds) - throws IOException; + Weight[] buildFastFilter(SearchContext ctx) throws IOException; + + Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext ctx) throws IOException; default int getSize() { return Integer.MAX_VALUE; @@ -322,19 +325,15 @@ public boolean isRewriteable(Object parent, int subAggLength) { } @Override - public Weight[] buildFastFilter(SearchContext context, CheckedBiFunction getBounds) - throws 
IOException { - long[] bounds = getBounds.apply(context, fieldType.name()); + public Weight[] buildFastFilter(SearchContext context) throws IOException { + long[] bounds = getDateHistoAggBounds(context, fieldType.name()); logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId()); return buildFastFilter(context, bounds); } - private Weight[] buildFastFilterWithSegBounds( - SearchContext context, - CheckedBiFunction getBounds, - LeafReaderContext leaf - ) throws IOException { - long[] bounds = getBounds.apply(leaf, fieldType.name()); + @Override + public Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext context) throws IOException { + long[] bounds = getSegmentBounds(leaf, fieldType.name()); logger.debug("Bounds are {} for shard {} segment {}", bounds, context.indexShard().shardId(), leaf.ord); return buildFastFilter(context, bounds); } @@ -411,6 +410,8 @@ public static long getBucketOrd(long bucketOrd) { /** * Try to get the bucket doc counts from the fast filters for the aggregation + *

+ * Usage: invoked at segment level — in getLeafCollector of aggregator * * @param incrementDocCount takes in the bucket key value and the bucket count */ @@ -446,15 +447,9 @@ public static boolean tryFastFilterAggregation( fastFilterContext.context.indexShard().shardId(), ctx.ord ); - if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) { - filters = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).buildFastFilterWithSegBounds( - fastFilterContext.context, - FastFilterRewriteHelper::getSegmentBounds, - ctx - ); - } + fastFilterContext.buildFastFilter(ctx); + filters = fastFilterContext.filters; if (filters == null) { - return false; } }