diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 5db9a0bc21b64..7527ecefa4198 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -8,6 +8,8 @@ package org.opensearch.search.aggregations.bucket; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; import org.apache.lucene.search.ConstantScoreQuery; @@ -53,6 +55,8 @@ public final class FastFilterRewriteHelper { private FastFilterRewriteHelper() {} + private static final Logger logger = LogManager.getLogger(FastFilterRewriteHelper.class); + private static final int MAX_NUM_FILTER_BUCKETS = 1024; private static final Map, Function> queryWrappers; @@ -86,6 +90,7 @@ private static long[] getIndexBounds(final SearchContext context, final String f // Since the query does not specify bounds for aggregation, we can // build the global min/max from local min/max within each segment for (LeafReaderContext leaf : leaves) { + // getPointValues returns null if the field is not indexed final PointValues values = leaf.reader().getPointValues(fieldName); if (values != null) { min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0)); @@ -187,14 +192,18 @@ protected String toString(int dimension, byte[] value) { } /** - * Context object to do fast filter optimization + * Context object for fast filter optimization + * + * filters is null if the optimization cannot be applied */ public static class FastFilterContext { + private boolean rewriteable = false; private Weight[] filters = null; public AggregationType aggregationType; + protected final SearchContext context; - private void 
setFilters(Weight[] filters) { - this.filters = filters; + public FastFilterContext(SearchContext context) { + this.context = context; } public void setAggregationType(AggregationType aggregationType) { @@ -202,64 +211,39 @@ public void setAggregationType(AggregationType aggregationType) { } public boolean isRewriteable(final Object parent, final int subAggLength) { - return aggregationType.isRewriteable(parent, subAggLength); + boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength); + logger.debug("Fast filter rewriteable: {}", rewriteable); + this.rewriteable = rewriteable; + return rewriteable; } - /** - * This filter build method is for date histogram aggregation type - * - */ - // public void buildFastFilter( - // SearchContext context, - // CheckedFunction computeBounds, - // Function roundingFunction, - // Supplier preparedRoundingSupplier - // ) throws IOException { - // assert this.aggregationType instanceof DateHistogramAggregationType; - // DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType; - // DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType(); - // final long[] bounds = computeBounds.apply(aggregationType); - // if (bounds == null) return; - // - // final Rounding rounding = roundingFunction.apply(bounds); - // final OptionalLong intervalOpt = Rounding.getInterval(rounding); - // if (intervalOpt.isEmpty()) return; - // final long interval = intervalOpt.getAsLong(); - // - // // afterKey is the last bucket key in previous response, while the bucket key - // // is the start of the bucket values, so add the interval - // if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { - // bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval; - // } - // - // final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( - // context, - // interval, - // 
preparedRoundingSupplier.get(), - // fieldType.name(), - // fieldType, - // bounds[0], - // bounds[1] - // ); - // this.setFilters(filters); - // } - - public void buildFastFilter(SearchContext context) throws IOException { - Weight[] filters = this.aggregationType.buildFastFilter(context); - this.setFilters(filters); + public void buildFastFilter() throws IOException { + Weight[] filters = this.buildFastFilter(null); + logger.debug("Fast filter built: {}", filters == null ? "no" : "yes"); + this.filters = filters; } - // this method should delegate to specific aggregation type + public Weight[] buildFastFilter(LeafReaderContext ctx) throws IOException { + Weight[] filters = this.aggregationType.buildFastFilter(context, ctx); + logger.debug("Fast filter built: {}", filters == null ? "no" : "yes"); + this.filters = filters; + return filters; + } - // can this all be triggered in aggregation type? + public boolean hasfilterBuilt() { + return filters != null; + } } /** * Different types have different pre-conditions, filter building logic, etc. 
*/ public interface AggregationType { + boolean isRewriteable(Object parent, int subAggLength); - Weight[] buildFastFilter(SearchContext context) throws IOException; + + // Weight[] buildFastFilter(SearchContext context) throws IOException; + Weight[] buildFastFilter(SearchContext context, LeafReaderContext ctx) throws IOException; } /** @@ -269,7 +253,7 @@ public static abstract class AbstractDateHistogramAggregationType implements Agg private final MappedFieldType fieldType; private final boolean missing; private final boolean hasScript; - private LongBounds hardBounds = null; + private LongBounds hardBounds; public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { this.fieldType = fieldType; @@ -284,6 +268,9 @@ public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean m @Override public boolean isRewriteable(Object parent, int subAggLength) { + // these pre-conditions are checked at the shard level + // they must also hold at the segment level + // the segment-level effectively-match-all check should be plugged into the bounds computation if (parent == null && subAggLength == 0 && !missing && !hasScript) { return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType; } @@ -291,26 +278,21 @@ public boolean isRewriteable(Object parent, int subAggLength) { @Override - public Weight[] buildFastFilter(SearchContext context) throws IOException { - // the procedure in this method can be separated - // 1. compute bound - // 2. get rounding, interval - // 3. 
get prepared rounding, better combined with step 2 - Weight[] result = {}; - - long[] bounds = computeBounds(context); - if (bounds == null) return result; + public Weight[] buildFastFilter(SearchContext context, LeafReaderContext ctx) throws IOException { + long[] bounds; + if (ctx != null) { + bounds = getIndexBounds(context, fieldType.name()); + } else { + bounds = computeBounds(context); + } + if (bounds == null) return null; final Rounding rounding = getRounding(bounds[0], bounds[1]); final OptionalLong intervalOpt = Rounding.getInterval(rounding); - if (intervalOpt.isEmpty()) return result; + if (intervalOpt.isEmpty()) return null; final long interval = intervalOpt.getAsLong(); - // afterKey is the last bucket key in previous response, while the bucket key - // is the start of the bucket values, so add the interval - // if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { - // bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval; - // } + // process the after key from composite agg processAfterKey(bounds, interval); return FastFilterRewriteHelper.createFilterForAggregations( @@ -337,6 +319,7 @@ protected long[] computeBounds(SearchContext context) throws IOException { } protected abstract Rounding getRounding(final long low, final long high); + protected abstract Rounding.Prepared getRoundingPrepared(); protected void processAfterKey(long[] bound, long interval) {}; @@ -347,40 +330,6 @@ public DateFieldMapper.DateFieldType getFieldType() { } } - /** - * For composite aggregation with date histogram as a source - */ - // public static class CompositeAggregationType extends DateHistogramAggregationType { - // private final RoundingValuesSource valuesSource; - // private long afterKey = -1L; - // private final int size; - // - // public CompositeAggregationType( - // CompositeValuesSourceConfig[] sourceConfigs, - // CompositeKey rawAfterKey, - // List formats, - // int 
size - // ) { - // super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); - // this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); - // this.size = size; - // if (rawAfterKey != null) { - // assert rawAfterKey.size() == 1 && formats.size() == 1; - // this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { - // throw new IllegalArgumentException("now() is not supported in [after] key"); - // }); - // } - // } - // - // public Rounding getRounding() { - // return valuesSource.getRounding(); - // } - // - // public Rounding.Prepared getRoundingPreparer() { - // return valuesSource.getPreparedRounding(); - // } - // } - public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) { return sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource; } @@ -394,7 +343,7 @@ public static long getBucketOrd(long bucketOrd) { } /** - * This is executed for each segment by passing the leaf reader context + * Try to get the bucket doc counts from the fast filters for the aggregation * * @param incrementDocCount takes in the bucket key value and the bucket count */ @@ -404,7 +353,17 @@ public static boolean tryFastFilterAggregation( final BiConsumer incrementDocCount ) throws IOException { if (fastFilterContext == null) return false; - if (fastFilterContext.filters == null) return false; + if (!fastFilterContext.hasfilterBuilt() && fastFilterContext.rewriteable) { + // Check if the query is functionally match-all at segment level + // if so, we still compute the index bounds and use it to build the filters + SearchContext context = fastFilterContext.context; + Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); + boolean segmentRewriteable = weight != null && weight.count(ctx) == ctx.reader().numDocs(); + if (segmentRewriteable) { + 
fastFilterContext.buildFastFilter(ctx); + } + } + if (!fastFilterContext.hasfilterBuilt()) return false; final Weight[] filters = fastFilterContext.filters; final int[] counts = new int[filters.length]; @@ -424,8 +383,8 @@ public static boolean tryFastFilterAggregation( if (counts[i] > 0) { long bucketKey = i; // the index of filters is the key for filters aggregation if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) { - final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType) - .getFieldType(); + final DateFieldMapper.DateFieldType fieldType = + ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).getFieldType(); bucketKey = fieldType.convertNanosToMillis( NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) ); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index ab8641eaf8a90..d8e7229e0d450 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -164,24 +164,20 @@ public final class CompositeAggregator extends BucketsAggregator { this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); this.rawAfterKey = rawAfterKey; - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return; - fastFilterContext.setAggregationType( - new CompositeAggregationType() - ); + fastFilterContext.setAggregationType(new CompositeAggregationType()); if (fastFilterContext.isRewriteable(parent, 
subAggregators.length)) { - // bucketOrds is the data structure for saving date histogram results + // bucketOrds is used for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); - // Currently the filter rewrite is only supported for date histograms - // FastFilterRewriteHelper.CompositeAggregationType aggregationType = - // (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType; - // preparedRounding = aggregationType.getRoundingPreparer(); - fastFilterContext.buildFastFilter( - context - ); + preparedRounding = ((CompositeAggregationType) fastFilterContext.aggregationType).getRoundingPrepared(); + fastFilterContext.buildFastFilter(); } } + /** + * Currently the filter rewrite is only supported for date histograms + */ public class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { private final RoundingValuesSource valuesSource; private long afterKey = -1L; @@ -197,20 +193,21 @@ public CompositeAggregationType() { } } - @Override public Rounding getRounding(final long low, final long high) { return valuesSource.getRounding(); } + public Rounding.Prepared getRoundingPrepared() { + return valuesSource.getPreparedRounding(); + } + @Override protected void processAfterKey(long[] bound, long interval) { + // afterKey is the last bucket key in previous response, and the bucket key + // is the minimum of all values in the bucket, so need to add the interval bound[0] = afterKey + interval; } - public Rounding.Prepared getRoundingPrepared() { - return valuesSource.getPreparedRounding(); - } - public int getSize() { return size; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 8a2eea9a0417f..a76a05fb3985c 100644 --- 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -157,7 +157,7 @@ private AutoDateHistogramAggregator( this.roundingPreparer = roundingPreparer; this.preparedRounding = prepareRounding(0); - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); fastFilterContext.setAggregationType( new AutoHistogramAggregationType( valuesSourceConfig.fieldType(), @@ -166,32 +166,12 @@ private AutoDateHistogramAggregator( ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(context); + fastFilterContext.buildFastFilter(); } } - // private Rounding getMinimumRounding(final long low, final long high) { - // // max - min / targetBuckets = bestDuration - // // find the right innerInterval this bestDuration belongs to - // // since we cannot exceed targetBuckets, bestDuration should go up, - // // so the right innerInterval should be an upper bound - // long bestDuration = (high - low) / targetBuckets; - // while (roundingIdx < roundingInfos.length - 1) { - // final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; - // final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1]; - // // If the interval duration is covered by the maximum inner interval, - // // we can start with this outer interval for creating the buckets - // if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) { - // break; - // } - // roundingIdx++; - // } - // - // preparedRounding = prepareRounding(roundingIdx); - // return roundingInfos[roundingIdx].rounding; - // } - private class AutoHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + public AutoHistogramAggregationType(MappedFieldType 
fieldType, boolean missing, boolean hasScript) { super(fieldType, missing, hasScript); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index fd0cf1ab3f8b1..5b896cb95cf25 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -31,10 +31,13 @@ package org.opensearch.search.aggregations.bucket.histogram; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; import org.apache.lucene.util.CollectionUtil; import org.opensearch.common.Nullable; import org.opensearch.common.Rounding; @@ -84,6 +87,9 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg private final LongKeyedBucketOrds bucketOrds; private final FastFilterRewriteHelper.FastFilterContext fastFilterContext; + private Weight weight; + + private static final Logger logger = LogManager.getLogger(DateHistogramAggregator.class); DateHistogramAggregator( String name, @@ -116,7 +122,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality); - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); fastFilterContext.setAggregationType( new DateHistogramAggregationType( valuesSourceConfig.fieldType(), @@ -126,22 +132,10 @@ class DateHistogramAggregator extends BucketsAggregator 
implements SizedBucketAg ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(context); + fastFilterContext.buildFastFilter(); } } - // private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException { - // final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name()); - // if (bounds != null) { - // // Update min/max limit if user specified any hard bounds - // if (hardBounds != null) { - // bounds[0] = Math.max(bounds[0], hardBounds.getMin()); - // bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive - // } - // } - // return bounds; - // } - private class DateHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) { @@ -173,8 +167,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return LeafBucketCollector.NO_OP_COLLECTOR; } - // assume we found out at segment level, it's effectively a match all - // boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation( ctx, fastFilterContext, @@ -183,6 +175,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol count ) ); + logger.info("optimized = " + optimized); if (optimized) throw new CollectionTerminatedException(); SortedNumericDocValues values = valuesSource.longValues(ctx);