diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 59781aff9b822..8dff568aac36a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.IndexOrDocValuesQuery; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.PointRangeQuery; @@ -18,7 +19,6 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.apache.lucene.util.NumericUtils; -import org.opensearch.common.CheckedFunction; import org.opensearch.common.Rounding; import org.opensearch.common.lucene.search.function.FunctionScoreQuery; import org.opensearch.index.mapper.DateFieldMapper; @@ -28,6 +28,7 @@ import org.opensearch.search.aggregations.bucket.composite.CompositeKey; import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig; import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource; +import org.opensearch.search.aggregations.bucket.histogram.LongBounds; import org.opensearch.search.internal.SearchContext; import java.io.IOException; @@ -48,6 +49,7 @@ *
  • date histogram : date range filter. * Applied: DateHistogramAggregator, AutoDateHistogramAggregator, CompositeAggregator
  • * + * * @opensearch.internal */ public class FastFilterRewriteHelper { @@ -94,7 +96,7 @@ private static long[] getIndexBoundsFromLeaves(final SearchContext context, fina if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null; - return new long[] { min, max }; + return new long[]{min, max}; } /** @@ -108,14 +110,19 @@ public static long[] getAggregationBounds(final SearchContext context, final Str final PointRangeQuery prq = (PointRangeQuery) cq; // Ensure that the query and aggregation are on the same field if (prq.getField().equals(fieldName)) { - return new long[] { + return new long[]{ // Minimum bound for aggregation is the max between query and global Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]), // Maximum bound for aggregation is the min between query and global - Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) }; + Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1])}; } } else if (cq instanceof MatchAllDocsQuery) { return indexBounds; + } else if (cq instanceof FieldExistsQuery) { + final FieldExistsQuery feq = (FieldExistsQuery) cq; + if (feq.getField().equals(fieldName)) { + return indexBounds; + } } return null; @@ -163,8 +170,8 @@ private static Weight[] createFilterForAggregations( roundedLow = preparedRounding.round(roundedLow + interval); final byte[] upper = new byte[8]; NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high : - // Subtract -1 if the minimum is roundedLow as roundedLow itself - // is included in the next bucket + // Subtract -1 if the minimum is roundedLow as roundedLow itself + // is included in the next bucket fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0); filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) { @@ -180,46 +187,45 @@ protected String toString(int dimension, byte[] value) { } /** - * @param computeBounds get the lower and upper bound of the field in a shard search + * This method build filters for date histogram aggregation + * * @param roundingFunction produce Rounding that contains interval of date range. * Rounding is computed dynamically using the bounds in AutoDateHistogram * @param preparedRoundingSupplier produce PreparedRounding to round values at call-time */ public static void buildFastFilter( SearchContext context, - CheckedFunction computeBounds, Function roundingFunction, Supplier preparedRoundingSupplier, FastFilterContext fastFilterContext ) throws IOException { assert fastFilterContext.fieldType instanceof DateFieldMapper.DateFieldType; DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType; - final long[] bounds = computeBounds.apply(fastFilterContext); // TODO b do we need to pass in the context? or specific things - if (bounds != null) { - final Rounding rounding = roundingFunction.apply(bounds); - final OptionalLong intervalOpt = Rounding.getInterval(rounding); - if (intervalOpt.isEmpty()) { - return; - } - final long interval = intervalOpt.getAsLong(); - - // afterKey is the last bucket key in previous response, while the bucket key - // is the start of the bucket values, so add the interval - if (fastFilterContext.afterKey != -1) { - bounds[0] = fastFilterContext.afterKey + interval; - } + final long[] bounds = fastFilterContext.bounds; + assert bounds != null; + final Rounding rounding = roundingFunction.apply(bounds); + final OptionalLong intervalOpt = Rounding.getInterval(rounding); + if (intervalOpt.isEmpty()) { + return; + } + final long interval = intervalOpt.getAsLong(); - final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( - context, - interval, - preparedRoundingSupplier.get(), - fieldType.name(), - fieldType, - bounds[0], - bounds[1] - ); - fastFilterContext.setFilters(filters); + // afterKey is the last bucket key in previous response, while the bucket key + // is the start of the bucket values, so add the interval + if (fastFilterContext.afterKey != -1) { + bounds[0] = fastFilterContext.afterKey + interval; } + + final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( + context, + interval, + preparedRoundingSupplier.get(), + fieldType.name(), + fieldType, + bounds[0], + bounds[1] + ); + fastFilterContext.setFilters(filters); } /** @@ -229,12 +235,15 @@ public static class FastFilterContext { private boolean missing = false; // TODO b confirm UT that can catch this private boolean hasScript = false; private boolean showOtherBucket = false; + private LongBounds hardBounds = null; private final MappedFieldType fieldType; + private long[] bounds = null; private long afterKey = -1L; private int size = Integer.MAX_VALUE; // only used by composite aggregation for pagination - private Weight[] filters = null; + + public Weight[] filters = null; private final Type type; @@ -293,16 +302,49 @@ public void setHasScript(boolean hasScript) { this.hasScript = hasScript; } + public void setHardBounds(LongBounds hardBounds) { + this.hardBounds = hardBounds; + } + public void setShowOtherBucket(boolean showOtherBucket) { this.showOtherBucket = showOtherBucket; } - public boolean isRewriteable(final Object parent, final int subAggLength) { + public boolean isRewriteable(final Object parent, final int subAggLength, SearchContext context) throws IOException { + if (parent == null && subAggLength == 0 && !missing && !hasScript) { + if (type == Type.FILTERS) { + return !showOtherBucket; + } else if (type == Type.DATE_HISTO && fieldType != null) { + final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, this.fieldType.name()); + if (bounds != null) { + // Update min/max limit if user specified any hard bounds + if (hardBounds != null) { + bounds[0] = Math.max(bounds[0], hardBounds.getMin()); + bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive + } + this.bounds = bounds; + return fieldType instanceof DateFieldMapper.DateFieldType; + } + } + } + return false; + } + + public boolean isRewriteable(final Object parent, final int subAggLength, LeafReaderContext leaf) throws IOException { if (parent == null && subAggLength == 0 && !missing && !hasScript) { if (type == Type.FILTERS) { return !showOtherBucket; - } else if (type == Type.DATE_HISTO) { - return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType; + } else if (type == Type.DATE_HISTO && fieldType != null) { + final long[] bounds = FastFilterRewriteHelper.getIndexBoundsFromLeave(leaf, this.fieldType.name()); + if (bounds != null) { + // Update min/max limit if user specified any hard bounds + if (hardBounds != null) { + bounds[0] = Math.max(bounds[0], hardBounds.getMin()); + bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive + } + this.bounds = bounds; + return fieldType instanceof DateFieldMapper.DateFieldType; + } } } return false; @@ -317,6 +359,17 @@ public enum Type { } } + private static long[] getIndexBoundsFromLeave(LeafReaderContext leaf, final String fieldName) throws IOException { + final PointValues values = leaf.reader().getPointValues(fieldName); + long min = Long.MAX_VALUE, max = Long.MIN_VALUE; + if (values != null) { + min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0)); + max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0)); + } + if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null; + return new long[]{min, max}; + } + public static long getBucketOrd(long bucketOrd) { if (bucketOrd < 0) { // already seen bucketOrd = -1 - bucketOrd; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index e79e4602a3a2d..14b22ae5b63a7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -164,7 +164,7 @@ final class CompositeAggregator extends BucketsAggregator { this.rawAfterKey = rawAfterKey; fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(sourceConfigs, rawAfterKey, formats); - if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { + if (fastFilterContext.isRewriteable(parent, subAggregators.length, context)) { // Currently the filter rewrite is only supported for date histograms RoundingValuesSource dateHistogramSource = fastFilterContext.getDateHistogramSource(); preparedRounding = dateHistogramSource.getPreparedRounding(); @@ -173,7 +173,6 @@ final class CompositeAggregator extends BucketsAggregator { fastFilterContext.setSize(size); FastFilterRewriteHelper.buildFastFilter( context, - fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), x -> dateHistogramSource.getRounding(), () -> preparedRounding, fastFilterContext diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 7a0e49c275542..ee08d23be8236 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -159,10 +159,9 @@ private AutoDateHistogramAggregator( fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(valuesSourceConfig.fieldType()); fastFilterContext.setMissing(valuesSourceConfig.missing() != null); fastFilterContext.setHasScript(valuesSourceConfig.script() != null); - if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { + if (fastFilterContext.isRewriteable(parent, subAggregators.length, context)) { FastFilterRewriteHelper.buildFastFilter( context, - fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), b -> getMinimumRounding(b[0], b[1]), // Passing prepared rounding as supplier to ensure the correct prepared // rounding is set as it is done during getMinimumRounding diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index afcb711a11ba1..ef6acfa480be2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -35,6 +35,7 @@ import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; import org.apache.lucene.util.CollectionUtil; import org.opensearch.common.Nullable; import org.opensearch.common.Rounding; @@ -83,6 +84,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg private final LongKeyedBucketOrds bucketOrds; private final FastFilterRewriteHelper.FastFilterContext fastFilterContext; + private Weight weight; DateHistogramAggregator( String name, @@ -118,23 +120,12 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(valuesSourceConfig.fieldType()); fastFilterContext.setMissing(valuesSourceConfig.missing() != null); fastFilterContext.setHasScript(valuesSourceConfig.script() != null); - if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - FastFilterRewriteHelper.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding, fastFilterContext); + fastFilterContext.setHardBounds(hardBounds); // TODO b try add ut for this + if (fastFilterContext.isRewriteable(parent, subAggregators.length, context)) { + FastFilterRewriteHelper.buildFastFilter(context, x -> rounding, () -> preparedRounding, fastFilterContext); } } - private long[] computeBounds(final FastFilterRewriteHelper.FastFilterContext fieldContext) throws IOException { - final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name()); - if (bounds != null) { - // Update min/max limit if user specified any hard bounds - if (hardBounds != null) { - bounds[0] = Math.max(bounds[0], hardBounds.getMin()); - bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive - } - } - return bounds; - } - @Override public ScoreMode scoreMode() { if (valuesSource != null && valuesSource.needsScores()) { @@ -149,10 +140,24 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return LeafBucketCollector.NO_OP_COLLECTOR; } + // Not rewriteable at shard level, now at segment level, check if it can be rewritten + boolean rewriteable = fastFilterContext.filters != null; + if (!rewriteable && ctx.reader().numDocs() == weight.count(ctx)) { + if (fastFilterContext.isRewriteable(parent, subAggregators.length, ctx)) { + rewriteable = true; + FastFilterRewriteHelper.buildFastFilter(context, x -> rounding, () -> preparedRounding, fastFilterContext); + } + } + boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, (key, count) -> { incrementBucketDocCount(FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count); }); - if (optimized) throw new CollectionTerminatedException(); + if (optimized) { + if (rewriteable == true) { // reset filters after segment level rewrite + fastFilterContext.setFilters(null); + } + throw new CollectionTerminatedException(); + } SortedNumericDocValues values = valuesSource.longValues(ctx); return new LeafBucketCollectorBase(sub, values) { @@ -185,6 +190,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException { }; } + @Override + public void setWeight(Weight weight) { + this.weight = weight; + } + @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {