From 157b2e5ea4f7835822de28cc3d29b34b0d52bf3f Mon Sep 17 00:00:00 2001
From: bowenlan-amzn
Date: Wed, 6 Dec 2023 09:15:00 -0800
Subject: [PATCH] complete implementation on small test set

Signed-off-by: bowenlan-amzn
---
 .../bucket/FilterRewriteHelper.java           | 39 ++++++++++---
 .../bucket/composite/CompositeAggregator.java | 57 +++++++++++--------
 .../AutoDateHistogramAggregator.java          | 31 ++++------
 .../histogram/DateHistogramAggregator.java    | 29 ++++------
 4 files changed, 85 insertions(+), 71 deletions(-)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
index a1c4f16ce2a97..78418d25d5217 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
@@ -10,7 +10,6 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.PointValues;
-import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.IndexOrDocValuesQuery;
 import org.apache.lucene.search.MatchAllDocsQuery;
@@ -37,8 +36,8 @@
 import java.util.function.Supplier;
 
 /**
- * Helpers functions to rewrite and optimize aggregations using
- * range filter queries
+ * Helps rewrite and optimize aggregations using range filter queries.
+ * Currently supported aggregation types: DateHistogramAggregator, AutoDateHistogramAggregator and CompositeAggregator
  *
 * @opensearch.internal
 */
@@ -190,6 +189,17 @@ protected String toString(int dimension, byte[] value) {
         return filters;
     }
 
+    /**
+     * The preconditions for initiating the fast filter optimization on an aggregation are:
+     * 1. The query accompanying the aggregation has to be a PointRangeQuery on the same date field
+     * 2. No parent or sub-aggregations
+     * 3. No missing values/buckets
+     * 4. No script on the field
+     *
+     * @param computeBounds gets the lower and upper bounds of the field in a shard search
+     * @param roundingFunction produces the Rounding that provides the interval
+     * @param preparedRoundingSupplier produces the PreparedRounding that performs the rounding
+     */
     public static FilterContext buildFastFilterContext(
         final Object parent,
         final int subAggLength,
@@ -199,8 +209,6 @@ public static FilterContext buildFastFilterContext(
         ValueSourceContext valueSourceContext,
         CheckedFunction<ValueSourceContext, long[], IOException> computeBounds
     ) throws IOException {
-        // Create the filters for fast aggregation only if the query is instance
-        // of point range query and there aren't any parent/sub aggregations
         if (parent == null && subAggLength == 0 && !valueSourceContext.missing && !valueSourceContext.hasScript) {
             MappedFieldType fieldType = valueSourceContext.getFieldType();
             if (fieldType != null) {
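For orientation, a fragment sketch (not part of the patch) of how an aggregator is expected to wire this up in its constructor. The ValueSourceContext construction mirrors the CompositeAggregator hunk later in this patch; the middle arguments of buildFastFilterContext are elided by the hunk above, so their order below is an assumption based on the @param javadoc:

    // Fragment sketch; argument order after subAggLength is an assumption.
    FilterRewriteHelper.ValueSourceContext sourceContext = new FilterRewriteHelper.ValueSourceContext(
        dateHistogramSourceConfig.missingBucket(), // precondition 3: no missing bucket
        dateHistogramSourceConfig.hasScript(),     // precondition 4: no script
        dateHistogramSourceConfig.fieldType()      // may be null; the helper checks this
    );
    FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
        parent,                    // precondition 2: must be null
        subAggregators.length,     // precondition 2: must be 0
        context,                   // assumed: the shard's SearchContext
        bounds -> rounding,        // roundingFunction (see @param above)
        () -> preparedRounding,    // preparedRoundingSupplier (see @param above)
        sourceContext,
        fc -> FilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()) // computeBounds
    );
    if (filterContext != null) {
        fieldType = filterContext.fieldType; // assumed field names on FilterContext
        filters = filterContext.filters;     // Weight[] later consumed by tryFastFilterAggregation
    }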
@@ -251,19 +259,27 @@ public MappedFieldType getFieldType() {
     }
 
     public static long getBucketOrd(long bucketOrd) {
-        if (bucketOrd < 0) { // already seen // TODO reading theoretically for one segment, there cannot be duplicate bucket?
+        if (bucketOrd < 0) { // already seen
             bucketOrd = -1 - bucketOrd;
         }
         return bucketOrd;
     }
 
+    /**
+     * This must be executed for each segment; returns true if the optimization was applied
+     *
+     * @param size the maximum number of buckets needed
+     */
     public static boolean tryFastFilterAggregation(
         final LeafReaderContext ctx,
         final Weight[] filters,
         final DateFieldMapper.DateFieldType fieldType,
-        final BiConsumer<Long, Integer> incrementDocCount
+        final BiConsumer<Long, Integer> incrementDocCount,
+        final int size
     ) throws IOException {
+        if (filters == null) return false;
+
         final int[] counts = new int[filters.length];
         int i;
         for (i = 0; i < filters.length; i++) {
@@ -275,16 +291,21 @@ public static boolean tryFastFilterAggregation(
             }
         }
 
+        int s = 0;
         for (i = 0; i < filters.length; i++) {
             if (counts[i] > 0) {
                 incrementDocCount.accept(
                     fieldType.convertNanosToMillis(
-                        NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
+                        NumericUtils.sortableBytesToLong(
+                            ((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
                     ),
                     counts[i]
                 );
+                s++;
+                if (s > size) return true; // collected one past the requested size; stop early
             }
         }
-        throw new CollectionTerminatedException();
+
+        return true;
     }
 }
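The loop whose body falls outside the hunk above is the per-filter counting step. A plausible sketch of what it does (an assumption; the body is not shown in this patch), using Lucene's Weight#count, which returns a segment-level hit count when it is cheap to compute and -1 otherwise:

    // Assumed sketch of the counting loop body (not shown in the hunk above).
    for (i = 0; i < filters.length; i++) {
        counts[i] = filters[i].count(ctx);
        if (counts[i] == -1) {
            // the count is not cheaply available (e.g. deleted docs in the segment);
            // bail out so the aggregator falls back to normal per-document collection
            return false;
        }
    }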
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
index d4c01234a3e72..bbb9c6167e6f7 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -73,7 +73,12 @@
 import org.opensearch.search.aggregations.bucket.FilterRewriteHelper;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.function.LongUnaryOperator;
 import java.util.stream.Collectors;
 
@@ -160,7 +165,6 @@ final class CompositeAggregator extends BucketsAggregator {
             FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext(
                 dateHistogramSourceConfig.missingBucket(),
                 dateHistogramSourceConfig.hasScript(),
-                // TODO reading this can be null, and that's why we support missing
                 dateHistogramSourceConfig.fieldType()
             );
             FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
@@ -182,6 +186,18 @@ final class CompositeAggregator extends BucketsAggregator {
         }
     }
 
+    // private long[] computeBounds(final FilterRewriteHelper.ValueSourceContext fieldContext) {
+    //     final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name());
+    //     if (bounds != null) {
+    //         // Update min/max limit if user specified any hard bounds
+    //         if (hardBounds != null) {
+    //             bounds[0] = Math.max(bounds[0], hardBounds.getMin());
+    //             bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
+    //         }
+    //     }
+    //     return bounds;
+    // }
+
     @Override
     protected void doClose() {
         try {
@@ -237,17 +253,14 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
             );
         }
 
-
+        // For the fast filter optimization: merge the buckets it collected into the result
         if (bucketOrds.size() != 0) {
-            // transform existing buckets into map if not 0
-            // this is for the case where we have duplicate buckets, we need to add bucketOrds content into buckets
             Map<CompositeKey, InternalComposite.InternalBucket> bucketMap = new HashMap<>();
             for (InternalComposite.InternalBucket internalBucket : buckets) {
                 bucketMap.put(internalBucket.getRawKey(), internalBucket);
             }
-            // need to add bucketOrds content into buckets
+
             LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0);
-            // if duplicate, add to existing
             while (ordsEnum.next()) {
                 Long bucketValue = ordsEnum.value();
                 CompositeKey key = new CompositeKey(bucketValue);
@@ -267,9 +280,10 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
                     bucketMap.put(key, bucket);
                 }
             }
+
             List<InternalComposite.InternalBucket> bucketList = new ArrayList<>(bucketMap.values());
             CollectionUtil.introSort(bucketList, InternalComposite.InternalBucket::compareKey);
-            buckets = bucketList.toArray(InternalComposite.InternalBucket[]::new);
+            buckets = bucketList.subList(0, Math.min(size, bucketList.size())).toArray(InternalComposite.InternalBucket[]::new);
             num = buckets.length;
         }
 
@@ -514,23 +528,16 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
 
     @Override
     protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
-        // Need to be declared as final and array for usage within the
-        // LeafBucketCollectorBase subclass below
-        final boolean[] useOpt = new boolean[1];
-        useOpt[0] = filters != null;
-
-        // Try fast filter aggregation if the filters have been created
-        // Skip if tried before and gave incorrect/incomplete results
-        if (useOpt[0]) {
-            useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
-                (key, count) -> {
-                    incrementBucketDocCount(
-                        FilterRewriteHelper.getBucketOrd(
-                            bucketOrds.add(0, preparedRounding.round(key))),
-                        count
-                    );
-                });
-        }
+        boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
+            (key, count) -> {
+                incrementBucketDocCount(
+                    FilterRewriteHelper.getBucketOrd(
+                        bucketOrds.add(0, preparedRounding.round(key)) // negative ord means the rounded key was seen before
+                    ),
+                    count
+                );
+            }, size);
+        if (optimized) throw new CollectionTerminatedException();
 
         finishLeaf();
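One non-obvious point behind the merge logic above: with multiple segments, one segment can be served by the fast filter path (counts recorded through bucketOrds) while another is collected normally (buckets produced by the queue), so the same composite key can surface from both sources and must be merged, then sorted and trimmed. A self-contained plain-Java illustration of that merge-then-sort-then-trim idea (illustrative types only, not OpenSearch classes):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class MergeTrimSketch {
        public static void main(String[] args) {
            int size = 2; // requested number of composite buckets
            Map<Long, Long> merged = new HashMap<>();
            merged.put(1000L, 3L);              // key from the normal collection path
            merged.merge(1000L, 2L, Long::sum); // same key from the fast filter path -> counts add up
            merged.merge(2000L, 4L, Long::sum);
            merged.merge(3000L, 1L, Long::sum);
            List<Long> keys = new ArrayList<>(merged.keySet());
            keys.sort(Long::compare);           // composite buckets are returned in key order
            keys = keys.subList(0, Math.min(size, keys.size())); // trim to the requested size
            keys.forEach(k -> System.out.println(k + " -> " + merged.get(k)));
        }
    }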
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
index 6fbbdda84bf05..a2677d6f82768 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
@@ -33,6 +33,7 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.CollectionUtil;
@@ -232,30 +233,22 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
             return LeafBucketCollector.NO_OP_COLLECTOR;
         }
 
+        boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
+            (key, count) -> {
+                incrementBucketDocCount(
+                    FilterRewriteHelper.getBucketOrd(
+                        getBucketOrds().add(0, preparedRounding.round(key))
+                    ),
+                    count
+                );
+            }, Integer.MAX_VALUE); // histograms have no bucket-count cap, so no early exit is needed
+        if (optimized) throw new CollectionTerminatedException();
+
         final SortedNumericDocValues values = valuesSource.longValues(ctx);
         final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub);
-
-        // Need to be declared as final and array for usage within the
-        // LeafBucketCollectorBase subclass below
-        final boolean[] useOpt = new boolean[1];
-        useOpt[0] = filters != null;
-
         return new LeafBucketCollectorBase(sub, values) {
             @Override
             public void collect(int doc, long owningBucketOrd) throws IOException {
-                // Try fast filter aggregation if the filters have been created
-                // Skip if tried before and gave incorrect/incomplete results
-                if (useOpt[0]) {
-                    useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
-                        (key, count) -> {
-                            incrementBucketDocCount(
-                                FilterRewriteHelper.getBucketOrd(
-                                    getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
-                                count
-                            );
-                        });
-                }
-
                 iteratingCollector.collect(doc, owningBucketOrd);
             }
         };

diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
index 372fddee37401..24ba87e8a12ae 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -33,6 +33,7 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.CollectionUtil;
@@ -165,29 +166,21 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
             return LeafBucketCollector.NO_OP_COLLECTOR;
         }
 
-        // Need to be declared as final and array for usage within the
-        // LeafBucketCollectorBase subclass below
-        final boolean[] useOpt = new boolean[1];
-        useOpt[0] = filters != null;
+        boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
+            (key, count) -> {
+                incrementBucketDocCount(
+                    FilterRewriteHelper.getBucketOrd(
+                        bucketOrds.add(0, preparedRounding.round(key))
+                    ),
+                    count
+                );
+            }, Integer.MAX_VALUE);
+        if (optimized) throw new CollectionTerminatedException();
 
         SortedNumericDocValues values = valuesSource.longValues(ctx);
         return new LeafBucketCollectorBase(sub, values) {
             @Override
             public void collect(int doc, long owningBucketOrd) throws IOException {
-                // Try fast filter aggregation if the filters have been created
-                // Skip if tried before and gave incorrect/incomplete results
-                if (useOpt[0]) {
-                    useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
-                        (key, count) -> {
-                            incrementBucketDocCount(
-                                FilterRewriteHelper.getBucketOrd( // TODO reading not possible to see duplicate bucket
-                                    bucketOrds.add(owningBucketOrd, preparedRounding.round(key))
-                                ),
-                                count
-                            );
-                        });
-                }
-
                 if (values.advanceExact(doc)) {
                     int valuesCount = values.docValueCount();
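A note on the early-termination pattern used by all three aggregators in this patch: CollectionTerminatedException is a standard Lucene control-flow signal; the searcher catches it and proceeds to the next segment, so throwing it once the fast filter path has produced all counts for the current segment safely skips normal per-document collection. A simplified sketch of that contract (Lucene-side behavior, not code from this patch):

    import java.io.IOException;

    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.search.CollectionTerminatedException;
    import org.apache.lucene.search.Collector;
    import org.apache.lucene.search.LeafCollector;

    class LeafCollectionSketch {
        // Mirrors how IndexSearcher drives per-segment collection: a collector may
        // throw CollectionTerminatedException from getLeafCollector (or collect)
        // to declare the current segment finished; the searcher simply moves on.
        static void collectAllLeaves(IndexReader reader, Collector collector) throws IOException {
            for (LeafReaderContext ctx : reader.leaves()) {
                try {
                    LeafCollector leaf = collector.getLeafCollector(ctx);
                    // ... iterate the segment's matching docs, calling leaf.collect(doc) ...
                } catch (CollectionTerminatedException e) {
                    // segment terminated early; continue with the next segment
                }
            }
        }
    }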