From eafa41651ffc1610a0de267d5915e0bec4db9064 Mon Sep 17 00:00:00 2001
From: "opensearch-trigger-bot[bot]"
<98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Date: Sat, 10 Feb 2024 05:30:14 +0000
Subject: [PATCH] Apply the fast filter optimization to composite aggregation
(#11505) (#11914) (#12283)
---
CHANGELOG.md | 1 +
.../bucket/FastFilterRewriteHelper.java | 385 ++++++++++++++++++
.../bucket/composite/CompositeAggregator.java | 98 ++++-
.../bucket/composite/CompositeKey.java | 6 +-
.../CompositeValuesCollectorQueue.java | 30 +-
.../CompositeValuesSourceConfig.java | 2 +-
.../DateHistogramValuesSourceBuilder.java | 2 +-
.../bucket/composite/InternalComposite.java | 6 +-
.../composite/PointsSortedDocsProducer.java | 6 +-
.../composite/RoundingValuesSource.java | 24 +-
.../AutoDateHistogramAggregator.java | 67 ++-
.../histogram/DateHistogramAggregator.java | 60 ++-
.../bucket/histogram/FilterRewriteHelper.java | 259 ------------
.../composite/CompositeAggregatorTests.java | 17 +-
14 files changed, 585 insertions(+), 378 deletions(-)
create mode 100644 server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java
delete mode 100644 server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 937be982211a5..76010a56fd1a1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -124,6 +124,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
+- Apply the fast filter optimization to composite aggregation of date histogram source ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11505))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Improved performance of numeric exact-match queries ([#11209](https://github.com/opensearch-project/OpenSearch/pull/11209))
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java
new file mode 100644
index 0000000000000..f377287d0b3bd
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java
@@ -0,0 +1,385 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.aggregations.bucket;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.PointValues;
+import org.apache.lucene.search.ConstantScoreQuery;
+import org.apache.lucene.search.IndexOrDocValuesQuery;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.PointRangeQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.util.NumericUtils;
+import org.opensearch.common.CheckedFunction;
+import org.opensearch.common.Rounding;
+import org.opensearch.common.lucene.search.function.FunctionScoreQuery;
+import org.opensearch.index.mapper.DateFieldMapper;
+import org.opensearch.index.mapper.MappedFieldType;
+import org.opensearch.index.query.DateRangeIncludingNowQuery;
+import org.opensearch.search.DocValueFormat;
+import org.opensearch.search.aggregations.bucket.composite.CompositeKey;
+import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
+import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;
+import org.opensearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Utility class to help rewrite aggregations into filters.
+ * Instead of the aggregation collecting documents one by one, the filter rewrite
+ * may count all matching documents in a single pass.
+ * <p>
+ * Currently supported rewrite:
+ * <ul>
+ *  <li>date histogram to date range filters.
+ *   Applied: DateHistogramAggregator, AutoDateHistogramAggregator, CompositeAggregator</li>
+ * </ul>
+ *
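+ * <p>
+ * As an illustrative example, a match-all search with a daily date histogram can be
+ * answered by taking the Weight#count of one range filter per day over the point values,
+ * instead of iterating and rounding every document's timestamp.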
+ * @opensearch.internal
+ */
+public final class FastFilterRewriteHelper {
+
+ private FastFilterRewriteHelper() {}
+
+ private static final int MAX_NUM_FILTER_BUCKETS = 1024;
+ private static final Map<Class<?>, Function<Query, Query>> queryWrappers;
+
+ // Initialize the wrapper map for unwrapping the query
+ static {
+ queryWrappers = new HashMap<>();
+ queryWrappers.put(ConstantScoreQuery.class, q -> ((ConstantScoreQuery) q).getQuery());
+ queryWrappers.put(FunctionScoreQuery.class, q -> ((FunctionScoreQuery) q).getSubQuery());
+ queryWrappers.put(DateRangeIncludingNowQuery.class, q -> ((DateRangeIncludingNowQuery) q).getQuery());
+ queryWrappers.put(IndexOrDocValuesQuery.class, q -> ((IndexOrDocValuesQuery) q).getIndexQuery());
+ }
+
+ /**
+ * Recursively unwraps the query into its concrete form
+ * for applying the optimization
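+ * e.g. a ConstantScoreQuery wrapping a PointRangeQuery unwraps to the inner PointRangeQuery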
+ */
+ private static Query unwrapIntoConcreteQuery(Query query) {
+ while (queryWrappers.containsKey(query.getClass())) {
+ query = queryWrappers.get(query.getClass()).apply(query);
+ }
+
+ return query;
+ }
+
+ /**
+ * Finds the min and max bounds of field values for the shard
+ */
+ private static long[] getIndexBounds(final SearchContext context, final String fieldName) throws IOException {
+ final List<LeafReaderContext> leaves = context.searcher().getIndexReader().leaves();
+ long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
+ // Since the query does not specify bounds for aggregation, we can
+ // build the global min/max from local min/max within each segment
+ for (LeafReaderContext leaf : leaves) {
+ final PointValues values = leaf.reader().getPointValues(fieldName);
+ if (values != null) {
+ min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0));
+ max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0));
+ }
+ }
+
+ if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null;
+
+ return new long[] { min, max };
+ }
+
+ /**
+ * This method also acts as a pre-condition check for the optimization;
+ * it returns null if the optimization cannot be applied
+ */
+ public static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException {
+ final Query cq = unwrapIntoConcreteQuery(context.query());
+ final long[] indexBounds = getIndexBounds(context, fieldName);
+ // the shard has no point values for the field, so the optimization cannot be applied
+ if (indexBounds == null) return null;
+ if (cq instanceof PointRangeQuery) {
+ final PointRangeQuery prq = (PointRangeQuery) cq;
+ // Ensure that the query and aggregation are on the same field
+ if (prq.getField().equals(fieldName)) {
+ return new long[] {
+ // Minimum bound for aggregation is the max between query and global
+ Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]),
+ // Maximum bound for aggregation is the min between query and global
+ Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) };
+ }
+ } else if (cq instanceof MatchAllDocsQuery) {
+ return indexBounds;
+ }
+ // Check if the top-level query (which may be a PointRangeQuery on another field) is functionally match-all
+ Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f);
+ for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
+ if (weight.count(ctx) != ctx.reader().numDocs()) {
+ return null;
+ }
+ }
+ return indexBounds;
+ }
+
+ /**
+ * Creates the date range filters for aggregations using the interval, min/max
+ * bounds and the rounding values
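+ * e.g. with a daily interval and bounds spanning 2023-01-01T05:00 to 2023-01-03T10:00,
+ * three filters are built: [low, 01-02), [01-02, 01-03) and [01-03, high]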
+ */
+ private static Weight[] createFilterForAggregations(
+ final SearchContext context,
+ final long interval,
+ final Rounding.Prepared preparedRounding,
+ final String field,
+ final DateFieldMapper.DateFieldType fieldType,
+ long low,
+ final long high
+ ) throws IOException {
+ // Calculate the number of buckets using range and interval
+ long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low));
+ long prevRounded = roundedLow;
+ int bucketCount = 0;
+ while (roundedLow <= fieldType.convertNanosToMillis(high)) {
+ bucketCount++;
+ if (bucketCount > MAX_NUM_FILTER_BUCKETS) return null;
+ // The rounding below is needed because the interval could produce
+ // non-rounded values for something like a calendar month
+ roundedLow = preparedRounding.round(roundedLow + interval);
+ if (prevRounded == roundedLow) break; // prevents getting into an infinite loop
+ prevRounded = roundedLow;
+ }
+
+ Weight[] filters = null;
+ if (bucketCount > 0) {
+ filters = new Weight[bucketCount];
+ roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low));
+
+ int i = 0;
+ while (i < bucketCount) {
+ // Calculate the lower bucket bound
+ final byte[] lower = new byte[8];
+ NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0);
+
+ // Calculate the upper bucket bound
+ roundedLow = preparedRounding.round(roundedLow + interval);
+ final byte[] upper = new byte[8];
+ NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high :
+ // Subtract 1, since the updated roundedLow is the start of
+ // the next bucket and is included there
+ fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0);
+
+ filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) {
+ @Override
+ protected String toString(int dimension, byte[] value) {
+ return null;
+ }
+ }, ScoreMode.COMPLETE_NO_SCORES, 1);
+ }
+ }
+
+ return filters;
+ }
+
+ /**
+ * Context object for the fast filter optimization
+ */
+ public static class FastFilterContext {
+ private Weight[] filters = null;
+ public AggregationType aggregationType;
+
+ public FastFilterContext() {}
+
+ private void setFilters(Weight[] filters) {
+ this.filters = filters;
+ }
+
+ public void setAggregationType(AggregationType aggregationType) {
+ this.aggregationType = aggregationType;
+ }
+
+ public boolean isRewriteable(final Object parent, final int subAggLength) {
+ return aggregationType.isRewriteable(parent, subAggLength);
+ }
+
+ /**
+ * Builds the fast filter; this build method is specific to the date histogram aggregation type
+ *
+ * @param computeBounds computes the lower and upper bound of the field for the shard search
+ * @param roundingFunction produces the Rounding that carries the interval of the date range;
+ * the Rounding is computed dynamically using the bounds in AutoDateHistogram
+ * @param preparedRoundingSupplier supplies the PreparedRounding used to round values at call-time
+ */
+ public void buildFastFilter(
+ SearchContext context,
+ CheckedFunction<DateHistogramAggregationType, long[], IOException> computeBounds,
+ Function<long[], Rounding> roundingFunction,
+ Supplier<Rounding.Prepared> preparedRoundingSupplier
+ ) throws IOException {
+ assert this.aggregationType instanceof DateHistogramAggregationType;
+ DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType;
+ DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType();
+ final long[] bounds = computeBounds.apply(aggregationType);
+ if (bounds == null) return;
+
+ final Rounding rounding = roundingFunction.apply(bounds);
+ final OptionalLong intervalOpt = Rounding.getInterval(rounding);
+ if (intervalOpt.isEmpty()) return;
+ final long interval = intervalOpt.getAsLong();
+
+ // afterKey is the last bucket key of the previous response, while a bucket key
+ // marks the start of its bucket's value range, so add the interval to move past it
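+ // e.g. with a daily interval and afterKey = 2023-01-01, the first bucket of this page starts at 2023-01-02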
+ if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) {
+ bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval;
+ }
+
+ final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations(
+ context,
+ interval,
+ preparedRoundingSupplier.get(),
+ fieldType.name(),
+ fieldType,
+ bounds[0],
+ bounds[1]
+ );
+ this.setFilters(filters);
+ }
+ }
+
+ /**
+ * Different aggregation types have different pre-conditions, filter building logic, etc.
+ */
+ public interface AggregationType {
+ boolean isRewriteable(Object parent, int subAggLength);
+ }
+
+ /**
+ * For date histogram aggregation
+ */
+ public static class DateHistogramAggregationType implements AggregationType {
+ private final MappedFieldType fieldType;
+ private final boolean missing;
+ private final boolean hasScript;
+
+ public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) {
+ this.fieldType = fieldType;
+ this.missing = missing;
+ this.hasScript = hasScript;
+ }
+
+ @Override
+ public boolean isRewriteable(Object parent, int subAggLength) {
+ if (parent == null && subAggLength == 0 && !missing && !hasScript) {
+ return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
+ }
+ return false;
+ }
+
+ public DateFieldMapper.DateFieldType getFieldType() {
+ assert fieldType instanceof DateFieldMapper.DateFieldType;
+ return (DateFieldMapper.DateFieldType) fieldType;
+ }
+ }
+
+ /**
+ * For composite aggregation with date histogram as a source
+ */
+ public static class CompositeAggregationType extends DateHistogramAggregationType {
+ private final RoundingValuesSource valuesSource;
+ private long afterKey = -1L;
+ private final int size;
+
+ public CompositeAggregationType(
+ CompositeValuesSourceConfig[] sourceConfigs,
+ CompositeKey rawAfterKey,
+ List<DocValueFormat> formats,
+ int size
+ ) {
+ super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
+ this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
+ this.size = size;
+ if (rawAfterKey != null) {
+ assert rawAfterKey.size() == 1 && formats.size() == 1;
+ this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
+ throw new IllegalArgumentException("now() is not supported in [after] key");
+ });
+ }
+ }
+
+ public Rounding getRounding() {
+ return valuesSource.getRounding();
+ }
+
+ public Rounding.Prepared getRoundingPreparer() {
+ return valuesSource.getPreparedRounding();
+ }
+ }
+
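+ // the composite rewrite only applies when there is a single date histogram source, i.e. a RoundingValuesSource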
+ public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) {
+ return sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource;
+ }
+
+ public static long getBucketOrd(long bucketOrd) {
+ if (bucketOrd < 0) { // already seen
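+ // LongKeyedBucketOrds#add returns -1 - ord when the key was already added, so e.g. -5 decodes back to ord 4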
+ bucketOrd = -1 - bucketOrd;
+ }
+
+ return bucketOrd;
+ }
+
+ /**
+ * This is executed for each segment by passing the leaf reader context
+ *
+ * @param incrementDocCount takes in the bucket key value and the bucket count
+ */
+ public static boolean tryFastFilterAggregation(
+ final LeafReaderContext ctx,
+ FastFilterContext fastFilterContext,
+ final BiConsumer<Long, Integer> incrementDocCount
+ ) throws IOException {
+ if (fastFilterContext == null) return false;
+ if (fastFilterContext.filters == null) return false;
+
+ final Weight[] filters = fastFilterContext.filters;
+ final int[] counts = new int[filters.length];
+ int i;
+ for (i = 0; i < filters.length; i++) {
+ counts[i] = filters[i].count(ctx);
+ if (counts[i] == -1) {
+ // Cannot use the optimization if any of the counts
+ // is -1 indicating the segment might have deleted documents
+ return false;
+ }
+ }
+
+ int s = 0;
+ int size = Integer.MAX_VALUE;
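+ // for the composite aggregation, stop counting once enough buckets for the requested page size are collected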
+ for (i = 0; i < filters.length; i++) {
+ if (counts[i] > 0) {
+ long bucketKey = i; // the index of filters is the key for filters aggregation
+ if (fastFilterContext.aggregationType instanceof DateHistogramAggregationType) {
+ final DateFieldMapper.DateFieldType fieldType = ((DateHistogramAggregationType) fastFilterContext.aggregationType)
+ .getFieldType();
+ bucketKey = fieldType.convertNanosToMillis(
+ NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
+ );
+ if (fastFilterContext.aggregationType instanceof CompositeAggregationType) {
+ size = ((CompositeAggregationType) fastFilterContext.aggregationType).size;
+ }
+ }
+ incrementDocCount.accept(bucketKey, counts[i]);
+ s++;
+ if (s > size) return true;
+ }
+ }
+
+ return true;
+ }
+}
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
index 317c2a357bac5..822b8a6c4b118 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -56,7 +56,9 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.comparators.LongComparator;
import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.RoaringDocIdSet;
+import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.IndexSortConfig;
import org.opensearch.lucene.queries.SearchAfterSortedDocQuery;
@@ -71,7 +73,9 @@
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
+import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
+import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.SortAndFormats;
@@ -80,6 +84,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongUnaryOperator;
@@ -111,6 +116,10 @@ final class CompositeAggregator extends BucketsAggregator {
private boolean earlyTerminated;
+ private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
+ private LongKeyedBucketOrds bucketOrds = null;
+ private Rounding.Prepared preparedRounding = null;
+
CompositeAggregator(
String name,
AggregatorFactories factories,
@@ -154,12 +163,33 @@ final class CompositeAggregator extends BucketsAggregator {
}
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;
+
+ fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
+ if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return;
+ fastFilterContext.setAggregationType(
+ new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size)
+ );
+ if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
+ // bucketOrds is the data structure for saving date histogram results
+ bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
+ // Currently the filter rewrite is only supported for date histograms
+ FastFilterRewriteHelper.CompositeAggregationType aggregationType =
+ (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType;
+ preparedRounding = aggregationType.getRoundingPreparer();
+ fastFilterContext.buildFastFilter(
+ context,
+ fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
+ x -> aggregationType.getRounding(),
+ () -> preparedRounding
+ );
+ }
}
@Override
protected void doClose() {
try {
Releasables.close(queue);
+ Releasables.close(bucketOrds);
} finally {
Releasables.close(sources);
}
@@ -187,12 +217,14 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
}
int num = Math.min(size, queue.size());
- final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
+ InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
+
long[] bucketOrdsToCollect = new long[queue.size()];
for (int i = 0; i < queue.size(); i++) {
bucketOrdsToCollect[i] = i;
}
InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);
+
while (queue.size() > 0) {
int slot = queue.pop();
CompositeKey key = queue.toCompositeKey(slot);
@@ -208,6 +240,43 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
aggs
);
}
+
+ // Build results from the fast filter optimization
+ if (bucketOrds != null) {
+ // CompositeKey wraps the bucket key value
+ final Map<CompositeKey, InternalComposite.InternalBucket> bucketMap = new HashMap<>();
+ // Some segments may not be optimized, so buckets may contain results from the queue.
+ for (InternalComposite.InternalBucket internalBucket : buckets) {
+ bucketMap.put(internalBucket.getRawKey(), internalBucket);
+ }
+ // Loop over the buckets in the bucketOrds, and populate the map accordingly
+ LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0);
+ while (ordsEnum.next()) {
+ Long bucketKeyValue = ordsEnum.value();
+ CompositeKey key = new CompositeKey(bucketKeyValue);
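+ // a key may have results from both optimized and non-optimized segments, so merge the doc counts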
+ if (bucketMap.containsKey(key)) {
+ long docCount = bucketDocCount(ordsEnum.ord()) + bucketMap.get(key).getDocCount();
+ bucketMap.get(key).setDocCount(docCount);
+ } else {
+ InternalComposite.InternalBucket bucket = new InternalComposite.InternalBucket(
+ sourceNames,
+ formats,
+ key,
+ reverseMuls,
+ missingOrders,
+ bucketDocCount(ordsEnum.ord()),
+ buildEmptySubAggregations()
+ );
+ bucketMap.put(key, bucket);
+ }
+ }
+ // since a map is not a sorted structure, sort it before transforming back to buckets
+ List<InternalComposite.InternalBucket> bucketList = new ArrayList<>(bucketMap.values());
+ CollectionUtil.introSort(bucketList, InternalComposite.InternalBucket::compareKey);
+ buckets = bucketList.subList(0, Math.min(size, bucketList.size())).toArray(InternalComposite.InternalBucket[]::new);
+ num = buckets.length;
+ }
+
CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
return new InternalAggregation[] {
new InternalComposite(
@@ -296,7 +365,7 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException
if (indexSortField.getReverse() != (source.reverseMul == -1)) {
if (i == 0) {
- // the leading index sort matches the leading source field but the order is reversed
+ // the leading index sort matches the leading source field, but the order is reversed,
// so we don't check the other sources.
return new Sort(indexSortField);
}
@@ -304,8 +373,8 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException
}
sortFields.add(indexSortField);
if (sourceConfig.valuesSource() instanceof RoundingValuesSource) {
- // the rounding "squashes" many values together, that breaks the ordering of sub-values
- // so we ignore subsequent source even if they match the index sort.
+ // the rounding "squashes" many values together, that breaks the ordering of sub-values,
+ // so we ignore the subsequent sources even if they match the index sort.
break;
}
}
@@ -448,6 +517,16 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+ boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(
+ ctx,
+ fastFilterContext,
+ (key, count) -> incrementBucketDocCount(
+ FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))),
+ count
+ )
+ );
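+ // when optimized, every matching document in this segment has already been counted through the filters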
+ if (optimized) throw new CollectionTerminatedException();
+
finishLeaf();
boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
@@ -477,9 +556,10 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
}
if (rawAfterKey != null && sortPrefixLen > 0) {
- // We have an after key and index sort is applicable so we jump directly to the doc
- // that is after the index sort prefix using the rawAfterKey and we start collecting
- // document from there.
+ // We have an after key and index sort is applicable, so we jump directly to the doc
+ // after the index sort prefix using the rawAfterKey and we start collecting
+ // documents from there.
+ assert indexSortPrefix != null;
processLeafFromQuery(ctx, indexSortPrefix);
throw new CollectionTerminatedException();
} else {
@@ -507,6 +587,8 @@ public void collect(int doc, long bucket) throws IOException {
try {
long docCount = docCountProvider.getDocCount(doc);
if (queue.addIfCompetitive(indexSortPrefix, docCount)) {
+ // one doc may contain multiple values; we iterate over them and collect
+ // them one by one, so the same doc can appear multiple times here
if (builder != null && lastDoc != doc) {
builder.add(doc);
lastDoc = doc;
@@ -569,7 +651,7 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec
@Override
public void collect(int doc, long zeroBucket) throws IOException {
assert zeroBucket == 0;
- Integer slot = queue.compareCurrent();
+ Integer slot = queue.getCurrentSlot();
if (slot != null) {
// The candidate key is a top bucket.
// We can defer the collection of this document/bucket to the sub collector
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeKey.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeKey.java
index 5ddeb22d33a6f..338ebdc66eef7 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeKey.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeKey.java
@@ -44,7 +44,7 @@
*
* @opensearch.internal
*/
-class CompositeKey implements Writeable {
+public class CompositeKey implements Writeable {
private final Comparable<?>[] values;
CompositeKey(Comparable<?>... values) {
@@ -64,11 +64,11 @@ Comparable[] values() {
return values;
}
- int size() {
+ public int size() {
return values.length;
}
- Comparable<?> get(int pos) {
+ public Comparable<?> get(int pos) {
assert pos < values.length;
return values[pos];
}
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
index 6ee1682a7b196..2c4d451322bca 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
@@ -47,6 +47,8 @@
/**
* A specialized {@link PriorityQueue} implementation for composite buckets.
+ * Think of this as a max heap that holds the slots of the top N smallest buckets in order.
+ * Each slot holds the values of the composite bucket key it represents.
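+ * e.g. for a single ascending source with size 3, after keys 5, 1, 9 and 4 the heap holds {1, 4, 5} with 5 on top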
*
* @opensearch.internal
*/
@@ -77,7 +79,7 @@ public int hashCode() {
private final BigArrays bigArrays;
private final int maxSize;
- private final Map<Slot, Integer> map;
+ private final Map<Slot, Integer> map; // to quickly find the slot for a value
private final SingleDimensionValuesSource<?>[] arrays;
private LongArray docCounts;
@@ -108,7 +110,7 @@ public int hashCode() {
@Override
protected boolean lessThan(Integer a, Integer b) {
- return compare(a, b) > 0;
+ return compare(a, b) > 0; // max heap
}
/**
@@ -119,10 +121,10 @@ boolean isFull() {
}
/**
- * Compares the current candidate with the values in the queue and returns
+ * Tries to get the slot of the current/candidate values in the queue, and returns
* the slot if the candidate is already in the queue or null if the candidate is not present.
*/
- Integer compareCurrent() {
+ Integer getCurrentSlot() {
return map.get(new Slot(CANDIDATE_SLOT));
}
@@ -281,32 +283,34 @@ boolean addIfCompetitive(long inc) {
*/
boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
// checks if the candidate key is competitive
- Integer topSlot = compareCurrent();
- if (topSlot != null) {
+ Integer curSlot = getCurrentSlot();
+ if (curSlot != null) {
// this key is already in the top N, skip it
- docCounts.increment(topSlot, inc);
+ docCounts.increment(curSlot, inc);
return true;
}
+
if (afterKeyIsSet) {
int cmp = compareCurrentWithAfter();
if (cmp <= 0) {
if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) {
- // the leading index sort is in the reverse order of the leading source
+ // the leading index sort and the leading source order are both reversed,
// so we can early terminate when we reach a document that is smaller
// than the after key (collected on a previous page).
throw new CollectionTerminatedException();
}
- // key was collected on a previous page, skip it (>= afterKey).
+ // the key was collected on a previous page, skip it.
return false;
}
}
+
+ // the heap is full, check if the candidate key is larger than the max heap top
if (size() >= maxSize) {
- // the tree map is full, check if the candidate key should be kept
int cmp = compare(CANDIDATE_SLOT, top());
if (cmp > 0) {
if (cmp <= indexSortSourcePrefix) {
- // index sort guarantees that there is no key greater or equal than the
- // current one in the subsequent documents so we can early terminate.
+ // index sort guarantees the following documents will have a key larger than the current candidate,
+ // so we can early terminate.
throw new CollectionTerminatedException();
}
// the candidate key is not competitive, skip it.
@@ -324,7 +328,7 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
} else {
newSlot = size();
}
- // move the candidate key to its new slot
+ // move the candidate key to its new slot by copying its values into that slot
copyCurrent(newSlot, inc);
map.put(new Slot(newSlot), newSlot);
add(newSlot);
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java
index 788a4ddc15374..5289b3a34ab34 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java
@@ -156,7 +156,7 @@ public MissingOrder missingOrder() {
/**
* Returns true if the source contains a script that can change the value.
*/
- protected boolean hasScript() {
+ public boolean hasScript() {
return hasScript;
}
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
index 4ec309b819183..cf2f70a548913 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
@@ -303,7 +303,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
// TODO once composite is plugged in to the values source registry or at least understands Date values source types use it
// here
Rounding.Prepared preparedRounding = rounding.prepareForUnknown();
- RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding);
+ RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding, rounding);
// is specified in the builder.
final DocValueFormat docValueFormat = format == null ? DocValueFormat.RAW : valuesSourceConfig.format();
final MappedFieldType fieldType = valuesSourceConfig.fieldType();
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java
index fab9d11dd33e2..e4d9052f5c1a4 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java
@@ -351,7 +351,7 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern
KeyComparable<InternalBucket> {
private final CompositeKey key;
- private final long docCount;
+ private long docCount;
private final InternalAggregations aggregations;
private final transient int[] reverseMuls;
private final transient MissingOrder[] missingOrders;
@@ -448,6 +448,10 @@ public long getDocCount() {
return docCount;
}
+ public void setDocCount(long docCount) {
+ this.docCount = docCount;
+ }
+
@Override
public Aggregations getAggregations() {
return aggregations;
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java
index 3d6730203b6ae..dc130eb54c0ea 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java
@@ -68,6 +68,7 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade
// no value for the field
return DocIdSet.EMPTY;
}
+
long lowerBucket = Long.MIN_VALUE;
Comparable lowerValue = queue.getLowerValueLeadSource();
if (lowerValue != null) {
@@ -76,7 +77,6 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade
}
lowerBucket = (Long) lowerValue;
}
-
long upperBucket = Long.MAX_VALUE;
Comparable upperValue = queue.getUpperValueLeadSource();
if (upperValue != null) {
@@ -85,6 +85,7 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade
}
upperBucket = (Long) upperValue;
}
+
DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null;
Visitor visitor = new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket);
try {
@@ -146,6 +147,7 @@ public void visit(int docID, byte[] packedValue) throws IOException {
}
long bucket = bucketFunction.applyAsLong(packedValue);
+ // process the previous bucket when a new bucket appears
if (first == false && bucket != lastBucket) {
final DocIdSet docIdSet = bucketDocsBuilder.build();
if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) &&
@@ -182,13 +184,13 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue
return PointValues.Relation.CELL_OUTSIDE_QUERY;
}
}
-
if (upperBucket != Long.MAX_VALUE) {
long minBucket = bucketFunction.applyAsLong(minPackedValue);
if (minBucket > upperBucket) {
return PointValues.Relation.CELL_OUTSIDE_QUERY;
}
}
+
return PointValues.Relation.CELL_CROSSES_QUERY;
}
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java
index 89315724ff9ed..3f5cf919f1755 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java
@@ -47,17 +47,19 @@
*
* @opensearch.internal
*/
-class RoundingValuesSource extends ValuesSource.Numeric {
+public class RoundingValuesSource extends ValuesSource.Numeric {
private final ValuesSource.Numeric vs;
- private final Rounding.Prepared rounding;
+ private final Rounding.Prepared preparedRounding;
+ private final Rounding rounding;
/**
- *
- * @param vs The original values source
- * @param rounding How to round the values
+ * @param vs The original values source
+ * @param preparedRounding How to round the values
+ * @param rounding The rounding strategy
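+ * The unprepared Rounding is kept so the fast filter rewrite can derive the interval via Rounding#getInterval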
*/
- RoundingValuesSource(Numeric vs, Rounding.Prepared rounding) {
+ RoundingValuesSource(Numeric vs, Rounding.Prepared preparedRounding, Rounding rounding) {
this.vs = vs;
+ this.preparedRounding = preparedRounding;
this.rounding = rounding;
}
@@ -71,8 +73,16 @@ public boolean isBigInteger() {
return false;
}
+ public Rounding.Prepared getPreparedRounding() {
+ return preparedRounding;
+ }
+
+ public Rounding getRounding() {
+ return rounding;
+ }
+
public long round(long value) {
- return rounding.round(value);
+ return preparedRounding.round(value);
}
@Override
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
index a71c15d551927..0ea820abbedf4 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
@@ -33,8 +33,8 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
-import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.common.Rounding;
import org.opensearch.common.Rounding.Prepared;
@@ -42,7 +42,6 @@
import org.opensearch.common.util.IntArray;
import org.opensearch.common.util.LongArray;
import org.opensearch.core.common.util.ByteArray;
-import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -53,6 +52,7 @@
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.opensearch.search.aggregations.bucket.DeferringBucketCollector;
+import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
import org.opensearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
@@ -127,14 +127,14 @@ static AutoDateHistogramAggregator build(
* {@link MergingBucketsDeferringCollector#mergeBuckets(long[])}.
*/
private MergingBucketsDeferringCollector deferringCollector;
- private final Weight[] filters;
- private final DateFieldMapper.DateFieldType fieldType;
protected final RoundingInfo[] roundingInfos;
protected final int targetBuckets;
protected int roundingIdx;
protected Rounding.Prepared preparedRounding;
+ private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
+
private AutoDateHistogramAggregator(
String name,
AggregatorFactories factories,
@@ -156,23 +156,23 @@ private AutoDateHistogramAggregator(
this.roundingPreparer = roundingPreparer;
this.preparedRounding = prepareRounding(0);
- FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
- parent(),
- subAggregators.length,
- context,
- b -> getMinimumRounding(b[0], b[1]),
- // Passing prepared rounding as supplier to ensure the correct prepared
- // rounding is set as it is done during getMinimumRounding
- () -> preparedRounding,
- valuesSourceConfig,
- fc -> FilterRewriteHelper.getAggregationBounds(context, fc.field())
+ fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
+ fastFilterContext.setAggregationType(
+ new FastFilterRewriteHelper.DateHistogramAggregationType(
+ valuesSourceConfig.fieldType(),
+ valuesSourceConfig.missing() != null,
+ valuesSourceConfig.script() != null
+ )
);
- if (filterContext != null) {
- fieldType = filterContext.fieldType;
- filters = filterContext.filters;
- } else {
- fieldType = null;
- filters = null;
+ if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
+ fastFilterContext.buildFastFilter(
+ context,
+ fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
+ b -> getMinimumRounding(b[0], b[1]),
+ // Pass the prepared rounding as a supplier to ensure the correct prepared
+ // rounding is used, since it is updated during getMinimumRounding
+ () -> preparedRounding
+ );
}
}
@@ -226,28 +226,21 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
return LeafBucketCollector.NO_OP_COLLECTOR;
}
+ boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(
+ ctx,
+ fastFilterContext,
+ (key, count) -> incrementBucketDocCount(
+ FastFilterRewriteHelper.getBucketOrd(getBucketOrds().add(0, preparedRounding.round(key))),
+ count
+ )
+ );
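+ // try the fast filter aggregation once per segment, before any documents are collected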
+ if (optimized) throw new CollectionTerminatedException();
+
final SortedNumericDocValues values = valuesSource.longValues(ctx);
final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub);
-
- // Need to be declared as final and array for usage within the
- // LeafBucketCollectorBase subclass below
- final boolean[] useOpt = new boolean[1];
- useOpt[0] = filters != null;
-
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
- // Try fast filter aggregation if the filters have been created
- // Skip if tried before and gave incorrect/incomplete results
- if (useOpt[0]) {
- useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
- incrementBucketDocCount(
- FilterRewriteHelper.getBucketOrd(getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
- count
- );
- });
- }
-
iteratingCollector.collect(doc, owningBucketOrd);
}
};
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
index 8437e1dce9fe0..b95bd093b82a6 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -33,13 +33,12 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
-import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
-import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -49,8 +48,8 @@
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
+import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
-import org.opensearch.search.aggregations.support.FieldContext;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
@@ -81,9 +80,9 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
private final long minDocCount;
private final LongBounds extendedBounds;
private final LongBounds hardBounds;
- private final Weight[] filters;
private final LongKeyedBucketOrds bucketOrds;
- private final DateFieldMapper.DateFieldType fieldType;
+
+ private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
DateHistogramAggregator(
String name,
@@ -116,26 +115,21 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);
- FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
- parent,
- subAggregators.length,
- context,
- x -> rounding,
- () -> preparedRounding,
- valuesSourceConfig,
- this::computeBounds
+ fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
+ fastFilterContext.setAggregationType(
+ new FastFilterRewriteHelper.DateHistogramAggregationType(
+ valuesSourceConfig.fieldType(),
+ valuesSourceConfig.missing() != null,
+ valuesSourceConfig.script() != null
+ )
);
- if (filterContext != null) {
- fieldType = filterContext.fieldType;
- filters = filterContext.filters;
- } else {
- filters = null;
- fieldType = null;
+ if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
+ fastFilterContext.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding);
}
}
- private long[] computeBounds(final FieldContext fieldContext) throws IOException {
- final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.field());
+ private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException {
+ final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name());
if (bounds != null) {
// Update min/max limit if user specified any hard bounds
if (hardBounds != null) {
@@ -160,26 +154,20 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
return LeafBucketCollector.NO_OP_COLLECTOR;
}
- // Need to be declared as final and array for usage within the
- // LeafBucketCollectorBase subclass below
- final boolean[] useOpt = new boolean[1];
- useOpt[0] = filters != null;
+ boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(
+ ctx,
+ fastFilterContext,
+ (key, count) -> incrementBucketDocCount(
+ FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))),
+ count
+ )
+ );
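+ // a successful rewrite means the segment is fully counted, so doc-by-doc collection can be skipped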
+ if (optimized) throw new CollectionTerminatedException();
SortedNumericDocValues values = valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
- // Try fast filter aggregation if the filters have been created
- // Skip if tried before and gave incorrect/incomplete results
- if (useOpt[0]) {
- useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
- incrementBucketDocCount(
- FilterRewriteHelper.getBucketOrd(bucketOrds.add(owningBucketOrd, preparedRounding.round(key))),
- count
- );
- });
- }
-
if (values.advanceExact(doc)) {
int valuesCount = values.docValueCount();
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java
deleted file mode 100644
index 29cecd5b382cd..0000000000000
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.search.aggregations.bucket.histogram;
-
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.PointValues;
-import org.apache.lucene.search.CollectionTerminatedException;
-import org.apache.lucene.search.ConstantScoreQuery;
-import org.apache.lucene.search.IndexOrDocValuesQuery;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.search.PointRangeQuery;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreMode;
-import org.apache.lucene.search.Weight;
-import org.apache.lucene.util.NumericUtils;
-import org.opensearch.common.CheckedFunction;
-import org.opensearch.common.Rounding;
-import org.opensearch.common.lucene.search.function.FunctionScoreQuery;
-import org.opensearch.index.mapper.DateFieldMapper;
-import org.opensearch.index.query.DateRangeIncludingNowQuery;
-import org.opensearch.search.aggregations.support.FieldContext;
-import org.opensearch.search.aggregations.support.ValuesSourceConfig;
-import org.opensearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.OptionalLong;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-/**
- * Helpers functions to rewrite and optimize aggregations using
- * range filter queries
- *
- * @opensearch.internal
- */
-public class FilterRewriteHelper {
-
- static class FilterContext {
- final DateFieldMapper.DateFieldType fieldType;
- final Weight[] filters;
-
- public FilterContext(DateFieldMapper.DateFieldType fieldType, Weight[] filters) {
- this.fieldType = fieldType;
- this.filters = filters;
- }
- }
-
- private static final int MAX_NUM_FILTER_BUCKETS = 1024;
- private static final Map<Class<?>, Function<Query, Query>> queryWrappers;
-
- // Initialize the wrappers map for unwrapping the query
- static {
- queryWrappers = new HashMap<>();
- queryWrappers.put(ConstantScoreQuery.class, q -> ((ConstantScoreQuery) q).getQuery());
- queryWrappers.put(FunctionScoreQuery.class, q -> ((FunctionScoreQuery) q).getSubQuery());
- queryWrappers.put(DateRangeIncludingNowQuery.class, q -> ((DateRangeIncludingNowQuery) q).getQuery());
- queryWrappers.put(IndexOrDocValuesQuery.class, q -> ((IndexOrDocValuesQuery) q).getIndexQuery());
- }
-
- /**
- * Recursively unwraps query into the concrete form
- * for applying the optimization
- */
- private static Query unwrapIntoConcreteQuery(Query query) {
- while (queryWrappers.containsKey(query.getClass())) {
- query = queryWrappers.get(query.getClass()).apply(query);
- }
-
- return query;
- }
-
- /**
- * Finds the min and max bounds for segments within the passed search context
- */
- private static long[] getIndexBoundsFromLeaves(final SearchContext context, final String fieldName) throws IOException {
- final List<LeafReaderContext> leaves = context.searcher().getIndexReader().leaves();
- long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
- // Since the query does not specify bounds for aggregation, we can
- // build the global min/max from local min/max within each segment
- for (LeafReaderContext leaf : leaves) {
- final PointValues values = leaf.reader().getPointValues(fieldName);
- if (values != null) {
- min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0));
- max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0));
- }
- }
-
- if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null;
-
- return new long[] { min, max };
- }
-
- static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException {
- final Query cq = unwrapIntoConcreteQuery(context.query());
- final long[] indexBounds = getIndexBoundsFromLeaves(context, fieldName);
- if (cq instanceof PointRangeQuery) {
- final PointRangeQuery prq = (PointRangeQuery) cq;
- // Ensure that the query and aggregation are on the same field
- if (prq.getField().equals(fieldName)) {
- return new long[] {
- // Minimum bound for aggregation is the max between query and global
- Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]),
- // Maximum bound for aggregation is the min between query and global
- Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) };
- }
- } else if (cq instanceof MatchAllDocsQuery) {
- return indexBounds;
- }
-
- return null;
- }
-
- /**
- * Creates the range query filters for aggregations using the interval, min/max
- * bounds and the rounding values
- */
- private static Weight[] createFilterForAggregations(
- final SearchContext context,
- final Rounding rounding,
- final Rounding.Prepared preparedRounding,
- final String field,
- final DateFieldMapper.DateFieldType fieldType,
- final long low,
- final long high
- ) throws IOException {
- final OptionalLong intervalOpt = Rounding.getInterval(rounding);
- if (intervalOpt.isEmpty()) {
- return null;
- }
-
- final long interval = intervalOpt.getAsLong();
- // Calculate the number of buckets using range and interval
- long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low));
- long prevRounded = roundedLow;
- int bucketCount = 0;
- while (roundedLow <= fieldType.convertNanosToMillis(high)) {
- bucketCount++;
- // Below rounding is needed as the interval could return in
- // non-rounded values for something like calendar month
- roundedLow = preparedRounding.round(roundedLow + interval);
- if (prevRounded == roundedLow) break;
- prevRounded = roundedLow;
- }
-
- Weight[] filters = null;
- if (bucketCount > 0 && bucketCount <= MAX_NUM_FILTER_BUCKETS) {
- int i = 0;
- filters = new Weight[bucketCount];
- roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low));
- while (i < bucketCount) {
- // Calculate the lower bucket bound
- final byte[] lower = new byte[8];
- NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0);
- // Calculate the upper bucket bound
- final byte[] upper = new byte[8];
- roundedLow = preparedRounding.round(roundedLow + interval);
- // Subtract -1 if the minimum is roundedLow as roundedLow itself
- // is included in the next bucket
- NumericUtils.longToSortableBytes(
- i + 1 == bucketCount ? high : fieldType.convertRoundedMillisToNanos(roundedLow) - 1,
- upper,
- 0
- );
- filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) {
- @Override
- protected String toString(int dimension, byte[] value) {
- return null;
- }
- }, ScoreMode.COMPLETE_NO_SCORES, 1);
- }
- }
-
- return filters;
- }
-
- static FilterContext buildFastFilterContext(
- final Object parent,
- final int subAggLength,
- SearchContext context,
- Function<long[], Rounding> roundingFunction,
- Supplier<Rounding.Prepared> preparedRoundingSupplier,
- ValuesSourceConfig valuesSourceConfig,
- CheckedFunction<FieldContext, long[], IOException> computeBounds
- ) throws IOException {
- // Create the filters for fast aggregation only if the query is instance
- // of point range query and there aren't any parent/sub aggregations
- if (parent == null && subAggLength == 0 && valuesSourceConfig.missing() == null && valuesSourceConfig.script() == null) {
- final FieldContext fieldContext = valuesSourceConfig.fieldContext();
- if (fieldContext != null) {
- final String fieldName = fieldContext.field();
- final long[] bounds = computeBounds.apply(fieldContext);
- if (bounds != null) {
- assert fieldContext.fieldType() instanceof DateFieldMapper.DateFieldType;
- final DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fieldContext.fieldType();
- final Rounding rounding = roundingFunction.apply(bounds);
- final Weight[] filters = FilterRewriteHelper.createFilterForAggregations(
- context,
- rounding,
- preparedRoundingSupplier.get(),
- fieldName,
- fieldType,
- bounds[0],
- bounds[1]
- );
- return new FilterContext(fieldType, filters);
- }
- }
- }
- return null;
- }
-
- static long getBucketOrd(long bucketOrd) {
- if (bucketOrd < 0) { // already seen
- bucketOrd = -1 - bucketOrd;
- }
-
- return bucketOrd;
- }
-
- static boolean tryFastFilterAggregation(
- final LeafReaderContext ctx,
- final Weight[] filters,
- final DateFieldMapper.DateFieldType fieldType,
- final BiConsumer<Long, Integer> incrementDocCount
- ) throws IOException {
- final int[] counts = new int[filters.length];
- int i;
- for (i = 0; i < filters.length; i++) {
- counts[i] = filters[i].count(ctx);
- if (counts[i] == -1) {
- // Cannot use the optimization if any of the counts
- // is -1 indicating the segment might have deleted documents
- return false;
- }
- }
-
- for (i = 0; i < filters.length; i++) {
- if (counts[i] > 0) {
- incrementDocCount.accept(
- fieldType.convertNanosToMillis(
- NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
- ),
- counts[i]
- );
- }
- }
- throw new CollectionTerminatedException();
- }
-}
diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java
index eabc4b7764eed..b581e552fec4f 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java
@@ -1279,7 +1279,7 @@ public void testWithDateHistogram() throws IOException {
},
(result) -> {
assertEquals(3, result.getBuckets().size());
- assertEquals("{date=1508457600000}", result.afterKey().toString());
+ assertEquals("{date=1508457600000}", result.afterKey().toString()); // 2017-10-20T00:00:00
assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
assertEquals("{date=1508371200000}", result.getBuckets().get(1).getKeyAsString());
@@ -1300,9 +1300,8 @@ public void testWithDateHistogram() throws IOException {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date")
.calendarInterval(DateHistogramInterval.days(1));
return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).aggregateAfter(
- createAfterKey("date", 1474329600000L)
+ createAfterKey("date", 1474329600000L) // 2016-09-20T00:00:00
);
-
},
(result) -> {
assertEquals(2, result.getBuckets().size());
@@ -2242,21 +2241,20 @@ private <T extends Comparable<T>, V extends Comparable<T>> void testRandomTerms(
Function