Skip to content

Commit

Permalink
drafts changes
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Jan 10, 2024
1 parent 81d01fa commit 999f334
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.FieldExistsQuery;
import org.apache.lucene.search.IndexOrDocValuesQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.Rounding;
import org.opensearch.common.lucene.search.function.FunctionScoreQuery;
import org.opensearch.index.mapper.DateFieldMapper;
Expand All @@ -28,6 +28,7 @@
import org.opensearch.search.aggregations.bucket.composite.CompositeKey;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;
import org.opensearch.search.aggregations.bucket.histogram.LongBounds;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
Expand All @@ -48,6 +49,7 @@
* <li> date histogram : date range filter.
* Applied: DateHistogramAggregator, AutoDateHistogramAggregator, CompositeAggregator </li>
* </ul>
*
* @opensearch.internal
*/
public class FastFilterRewriteHelper {
Expand Down Expand Up @@ -94,7 +96,7 @@ private static long[] getIndexBoundsFromLeaves(final SearchContext context, fina

if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null;

return new long[] { min, max };
return new long[]{min, max};
}

/**
Expand All @@ -108,14 +110,19 @@ public static long[] getAggregationBounds(final SearchContext context, final Str
final PointRangeQuery prq = (PointRangeQuery) cq;
// Ensure that the query and aggregation are on the same field
if (prq.getField().equals(fieldName)) {
return new long[] {
return new long[]{
// Minimum bound for aggregation is the max between query and global
Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]),
// Maximum bound for aggregation is the min between query and global
Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) };
Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1])};
}
} else if (cq instanceof MatchAllDocsQuery) {
return indexBounds;
} else if (cq instanceof FieldExistsQuery) {
final FieldExistsQuery feq = (FieldExistsQuery) cq;
if (feq.getField().equals(fieldName)) {
return indexBounds;
}
}

return null;
Expand Down Expand Up @@ -163,8 +170,8 @@ private static Weight[] createFilterForAggregations(
roundedLow = preparedRounding.round(roundedLow + interval);
final byte[] upper = new byte[8];
NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high :
// Subtract -1 if the minimum is roundedLow as roundedLow itself
// is included in the next bucket
// Subtract -1 if the minimum is roundedLow as roundedLow itself
// is included in the next bucket
fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0);

filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) {
Expand All @@ -180,46 +187,45 @@ protected String toString(int dimension, byte[] value) {
}

/**
* @param computeBounds get the lower and upper bound of the field in a shard search
* This method build filters for date histogram aggregation
*
* @param roundingFunction produce Rounding that contains interval of date range.
* Rounding is computed dynamically using the bounds in AutoDateHistogram
* @param preparedRoundingSupplier produce PreparedRounding to round values at call-time
*/
public static void buildFastFilter(
SearchContext context,
CheckedFunction<FastFilterContext, long[], IOException> computeBounds,
Function<long[], Rounding> roundingFunction,
Supplier<Rounding.Prepared> preparedRoundingSupplier,
FastFilterContext fastFilterContext
) throws IOException {
assert fastFilterContext.fieldType instanceof DateFieldMapper.DateFieldType;
DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType;
final long[] bounds = computeBounds.apply(fastFilterContext); // TODO b do we need to pass in the context? or specific things
if (bounds != null) {
final Rounding rounding = roundingFunction.apply(bounds);
final OptionalLong intervalOpt = Rounding.getInterval(rounding);
if (intervalOpt.isEmpty()) {
return;
}
final long interval = intervalOpt.getAsLong();

// afterKey is the last bucket key in previous response, while the bucket key
// is the start of the bucket values, so add the interval
if (fastFilterContext.afterKey != -1) {
bounds[0] = fastFilterContext.afterKey + interval;
}
final long[] bounds = fastFilterContext.bounds;
assert bounds != null;
final Rounding rounding = roundingFunction.apply(bounds);
final OptionalLong intervalOpt = Rounding.getInterval(rounding);
if (intervalOpt.isEmpty()) {
return;
}
final long interval = intervalOpt.getAsLong();

final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations(
context,
interval,
preparedRoundingSupplier.get(),
fieldType.name(),
fieldType,
bounds[0],
bounds[1]
);
fastFilterContext.setFilters(filters);
// afterKey is the last bucket key in previous response, while the bucket key
// is the start of the bucket values, so add the interval
if (fastFilterContext.afterKey != -1) {
bounds[0] = fastFilterContext.afterKey + interval;
}

final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations(
context,
interval,
preparedRoundingSupplier.get(),
fieldType.name(),
fieldType,
bounds[0],
bounds[1]
);
fastFilterContext.setFilters(filters);
}

/**
Expand All @@ -229,12 +235,15 @@ public static class FastFilterContext {
private boolean missing = false; // TODO b confirm UT that can catch this
private boolean hasScript = false;
private boolean showOtherBucket = false;
private LongBounds hardBounds = null;

private final MappedFieldType fieldType;
private long[] bounds = null;

private long afterKey = -1L;
private int size = Integer.MAX_VALUE; // only used by composite aggregation for pagination
private Weight[] filters = null;

public Weight[] filters = null;

private final Type type;

Expand Down Expand Up @@ -293,16 +302,49 @@ public void setHasScript(boolean hasScript) {
this.hasScript = hasScript;
}

public void setHardBounds(LongBounds hardBounds) {
this.hardBounds = hardBounds;
}

public void setShowOtherBucket(boolean showOtherBucket) {
this.showOtherBucket = showOtherBucket;
}

public boolean isRewriteable(final Object parent, final int subAggLength) {
public boolean isRewriteable(final Object parent, final int subAggLength, SearchContext context) throws IOException {
if (parent == null && subAggLength == 0 && !missing && !hasScript) {
if (type == Type.FILTERS) {
return !showOtherBucket;
} else if (type == Type.DATE_HISTO && fieldType != null) {
final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, this.fieldType.name());
if (bounds != null) {
// Update min/max limit if user specified any hard bounds
if (hardBounds != null) {
bounds[0] = Math.max(bounds[0], hardBounds.getMin());
bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
}
this.bounds = bounds;
return fieldType instanceof DateFieldMapper.DateFieldType;
}
}
}
return false;
}

public boolean isRewriteable(final Object parent, final int subAggLength, LeafReaderContext leaf) throws IOException {
if (parent == null && subAggLength == 0 && !missing && !hasScript) {
if (type == Type.FILTERS) {
return !showOtherBucket;
} else if (type == Type.DATE_HISTO) {
return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
} else if (type == Type.DATE_HISTO && fieldType != null) {
final long[] bounds = FastFilterRewriteHelper.getIndexBoundsFromLeave(leaf, this.fieldType.name());
if (bounds != null) {
// Update min/max limit if user specified any hard bounds
if (hardBounds != null) {
bounds[0] = Math.max(bounds[0], hardBounds.getMin());
bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
}
this.bounds = bounds;
return fieldType instanceof DateFieldMapper.DateFieldType;
}
}
}
return false;
Expand All @@ -317,6 +359,17 @@ public enum Type {
}
}

private static long[] getIndexBoundsFromLeave(LeafReaderContext leaf, final String fieldName) throws IOException {
final PointValues values = leaf.reader().getPointValues(fieldName);
long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
if (values != null) {
min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0));
max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0));
}
if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null;
return new long[]{min, max};
}

public static long getBucketOrd(long bucketOrd) {
if (bucketOrd < 0) { // already seen
bucketOrd = -1 - bucketOrd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ final class CompositeAggregator extends BucketsAggregator {
this.rawAfterKey = rawAfterKey;

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(sourceConfigs, rawAfterKey, formats);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
if (fastFilterContext.isRewriteable(parent, subAggregators.length, context)) {
// Currently the filter rewrite is only supported for date histograms
RoundingValuesSource dateHistogramSource = fastFilterContext.getDateHistogramSource();
preparedRounding = dateHistogramSource.getPreparedRounding();
Expand All @@ -173,7 +173,6 @@ final class CompositeAggregator extends BucketsAggregator {
fastFilterContext.setSize(size);
FastFilterRewriteHelper.buildFastFilter(
context,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
x -> dateHistogramSource.getRounding(),
() -> preparedRounding,
fastFilterContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,9 @@ private AutoDateHistogramAggregator(
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(valuesSourceConfig.fieldType());
fastFilterContext.setMissing(valuesSourceConfig.missing() != null);
fastFilterContext.setHasScript(valuesSourceConfig.script() != null);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
if (fastFilterContext.isRewriteable(parent, subAggregators.length, context)) {
FastFilterRewriteHelper.buildFastFilter(
context,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
b -> getMinimumRounding(b[0], b[1]),
// Passing prepared rounding as supplier to ensure the correct prepared
// rounding is set as it is done during getMinimumRounding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
Expand Down Expand Up @@ -83,6 +84,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
private final LongKeyedBucketOrds bucketOrds;

private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
private Weight weight;

DateHistogramAggregator(
String name,
Expand Down Expand Up @@ -118,23 +120,12 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(valuesSourceConfig.fieldType());
fastFilterContext.setMissing(valuesSourceConfig.missing() != null);
fastFilterContext.setHasScript(valuesSourceConfig.script() != null);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
FastFilterRewriteHelper.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding, fastFilterContext);
fastFilterContext.setHardBounds(hardBounds); // TODO b try add ut for this
if (fastFilterContext.isRewriteable(parent, subAggregators.length, context)) {
FastFilterRewriteHelper.buildFastFilter(context, x -> rounding, () -> preparedRounding, fastFilterContext);
}
}

private long[] computeBounds(final FastFilterRewriteHelper.FastFilterContext fieldContext) throws IOException {
final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name());
if (bounds != null) {
// Update min/max limit if user specified any hard bounds
if (hardBounds != null) {
bounds[0] = Math.max(bounds[0], hardBounds.getMin());
bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
}
}
return bounds;
}

@Override
public ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {
Expand All @@ -149,10 +140,24 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
return LeafBucketCollector.NO_OP_COLLECTOR;
}

// Not rewriteable at shard level, now at segment level, check if it can be rewritten
boolean rewriteable = fastFilterContext.filters != null;
if (!rewriteable && ctx.reader().numDocs() == weight.count(ctx)) {
if (fastFilterContext.isRewriteable(parent, subAggregators.length, ctx)) {
rewriteable = true;
FastFilterRewriteHelper.buildFastFilter(context, x -> rounding, () -> preparedRounding, fastFilterContext);
}
}

boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, (key, count) -> {
incrementBucketDocCount(FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count);
});
if (optimized) throw new CollectionTerminatedException();
if (optimized) {
if (rewriteable == true) { // reset filters after segment level rewrite
fastFilterContext.setFilters(null);
}
throw new CollectionTerminatedException();
}

SortedNumericDocValues values = valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
Expand Down Expand Up @@ -185,6 +190,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
};
}

@Override
public void setWeight(Weight weight) {
this.weight = weight;
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
Expand Down

0 comments on commit 999f334

Please sign in to comment.