Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Jan 16, 2024
1 parent 81d01fa commit c6874c1
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 132 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Apply the fast filter optimization to composite aggregation ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Apply the fast filter optimization to composite aggregation of date histogram source ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Improved performance of numeric exact-match queries ([#11209](https://github.com/opensearch-project/OpenSearch/pull/11209))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@
* <li> date histogram : date range filter.
* Applied: DateHistogramAggregator, AutoDateHistogramAggregator, CompositeAggregator </li>
* </ul>
*
* @opensearch.internal
*/
public class FastFilterRewriteHelper {

private static final int MAX_NUM_FILTER_BUCKETS = 1024;
private static final Map<Class<?>, Function<Query, Query>> queryWrappers;

// Initialize the wrappers map for unwrapping the query
// Initialize the wrapper map for unwrapping the query
static {
queryWrappers = new HashMap<>();
queryWrappers.put(ConstantScoreQuery.class, q -> ((ConstantScoreQuery) q).getQuery());
Expand All @@ -77,9 +78,9 @@ private static Query unwrapIntoConcreteQuery(Query query) {
}

/**
* Finds the min and max bounds for segments within the passed search context
* Finds the min and max bounds of field values for the shard
*/
private static long[] getIndexBoundsFromLeaves(final SearchContext context, final String fieldName) throws IOException {
private static long[] getIndexBounds(final SearchContext context, final String fieldName) throws IOException {
final List<LeafReaderContext> leaves = context.searcher().getIndexReader().leaves();
long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
// Since the query does not specify bounds for aggregation, we can
Expand All @@ -103,7 +104,7 @@ private static long[] getIndexBoundsFromLeaves(final SearchContext context, fina
*/
public static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException {
final Query cq = unwrapIntoConcreteQuery(context.query());
final long[] indexBounds = getIndexBoundsFromLeaves(context, fieldName);
final long[] indexBounds = getIndexBounds(context, fieldName);
if (cq instanceof PointRangeQuery) {
final PointRangeQuery prq = (PointRangeQuery) cq;
// Ensure that the query and aggregation are on the same field
Expand All @@ -117,8 +118,14 @@ public static long[] getAggregationBounds(final SearchContext context, final Str
} else if (cq instanceof MatchAllDocsQuery) {
return indexBounds;
}

return null;
// Check if the top-level query (which may be a PRQ on another field) is functionally match-all
Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f);
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
if (weight.count(ctx) != ctx.reader().numDocs()) {
return null;
}
}
return indexBounds;
}

/**
Expand Down Expand Up @@ -180,33 +187,55 @@ protected String toString(int dimension, byte[] value) {
}

/**
* @param computeBounds get the lower and upper bound of the field in a shard search
* @param roundingFunction produce Rounding that contains interval of date range.
* Rounding is computed dynamically using the bounds in AutoDateHistogram
* @param preparedRoundingSupplier produce PreparedRounding to round values at call-time
* Context object to do fast filter optimization
*/
public static void buildFastFilter(
SearchContext context,
CheckedFunction<FastFilterContext, long[], IOException> computeBounds,
Function<long[], Rounding> roundingFunction,
Supplier<Rounding.Prepared> preparedRoundingSupplier,
FastFilterContext fastFilterContext
) throws IOException {
assert fastFilterContext.fieldType instanceof DateFieldMapper.DateFieldType;
DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType;
final long[] bounds = computeBounds.apply(fastFilterContext); // TODO b do we need to pass in the context? or specific things
if (bounds != null) {
public static class FastFilterContext {
private Weight[] filters = null;
public AggregationType aggregationType;

public FastFilterContext() {}

private void setFilters(Weight[] filters) {
this.filters = filters;
}

public void setAggregationType(AggregationType aggregationType) {
this.aggregationType = aggregationType;
}

public boolean isRewriteable(final Object parent, final int subAggLength) {
return aggregationType.isRewriteable(parent, subAggLength);
}

/**
* This filter build method is for date histogram aggregation type
*
* @param computeBounds get the lower and upper bound of the field in a shard search
* @param roundingFunction produce Rounding that contains interval of date range.
* Rounding is computed dynamically using the bounds in AutoDateHistogram
* @param preparedRoundingSupplier produce PreparedRounding to round values at call-time
*/
public void buildFastFilter(
SearchContext context,
CheckedFunction<DateHistogramAggregationType, long[], IOException> computeBounds,
Function<long[], Rounding> roundingFunction,
Supplier<Rounding.Prepared> preparedRoundingSupplier
) throws IOException {
assert this.aggregationType instanceof DateHistogramAggregationType;
DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType;
DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType();
final long[] bounds = computeBounds.apply(aggregationType);
if (bounds == null) return;

final Rounding rounding = roundingFunction.apply(bounds);
final OptionalLong intervalOpt = Rounding.getInterval(rounding);
if (intervalOpt.isEmpty()) {
return;
}
if (intervalOpt.isEmpty()) return;
final long interval = intervalOpt.getAsLong();

// afterKey is the last bucket key in previous response, while the bucket key
// is the start of the bucket values, so add the interval
if (fastFilterContext.afterKey != -1) {
bounds[0] = fastFilterContext.afterKey + interval;
if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) {
bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval;
}

final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations(
Expand All @@ -218,103 +247,81 @@ public static void buildFastFilter(
bounds[0],
bounds[1]
);
fastFilterContext.setFilters(filters);
this.setFilters(filters);
}
}

/**
* Encapsulates metadata about a value source needed to rewrite
* Different types have different pre-conditions, filter building logic, etc.
*/
public static class FastFilterContext {
private boolean missing = false; // TODO b confirm UT that can catch this
private boolean hasScript = false;
private boolean showOtherBucket = false;
public interface AggregationType {
boolean isRewriteable(Object parent, int subAggLength);
}

/**
* For date histogram aggregation
*/
public static class DateHistogramAggregationType implements AggregationType {
private final MappedFieldType fieldType;
private final boolean missing;
private final boolean hasScript;

private long afterKey = -1L;
private int size = Integer.MAX_VALUE; // only used by composite aggregation for pagination
private Weight[] filters = null;

private final Type type;

private RoundingValuesSource valuesSource = null;

public FastFilterContext(MappedFieldType fieldType) {
public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) {
this.fieldType = fieldType;
this.type = Type.DATE_HISTO;
this.missing = missing;
this.hasScript = hasScript;
}

public FastFilterContext(CompositeValuesSourceConfig[] sourceConfigs, CompositeKey rawAfterKey, List<DocValueFormat> formats) {
if (sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource) {
this.fieldType = sourceConfigs[0].fieldType();
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
this.missing = sourceConfigs[0].missingBucket();
this.hasScript = sourceConfigs[0].hasScript();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
} else {
this.fieldType = null;
@Override
public boolean isRewriteable(Object parent, int subAggLength) {
if (parent == null && subAggLength == 0 && !missing && !hasScript) {
return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
}
this.type = Type.DATE_HISTO;
}

public FastFilterContext(Type type) {
this.fieldType = null;
this.type = type;
return false;
}

public DateFieldMapper.DateFieldType getFieldType() {
assert fieldType instanceof DateFieldMapper.DateFieldType;
return (DateFieldMapper.DateFieldType) fieldType;
}
}

public RoundingValuesSource getDateHistogramSource() {
return valuesSource;
}

public void setSize(int size) {
/**
* For composite aggregation with date histogram as a source
*/
public static class CompositeAggregationType extends DateHistogramAggregationType {
private final RoundingValuesSource valuesSource;
private long afterKey = -1L;
private final int size;

public CompositeAggregationType(
CompositeValuesSourceConfig[] sourceConfigs,
CompositeKey rawAfterKey,
List<DocValueFormat> formats,
int size
) {
super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
this.size = size;
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
}

public void setFilters(Weight[] filters) {
this.filters = filters;
}

public void setMissing(boolean missing) {
this.missing = missing;
}

public void setHasScript(boolean hasScript) {
this.hasScript = hasScript;
public Rounding getRounding() {
return valuesSource.getRounding();
}

public void setShowOtherBucket(boolean showOtherBucket) {
this.showOtherBucket = showOtherBucket;
}

public boolean isRewriteable(final Object parent, final int subAggLength) {
if (parent == null && subAggLength == 0 && !missing && !hasScript) {
if (type == Type.FILTERS) {
return !showOtherBucket;
} else if (type == Type.DATE_HISTO) {
return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
}
}
return false;
public Rounding.Prepared getRoundingPreparer() {
return valuesSource.getPreparedRounding();
}
}

/**
* Different types have different pre-conditions, filter building logic, etc.
*/
public enum Type {
FILTERS,
DATE_HISTO
}
public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) {
return sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource;
}

public static long getBucketOrd(long bucketOrd) {
Expand All @@ -326,7 +333,7 @@ public static long getBucketOrd(long bucketOrd) {
}

/**
* This should be executed for each segment
* This is executed for each segment by passing the leaf reader context
*
* @param incrementDocCount takes in the bucket key value and the bucket count
*/
Expand All @@ -339,7 +346,6 @@ public static boolean tryFastFilterAggregation(
if (fastFilterContext.filters == null) return false;

final Weight[] filters = fastFilterContext.filters;
// TODO b refactor the type conversion to the context
final int[] counts = new int[filters.length];
int i;
for (i = 0; i < filters.length; i++) {
Expand All @@ -352,18 +358,23 @@ public static boolean tryFastFilterAggregation(
}

int s = 0;
int size = Integer.MAX_VALUE;
for (i = 0; i < filters.length; i++) {
if (counts[i] > 0) {
long bucketKey = i; // the index of filters is the key for filters aggregation
if (fastFilterContext.type == FastFilterContext.Type.DATE_HISTO) {
final DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType;
if (fastFilterContext.aggregationType instanceof DateHistogramAggregationType) {
final DateFieldMapper.DateFieldType fieldType = ((DateHistogramAggregationType) fastFilterContext.aggregationType)
.getFieldType();
bucketKey = fieldType.convertNanosToMillis(
NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
);
if (fastFilterContext.aggregationType instanceof CompositeAggregationType) {
size = ((CompositeAggregationType) fastFilterContext.aggregationType).size;
}
}
incrementDocCount.accept(bucketKey, counts[i]);
s++;
if (s > fastFilterContext.size) return true;
if (s > size) return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,20 +163,23 @@ final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(sourceConfigs, rawAfterKey, formats);
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return;
fastFilterContext.setAggregationType(
new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size)
);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
// Currently the filter rewrite is only supported for date histograms
RoundingValuesSource dateHistogramSource = fastFilterContext.getDateHistogramSource();
preparedRounding = dateHistogramSource.getPreparedRounding();
// bucketOrds is the data structure for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
fastFilterContext.setSize(size);
FastFilterRewriteHelper.buildFastFilter(
// Currently the filter rewrite is only supported for date histograms
FastFilterRewriteHelper.CompositeAggregationType aggregationType =
(FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType;
preparedRounding = aggregationType.getRoundingPreparer();
fastFilterContext.buildFastFilter(
context,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
x -> dateHistogramSource.getRounding(),
() -> preparedRounding,
fastFilterContext
x -> aggregationType.getRounding(),
() -> preparedRounding
);
}
}
Expand Down Expand Up @@ -513,9 +516,14 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, (key, count) -> {
incrementBucketDocCount(FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count);
});
boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(
ctx,
fastFilterContext,
(key, count) -> incrementBucketDocCount(
FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))),
count
)
);
if (optimized) throw new CollectionTerminatedException();

finishLeaf();
Expand Down
Loading

0 comments on commit c6874c1

Please sign in to comment.