
Commit

complete implementation on small test set
Signed-off-by: bowenlan-amzn <[email protected]>
bowenlan-amzn committed Dec 6, 2023
1 parent 2db35f3 commit 157b2e5
Showing 4 changed files with 85 additions and 71 deletions.
@@ -10,7 +10,6 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.IndexOrDocValuesQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
@@ -37,8 +36,8 @@
import java.util.function.Supplier;

/**
* Helpers functions to rewrite and optimize aggregations using
* range filter queries
* Helps rewrite and optimize aggregations using range filter queries
* Currently supported aggregation types: DateHistogramAggregator, AutoDateHistogramAggregator, CompositeAggregator
*
* @opensearch.internal
*/
@@ -190,6 +189,17 @@ protected String toString(int dimension, byte[] value) {
return filters;
}

/**
* The pre-conditions for initiating the fast filter optimization on an aggregation are:
* 1. The query accompanying the aggregation must be a PointRangeQuery on the same date field
* 2. No parent/sub aggregations
* 3. No missing value/bucket
* 4. No script
*
* @param computeBounds gets the lower and upper bounds of the field in a shard search
* @param roundingFunction produces the Rounding that provides the interval
* @param preparedRoundingSupplier supplies the PreparedRounding that does the rounding
*/
public static FilterContext buildFastFilterContext(
final Object parent,
final int subAggLength,
@@ -199,8 +209,6 @@ public static FilterContext buildFastFilterContext(
ValueSourceContext valueSourceContext,
CheckedFunction<ValueSourceContext, long[], IOException> computeBounds
) throws IOException {
// Create the filters for fast aggregation only if the query is instance
// of point range query and there aren't any parent/sub aggregations
if (parent == null && subAggLength == 0 && !valueSourceContext.missing && !valueSourceContext.hasScript) {
MappedFieldType fieldType = valueSourceContext.getFieldType();
if (fieldType != null) {
@@ -251,19 +259,27 @@ public MappedFieldType getFieldType() {
}

public static long getBucketOrd(long bucketOrd) {
if (bucketOrd < 0) { // already seen // TODO reading theoretically for one segment, there cannot be duplicate bucket?
if (bucketOrd < 0) { // already seen
bucketOrd = -1 - bucketOrd;
}

return bucketOrd;
}
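For context on the decode above: as the "already seen" comment suggests, the bucket-ords structures these aggregators use return a non-negative ordinal when a key is inserted for the first time and -1 - existingOrd when the key was inserted before. A minimal, self-contained sketch of that decoding (illustrative only, not part of this change):

// Sketch: decode the "already seen" encoding that getBucketOrd normalizes.
// A first-time insertion yields a non-negative ordinal; a repeated insertion
// reports -1 - existingOrd, which decodes back to the original ordinal.
public class BucketOrdDecodeSketch {
    static long decode(long bucketOrd) {
        return bucketOrd < 0 ? -1 - bucketOrd : bucketOrd;
    }

    public static void main(String[] args) {
        long firstInsert = 3;                  // ordinal assigned when the key is new
        long repeatInsert = -1 - firstInsert;  // what a duplicate insertion reports: -4
        System.out.println(decode(repeatInsert)); // prints 3
    }
}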

/**
* This should be executed for each segment
*
* @param size the maximum number of buckets needed
*/
public static boolean tryFastFilterAggregation(
final LeafReaderContext ctx,
final Weight[] filters,
final DateFieldMapper.DateFieldType fieldType,
final BiConsumer<Long, Integer> incrementDocCount
final BiConsumer<Long, Integer> incrementDocCount,
final int size
) throws IOException {
if (filters == null) return false;

final int[] counts = new int[filters.length];
int i;
for (i = 0; i < filters.length; i++) {
@@ -275,16 +291,21 @@ public static boolean tryFastFilterAggregation(
}
}

int s = 0;
for (i = 0; i < filters.length; i++) {
if (counts[i] > 0) {
incrementDocCount.accept(
fieldType.convertNanosToMillis(
NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
NumericUtils.sortableBytesToLong(
((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
),
counts[i]
);
s++;
if (s > size) return true;
}
}
throw new CollectionTerminatedException();

return true;
}
}
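The per-filter counting loop above (partly collapsed here) is the heart of the optimization: each range filter's match count is obtained per segment without visiting documents. Independent of this commit, Lucene exposes that capability through Weight#count, which returns -1 when an exact count is not cheaply available; a standalone sketch under that assumption, with made-up field names and data:

import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteBuffersDirectory;

public class SegmentCountSketch {
    public static void main(String[] args) throws Exception {
        ByteBuffersDirectory dir = new ByteBuffersDirectory();
        try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
            for (long ts = 0; ts < 10; ts++) {
                Document doc = new Document();
                doc.add(new LongPoint("timestamp", ts)); // one date-like point value per doc
                writer.addDocument(doc);
            }
        }
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            IndexSearcher searcher = new IndexSearcher(reader);
            // One "bucket" filter covering timestamps 0..4, analogous to a single range filter.
            Query range = LongPoint.newRangeQuery("timestamp", 0, 4);
            Weight weight = searcher.createWeight(searcher.rewrite(range), ScoreMode.COMPLETE_NO_SCORES, 1f);
            for (LeafReaderContext leaf : reader.leaves()) {
                int count = weight.count(leaf); // -1 means the count cannot be computed cheaply
                System.out.println("segment " + leaf.ord + " matched " + count + " docs");
            }
        }
    }
}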
@@ -73,7 +73,12 @@
import org.opensearch.search.aggregations.bucket.FilterRewriteHelper;

import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;

@@ -160,7 +165,6 @@ final class CompositeAggregator extends BucketsAggregator {
FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext(
dateHistogramSourceConfig.missingBucket(),
dateHistogramSourceConfig.hasScript(),
// TODO reading this can be null, and that's why we support missing
dateHistogramSourceConfig.fieldType()
);
FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
@@ -182,6 +186,18 @@ final class CompositeAggregator extends BucketsAggregator {
}
}

// private long[] computeBounds(final FilterRewriteHelper.ValueSourceContext fieldContext) {
// final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name());
// if (bounds != null) {
// // Update min/max limit if user specified any hard bounds
// if (hardBounds != null) {
// bounds[0] = Math.max(bounds[0], hardBounds.getMin());
// bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
// }
// }
// return bounds;
// }

@Override
protected void doClose() {
try {
@@ -237,17 +253,14 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
);
}


// For the fast filters optimization
if (bucketOrds.size() != 0) {
// transform existing buckets into a map, since bucketOrds is not empty
// this handles the case of duplicate buckets: the bucketOrds content needs to be merged into the existing buckets
Map<CompositeKey, InternalComposite.InternalBucket> bucketMap = new HashMap<>();
for (InternalComposite.InternalBucket internalBucket : buckets) {
bucketMap.put(internalBucket.getRawKey(), internalBucket);
}
// need to add bucketOrds content into buckets

LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0);
// if duplicate, add to existing
while (ordsEnum.next()) {
Long bucketValue = ordsEnum.value();
CompositeKey key = new CompositeKey(bucketValue);
@@ -267,9 +280,10 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
bucketMap.put(key, bucket);
}
}

List<InternalComposite.InternalBucket> bucketList = new ArrayList<>(bucketMap.values());
CollectionUtil.introSort(bucketList, InternalComposite.InternalBucket::compareKey);
buckets = bucketList.toArray(InternalComposite.InternalBucket[]::new);
buckets = bucketList.subList(0, Math.min(size, bucketList.size())).toArray(InternalComposite.InternalBucket[]::new);
num = buckets.length;
}
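The merge just above follows a simple pattern: index the already-built buckets by their raw key, fold the counts accumulated by the fast filter path into matching entries (creating new ones for unseen keys), then sort by key and keep at most size buckets. A simplified, self-contained sketch of the same pattern using plain collections (types and names are illustrative, not the aggregator's own):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketMergeSketch {
    record Bucket(long key, long docCount) {}

    static List<Bucket> merge(List<Bucket> collected, Map<Long, Long> fastFilterCounts, int size) {
        Map<Long, Long> byKey = new HashMap<>();
        for (Bucket b : collected) {
            byKey.put(b.key(), b.docCount());
        }
        // Fold fast-filter counts into existing buckets, or create new ones for unseen keys.
        fastFilterCounts.forEach((key, count) -> byKey.merge(key, count, Long::sum));
        List<Bucket> merged = new ArrayList<>();
        byKey.forEach((key, count) -> merged.add(new Bucket(key, count)));
        merged.sort((a, b) -> Long.compare(a.key(), b.key())); // sort by key, like compareKey
        return merged.subList(0, Math.min(size, merged.size())); // keep at most size buckets
    }

    public static void main(String[] args) {
        List<Bucket> collected = List.of(new Bucket(10L, 2L), new Bucket(20L, 1L));
        Map<Long, Long> fastFilter = Map.of(20L, 3L, 30L, 5L);
        // Prints [Bucket[key=10, docCount=2], Bucket[key=20, docCount=4]]
        System.out.println(merge(collected, fastFilter, 2));
    }
}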

@@ -514,23 +528,16 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
// Need to be declared as final and array for usage within the
// LeafBucketCollectorBase subclass below
final boolean[] useOpt = new boolean[1];
useOpt[0] = filters != null;

// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
(key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(
bucketOrds.add(0, preparedRounding.round(key))),
count
);
});
}
boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
(key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(
bucketOrds.add(0, preparedRounding.round(key))
),
count
);
}, size);
if (optimized) throw new CollectionTerminatedException();

finishLeaf();

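The aggregators in the remaining files follow the same shape as the composite change above: try the fast filter path once per segment inside getLeafCollector and, on success, throw CollectionTerminatedException so Lucene skips per-document collection for that segment (the helper previously threw the exception itself; returning a boolean moves that decision to the caller). A standalone illustration of how Lucene's IndexSearcher reacts to that exception, using the single-threaded search(Query, Collector) entry point and a made-up index for brevity:

import java.io.IOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.store.ByteBuffersDirectory;

public class EarlyTerminationSketch {
    public static void main(String[] args) throws Exception {
        ByteBuffersDirectory dir = new ByteBuffersDirectory();
        try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
            Document doc = new Document();
            doc.add(new StringField("field", "value", Field.Store.NO));
            writer.addDocument(doc);
        }
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            IndexSearcher searcher = new IndexSearcher(reader);
            searcher.search(new MatchAllDocsQuery(), new Collector() {
                @Override
                public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
                    // Pretend the whole segment was already handled by precomputed counts:
                    // IndexSearcher catches this exception and moves on to the next segment.
                    throw new CollectionTerminatedException();
                }

                @Override
                public ScoreMode scoreMode() {
                    return ScoreMode.COMPLETE_NO_SCORES;
                }
            });
            System.out.println("search finished without collecting any documents");
        }
    }
}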
@@ -33,6 +33,7 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
@@ -232,30 +233,22 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
return LeafBucketCollector.NO_OP_COLLECTOR;
}

boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
(key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(
getBucketOrds().add(0, preparedRounding.round(key))
),
count
);
}, Integer.MAX_VALUE);
if (optimized) throw new CollectionTerminatedException();

final SortedNumericDocValues values = valuesSource.longValues(ctx);
final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub);

// Need to be declared as final and array for usage within the
// LeafBucketCollectorBase subclass below
final boolean[] useOpt = new boolean[1];
useOpt[0] = filters != null;

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
(key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(
getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
count
);
});
}

iteratingCollector.collect(doc, owningBucketOrd);
}
};
@@ -33,6 +33,7 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
@@ -165,29 +166,21 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
return LeafBucketCollector.NO_OP_COLLECTOR;
}

// Need to be declared as final and array for usage within the
// LeafBucketCollectorBase subclass below
final boolean[] useOpt = new boolean[1];
useOpt[0] = filters != null;
boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
(key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(
bucketOrds.add(0, preparedRounding.round(key))
),
count
);
}, Integer.MAX_VALUE);
if (optimized) throw new CollectionTerminatedException();

SortedNumericDocValues values = valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
(key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd( // TODO reading not possible to see duplicate bucket
bucketOrds.add(owningBucketOrd, preparedRounding.round(key))
),
count
);
});
}

if (values.advanceExact(doc)) {
int valuesCount = values.docValueCount();

