Skip to content

Commit

Permalink
segment level match all case
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Jan 29, 2024
1 parent 2e5a07b commit 486b8b8
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.search.aggregations.bucket;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.ConstantScoreQuery;
Expand Down Expand Up @@ -53,6 +55,8 @@ public final class FastFilterRewriteHelper {

private FastFilterRewriteHelper() {}

private static final Logger logger = LogManager.getLogger(FastFilterRewriteHelper.class);

private static final int MAX_NUM_FILTER_BUCKETS = 1024;
private static final Map<Class<?>, Function<Query, Query>> queryWrappers;

Expand Down Expand Up @@ -86,6 +90,7 @@ private static long[] getIndexBounds(final SearchContext context, final String f
// Since the query does not specify bounds for aggregation, we can
// build the global min/max from local min/max within each segment
for (LeafReaderContext leaf : leaves) {
// return null if the field is not indexed
final PointValues values = leaf.reader().getPointValues(fieldName);
if (values != null) {
min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0));
Expand Down Expand Up @@ -187,79 +192,58 @@ protected String toString(int dimension, byte[] value) {
}

/**
* Context object to do fast filter optimization
* Context object for fast filter optimization
*
 * filters is null if the optimization cannot be applied
*/
public static class FastFilterContext {
private boolean rewriteable = false;
private Weight[] filters = null;
public AggregationType aggregationType;
protected final SearchContext context;

private void setFilters(Weight[] filters) {
this.filters = filters;
public FastFilterContext(SearchContext context) {
this.context = context;
}

public void setAggregationType(AggregationType aggregationType) {
this.aggregationType = aggregationType;
}

public boolean isRewriteable(final Object parent, final int subAggLength) {
return aggregationType.isRewriteable(parent, subAggLength);
boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength);
logger.debug("Fast filter rewriteable: {}", rewriteable);
this.rewriteable = rewriteable;
return rewriteable;
}

/**
* This filter build method is for date histogram aggregation type
*
*/
// public void buildFastFilter(
// SearchContext context,
// CheckedFunction<DateHistogramAggregationType, long[], IOException> computeBounds,
// Function<long[], Rounding> roundingFunction,
// Supplier<Rounding.Prepared> preparedRoundingSupplier
// ) throws IOException {
// assert this.aggregationType instanceof DateHistogramAggregationType;
// DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType;
// DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType();
// final long[] bounds = computeBounds.apply(aggregationType);
// if (bounds == null) return;
//
// final Rounding rounding = roundingFunction.apply(bounds);
// final OptionalLong intervalOpt = Rounding.getInterval(rounding);
// if (intervalOpt.isEmpty()) return;
// final long interval = intervalOpt.getAsLong();
//
// // afterKey is the last bucket key in previous response, while the bucket key
// // is the start of the bucket values, so add the interval
// if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) {
// bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval;
// }
//
// final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations(
// context,
// interval,
// preparedRoundingSupplier.get(),
// fieldType.name(),
// fieldType,
// bounds[0],
// bounds[1]
// );
// this.setFilters(filters);
// }

public void buildFastFilter(SearchContext context) throws IOException {
Weight[] filters = this.aggregationType.buildFastFilter(context);
this.setFilters(filters);
public void buildFastFilter() throws IOException {
Weight[] filters = this.buildFastFilter(null);
logger.debug("Fast filter built: {}", filters == null ? "no" : "yes");
this.filters = filters;
}

// Delegates filter building to the specific aggregation type implementation
public Weight[] buildFastFilter(LeafReaderContext ctx) throws IOException {
Weight[] filters = this.aggregationType.buildFastFilter(context, ctx);
logger.debug("Fast filter built: {}", filters == null ? "no" : "yes");
this.filters = filters;
return filters;
}

// TODO: consider moving this trigger logic into the AggregationType implementations
public boolean hasfilterBuilt() {
return filters != null;
}
}

/**
* Different types have different pre-conditions, filter building logic, etc.
*/
public interface AggregationType {

boolean isRewriteable(Object parent, int subAggLength);
Weight[] buildFastFilter(SearchContext context) throws IOException;

// Weight[] buildFastFilter(SearchContext context) throws IOException;
Weight[] buildFastFilter(SearchContext context, LeafReaderContext ctx) throws IOException;
}

/**
Expand All @@ -269,7 +253,7 @@ public static abstract class AbstractDateHistogramAggregationType implements Agg
private final MappedFieldType fieldType;
private final boolean missing;
private final boolean hasScript;
private LongBounds hardBounds = null;
private LongBounds hardBounds;

public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) {
this.fieldType = fieldType;
Expand All @@ -284,33 +268,31 @@ public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean m

@Override
public boolean isRewriteable(Object parent, int subAggLength) {
// At the shard level these are the pre-conditions, and they must also hold at
// the segment level. The effective segment-level match-all check is plugged in
// later, when the bounds are computed per segment.
if (parent == null && subAggLength == 0 && !missing && !hasScript) {
return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
}
return false;
}

@Override
public Weight[] buildFastFilter(SearchContext context) throws IOException {
// The procedure here has three separable steps:
// 1. compute the field bounds
// 2. derive the rounding and its interval
// 3. obtain the prepared rounding (best combined with step 2)
Weight[] result = {};

long[] bounds = computeBounds(context);
if (bounds == null) return result;
public Weight[] buildFastFilter(SearchContext context, LeafReaderContext ctx) throws IOException {
long[] bounds;
if (ctx != null) {
bounds = getIndexBounds(context, fieldType.name());
} else {
bounds = computeBounds(context);
}
if (bounds == null) return null;

final Rounding rounding = getRounding(bounds[0], bounds[1]);
final OptionalLong intervalOpt = Rounding.getInterval(rounding);
if (intervalOpt.isEmpty()) return result;
if (intervalOpt.isEmpty()) return null;
final long interval = intervalOpt.getAsLong();

// afterKey is the last bucket key in previous response, while the bucket key
// is the start of the bucket values, so add the interval
// if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) {
// bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval;
// }
// process the after key from composite agg
processAfterKey(bounds, interval);

return FastFilterRewriteHelper.createFilterForAggregations(
Expand All @@ -337,6 +319,7 @@ protected long[] computeBounds(SearchContext context) throws IOException {
}

protected abstract Rounding getRounding(final long low, final long high);

protected abstract Rounding.Prepared getRoundingPrepared();

protected void processAfterKey(long[] bound, long interval) {};
Expand All @@ -347,40 +330,6 @@ public DateFieldMapper.DateFieldType getFieldType() {
}
}

/**
* For composite aggregation with date histogram as a source
*/
// public static class CompositeAggregationType extends DateHistogramAggregationType {
// private final RoundingValuesSource valuesSource;
// private long afterKey = -1L;
// private final int size;
//
// public CompositeAggregationType(
// CompositeValuesSourceConfig[] sourceConfigs,
// CompositeKey rawAfterKey,
// List<DocValueFormat> formats,
// int size
// ) {
// super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
// this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
// this.size = size;
// if (rawAfterKey != null) {
// assert rawAfterKey.size() == 1 && formats.size() == 1;
// this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
// throw new IllegalArgumentException("now() is not supported in [after] key");
// });
// }
// }
//
// public Rounding getRounding() {
// return valuesSource.getRounding();
// }
//
// public Rounding.Prepared getRoundingPreparer() {
// return valuesSource.getPreparedRounding();
// }
// }

public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) {
return sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource;
}
Expand All @@ -394,7 +343,7 @@ public static long getBucketOrd(long bucketOrd) {
}

/**
* This is executed for each segment by passing the leaf reader context
* Try to get the bucket doc counts from the fast filters for the aggregation
*
* @param incrementDocCount takes in the bucket key value and the bucket count
*/
Expand All @@ -404,7 +353,17 @@ public static boolean tryFastFilterAggregation(
final BiConsumer<Long, Integer> incrementDocCount
) throws IOException {
if (fastFilterContext == null) return false;
if (fastFilterContext.filters == null) return false;
if (!fastFilterContext.hasfilterBuilt() && fastFilterContext.rewriteable) {
// Check if the query is functionally match-all at segment level
// if so, we still compute the index bounds and use it to build the filters
SearchContext context = fastFilterContext.context;
Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f);
boolean segmentRewriteable = weight != null && weight.count(ctx) == ctx.reader().numDocs();
if (segmentRewriteable) {
fastFilterContext.buildFastFilter(ctx);
}
}
if (!fastFilterContext.hasfilterBuilt()) return false;

final Weight[] filters = fastFilterContext.filters;
final int[] counts = new int[filters.length];
Expand All @@ -424,8 +383,8 @@ public static boolean tryFastFilterAggregation(
if (counts[i] > 0) {
long bucketKey = i; // the index of filters is the key for filters aggregation
if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) {
final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType)
.getFieldType();
final DateFieldMapper.DateFieldType fieldType =
((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).getFieldType();
bucketKey = fieldType.convertNanosToMillis(
NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,24 +164,20 @@ public final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return;
fastFilterContext.setAggregationType(
new CompositeAggregationType()
);
fastFilterContext.setAggregationType(new CompositeAggregationType());
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
// bucketOrds is the data structure for saving date histogram results
// bucketOrds is used for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
// Currently the filter rewrite is only supported for date histograms
// FastFilterRewriteHelper.CompositeAggregationType aggregationType =
// (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType;
// preparedRounding = aggregationType.getRoundingPreparer();
fastFilterContext.buildFastFilter(
context
);
preparedRounding = ((CompositeAggregationType) fastFilterContext.aggregationType).getRoundingPrepared();
fastFilterContext.buildFastFilter();
}
}

/**
* Currently the filter rewrite is only supported for date histograms
*/
public class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
private final RoundingValuesSource valuesSource;
private long afterKey = -1L;
Expand All @@ -197,20 +193,21 @@ public CompositeAggregationType() {
}
}

@Override
public Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}

public Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}

@Override
protected void processAfterKey(long[] bound, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
bound[0] = afterKey + interval;
}

public Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}

public int getSize() {
return size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private AutoDateHistogramAggregator(
this.roundingPreparer = roundingPreparer;
this.preparedRounding = prepareRounding(0);

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
fastFilterContext.setAggregationType(
new AutoHistogramAggregationType(
valuesSourceConfig.fieldType(),
Expand All @@ -166,32 +166,12 @@ private AutoDateHistogramAggregator(
)
);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
fastFilterContext.buildFastFilter(context);
fastFilterContext.buildFastFilter();
}
}

// private Rounding getMinimumRounding(final long low, final long high) {
// // max - min / targetBuckets = bestDuration
// // find the right innerInterval this bestDuration belongs to
// // since we cannot exceed targetBuckets, bestDuration should go up,
// // so the right innerInterval should be an upper bound
// long bestDuration = (high - low) / targetBuckets;
// while (roundingIdx < roundingInfos.length - 1) {
// final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
// final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
// // If the interval duration is covered by the maximum inner interval,
// // we can start with this outer interval for creating the buckets
// if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
// break;
// }
// roundingIdx++;
// }
//
// preparedRounding = prepareRounding(roundingIdx);
// return roundingInfos[roundingIdx].rounding;
// }

private class AutoHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {

public AutoHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) {
super(fieldType, missing, hasScript);
}
Expand Down
Loading

0 comments on commit 486b8b8

Please sign in to comment.