From e1898f17cc15dfdacea9bdf124dee2d715d9a305 Mon Sep 17 00:00:00 2001 From: Sandesh Kumar Date: Tue, 3 Dec 2024 13:53:39 -0800 Subject: [PATCH] Avoid extending LeafCollectors Signed-off-by: Sandesh Kumar --- .../aggregations/StarTreeBucketCollector.java | 32 ++--- .../StarTreeLeafBucketCollectorBase.java | 118 ++++++++-------- .../bucket/BucketsAggregator.java | 6 +- .../histogram/DateHistogramAggregator.java | 14 +- .../metrics/StarTreeCollector.java | 21 +++ .../aggregations/metrics/SumAggregator.java | 127 +++++++++++++----- .../search/internal/ContextIndexSearcher.java | 29 ++++ 7 files changed, 235 insertions(+), 112 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/aggregations/metrics/StarTreeCollector.java diff --git a/server/src/main/java/org/opensearch/search/aggregations/StarTreeBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/StarTreeBucketCollector.java index 722bd147ff510..10d645f9ec867 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/StarTreeBucketCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/StarTreeBucketCollector.java @@ -1,16 +1,16 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.search.aggregations; - -import java.io.IOException; - -public abstract class StarTreeBucketCollector extends LeafBucketCollector { - - public abstract void collectStarEntry(int starTreeEntry, long bucket) throws IOException; -} +/// * +// * SPDX-License-Identifier: Apache-2.0 +// * +// * The OpenSearch Contributors require contributions made to +// * this file be licensed under the Apache-2.0 license or a +// * compatible open source license. +// */ +// +// package org.opensearch.search.aggregations; +// +// import java.io.IOException; +// +// public abstract class StarTreeBucketCollector extends LeafBucketCollector { +// +// public abstract void collectStarEntry(int starTreeEntry, long bucket) throws IOException; +// } diff --git a/server/src/main/java/org/opensearch/search/aggregations/StarTreeLeafBucketCollectorBase.java b/server/src/main/java/org/opensearch/search/aggregations/StarTreeLeafBucketCollectorBase.java index 458f85bfbf57c..cf50f3c76ae7e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/StarTreeLeafBucketCollectorBase.java +++ b/server/src/main/java/org/opensearch/search/aggregations/StarTreeLeafBucketCollectorBase.java @@ -1,59 +1,59 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ -package org.opensearch.search.aggregations; - -import org.apache.lucene.search.Scorable; -import org.opensearch.common.lucene.ScorerAware; - -import java.io.IOException; - -/** - * A {@link LeafBucketCollector} that delegates all calls to the sub leaf - * aggregator and sets the scorer on its source of values if it implements - * {@link ScorerAware}. - * - * @opensearch.internal - */ -public class StarTreeLeafBucketCollectorBase extends StarTreeBucketCollector { - private final LeafBucketCollector sub; - private final ScorerAware values; - - /** - * @param sub The leaf collector for sub aggregations. - * @param values The values. {@link ScorerAware#setScorer} will be called automatically on them if they implement {@link ScorerAware}. - */ - public StarTreeLeafBucketCollectorBase(LeafBucketCollector sub, Object values) { - this.sub = sub; - if (values instanceof ScorerAware) { - this.values = (ScorerAware) values; - } else { - this.values = null; - } - } - - @Override - public void setScorer(Scorable s) throws IOException { - sub.setScorer(s); - if (values != null) { - values.setScorer(s); - } - } - - @Override - public void collect(int doc, long bucket) throws IOException { - sub.collect(doc, bucket); - } - - @Override - public void collectStarEntry(int starTreeEntry, long bucket) throws IOException {} -} +/// * +// * SPDX-License-Identifier: Apache-2.0 +// * +// * The OpenSearch Contributors require contributions made to +// * this file be licensed under the Apache-2.0 license or a +// * compatible open source license. +// */ +// +/// * +// * Modifications Copyright OpenSearch Contributors. See +// * GitHub history for details. +// */ +// package org.opensearch.search.aggregations; +// +// import org.apache.lucene.search.Scorable; +// import org.opensearch.common.lucene.ScorerAware; +// +// import java.io.IOException; +// +/// ** +// * A {@link LeafBucketCollector} that delegates all calls to the sub leaf +// * aggregator and sets the scorer on its source of values if it implements +// * {@link ScorerAware}. +// * +// * @opensearch.internal +// */ +// public class StarTreeLeafBucketCollectorBase extends StarTreeBucketCollector { +// private final LeafBucketCollector sub; +// private final ScorerAware values; +// +// /** +// * @param sub The leaf collector for sub aggregations. +// * @param values The values. {@link ScorerAware#setScorer} will be called automatically on them if they implement {@link ScorerAware}. +// */ +// public StarTreeLeafBucketCollectorBase(LeafBucketCollector sub, Object values) { +// this.sub = sub; +// if (values instanceof ScorerAware) { +// this.values = (ScorerAware) values; +// } else { +// this.values = null; +// } +// } +// +// @Override +// public void setScorer(Scorable s) throws IOException { +// sub.setScorer(s); +// if (values != null) { +// values.setScorer(s); +// } +// } +// +// @Override +// public void collect(int doc, long bucket) throws IOException { +// sub.collect(doc, bucket); +// } +// +// @Override +// public void collectStarEntry(int starTreeEntry, long bucket) throws IOException {} +// } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java index 3ed3cc99a60cc..76cdaf5af06ad 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java @@ -43,7 +43,6 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.LeafBucketCollector; -import org.opensearch.search.aggregations.StarTreeBucketCollector; import org.opensearch.search.aggregations.bucket.global.GlobalAggregator; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import org.opensearch.search.aggregations.support.AggregationPath; @@ -130,12 +129,13 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do subCollector.collect(doc, bucketOrd); } - public final void collectStarTreeBucket(StarTreeBucketCollector subCollector, long docCount, long bucketOrd, int entryBit) + public final void collectStarTreeBucket(long docCount, long bucketOrd) throws IOException { if (docCounts.increment(bucketOrd, docCount) == docCount) { multiBucketConsumer.accept(0); } - subCollector.collectStarEntry(entryBit, bucketOrd); + // Only collect own bucket & not sub-aggregator buckets + // subCollector.collectStarEntry(entryBit, bucketOrd); } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 6cdeda9f4cd04..d60861ef6a78d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -52,11 +52,11 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.LeafBucketCollectorBase; -import org.opensearch.search.aggregations.StarTreeBucketCollector; import org.opensearch.search.aggregations.bucket.BucketsAggregator; import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge; import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; +import org.opensearch.search.aggregations.metrics.StarTreeCollector; import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; @@ -180,6 +180,8 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol if (optimized) throw new CollectionTerminatedException(); SortedNumericDocValues values = valuesSource.longValues(ctx); + + // Will migrate this to a separate precompute utility CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); if (supportedStarTree != null) { StarTreeValues starTreeValues = getStarTreeValues(ctx, supportedStarTree); @@ -213,15 +215,21 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol long bucketOrd = bucketOrds.add(0, dimensionValue); if (bucketOrd < 0) { bucketOrd = -1 - bucketOrd; - collectStarTreeBucket((StarTreeBucketCollector) sub, metricValue, bucketOrd, bit); + collectStarTreeBucket(metricValue, bucketOrd); } else { grow(bucketOrd + 1); - collectStarTreeBucket((StarTreeBucketCollector) sub, metricValue, bucketOrd, bit); + collectStarTreeBucket(metricValue, bucketOrd); } } } } } + + // Run preCompute for all sub-aggregators + for (Aggregator subAggregator : subAggregators) { + // assuming query-matching was already done and only supported query shapes are executed here + ((StarTreeCollector) subAggregator).preCompute(ctx, supportedStarTree, bucketOrds); + } throw new CollectionTerminatedException(); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/StarTreeCollector.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/StarTreeCollector.java new file mode 100644 index 0000000000000..48e12be179775 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/StarTreeCollector.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.metrics; + +import org.apache.lucene.index.LeafReaderContext; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; + +import java.io.IOException; + +public interface StarTreeCollector { + // public void collectStarEntry(int starTreeEntryBit, long bucket) throws IOException; + + public void preCompute(LeafReaderContext ctx, CompositeIndexFieldInfo starTree, LongKeyedBucketOrds bucketOrds) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index 3b41d70e62d4c..a453d71b4de9b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -32,7 +32,9 @@ package org.opensearch.search.aggregations.metrics; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.NumericUtils; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; @@ -48,10 +50,11 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.LeafBucketCollectorBase; -import org.opensearch.search.aggregations.StarTreeBucketCollector; +import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.StarTreeFilter; import java.io.IOException; import java.util.Map; @@ -64,13 +67,14 @@ * * @opensearch.internal */ -public class SumAggregator extends NumericMetricsAggregator.SingleValue { +public class SumAggregator extends NumericMetricsAggregator.SingleValue implements StarTreeCollector { private final ValuesSource.Numeric valuesSource; private final DocValueFormat format; private DoubleArray sums; private DoubleArray compensations; + SortedNumericStarTreeValuesIterator sumMetricValuesIterator; SumAggregator( String name, @@ -87,6 +91,7 @@ public class SumAggregator extends NumericMetricsAggregator.SingleValue { sums = context.bigArrays().newDoubleArray(1, true); compensations = context.bigArrays().newDoubleArray(1, true); } + sumMetricValuesIterator = null; } @Override @@ -140,35 +145,35 @@ public void collect(int doc, long bucket) throws IOException { public LeafBucketCollector getStarTreeCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) throws IOException { final CompensatedSum kahanSummation = new CompensatedSum(sums.get(0), 0); - if (parent != null && subAggregators.length == 0) { - return new StarTreeBucketCollector() { - StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); - // assert starTreeValues != null; - - // FixedBitSet matchingDocsBitSet = StarTreeFilter.getPredicateValueToFixedBitSetMap(starTreeValues, "@timestamp_month"); - - SortedNumericStarTreeValuesIterator metricValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues - .getMetricValuesIterator("startree1_status_sum_metric"); - - @Override - public void collectStarEntry(int starTreeEntryBit, long bucket) throws IOException { - sums = context.bigArrays().grow(sums, bucket + 1); - // Advance the valuesIterator to the current bit - if (!metricValuesIterator.advanceExact(starTreeEntryBit)) { - return; // Skip if no entries for this document - } - double metricValue = NumericUtils.sortableLongToDouble(metricValuesIterator.nextValue()); - - double sum = sums.get(bucket); - - // sums = context.bigArrays().grow(sums, bucket + 1); - sums.set(bucket, metricValue + sum); - } - - @Override - public void collect(int doc, long owningBucketOrd) throws IOException {} - }; - } + // if (parent != null && subAggregators.length == 0) { + // return new StarTreeBucketCollector() { + // StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + // // assert starTreeValues != null; + // + // // FixedBitSet matchingDocsBitSet = StarTreeFilter.getPredicateValueToFixedBitSetMap(starTreeValues, "@timestamp_month"); + // + // SortedNumericStarTreeValuesIterator metricValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues + // .getMetricValuesIterator("startree1_status_sum_metric"); + // + // @Override + // public void collectStarEntry(int starTreeEntryBit, long bucket) throws IOException { + // sums = context.bigArrays().grow(sums, bucket + 1); + // // Advance the valuesIterator to the current bit + // if (!metricValuesIterator.advanceExact(starTreeEntryBit)) { + // return; // Skip if no entries for this document + // } + // double metricValue = NumericUtils.sortableLongToDouble(metricValuesIterator.nextValue()); + // + // double sum = sums.get(bucket); + // + // // sums = context.bigArrays().grow(sums, bucket + 1); + // sums.set(bucket, metricValue + sum); + // } + // + // @Override + // public void collect(int doc, long owningBucketOrd) throws IOException {} + // }; + // } return StarTreeQueryHelper.getStarTreeLeafCollector( context, valuesSource, @@ -206,4 +211,64 @@ public InternalAggregation buildEmptyAggregation() { public void doClose() { Releasables.close(sums, compensations); } + + // public SortedNumericStarTreeValuesIterator getMetricValueIterator() throws IOException { + // if (sumMetricValuesIterator == null) { + // sumMetricValuesIterator = (SortedNumericStarTreeValuesIterator) getStarTreeValues(ctx, null).getMetricValuesIterator( + // "startree1_status_sum_metric" + // ); + // } + // return sumMetricValuesIterator; + // } + + /** + * Pre-compute method to be invoked by parent aggregator + */ + @Override + public void preCompute(LeafReaderContext ctx, CompositeIndexFieldInfo starTree, LongKeyedBucketOrds bucketOrds) throws IOException { + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + // assert starTreeValues != null; + + FixedBitSet matchingDocsBitSet = StarTreeFilter.getPredicateValueToFixedBitSetMap(starTreeValues, "@timestamp_month"); + + SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues + .getDimensionValuesIterator("@timestamp_month"); + + SortedNumericStarTreeValuesIterator metricValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues + .getMetricValuesIterator("startree1_status_sum_metric"); + + int numBits = matchingDocsBitSet.length(); + + if (numBits > 0) { + for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits) + ? matchingDocsBitSet.nextSetBit(bit + 1) + : DocIdSetIterator.NO_MORE_DOCS) { + + if (!valuesIterator.advanceExact(bit)) { + continue; + } + + for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) { + long dimensionValue = valuesIterator.nextValue(); + + if (metricValuesIterator.advanceExact(bit)) { + double metricValue = NumericUtils.sortableLongToDouble(metricValuesIterator.nextValue()); + + long bucketOrd = bucketOrds.add(0, dimensionValue); + + // assert bucketOrd < 0; + + if (bucketOrd < 0) { + bucketOrd = -1 - bucketOrd; + // collectStarTreeBucket((StarTreeBucketCollector) sub, metricValue, bucketOrd, bit); + } + sums = context.bigArrays().grow(sums, bucketOrd + 1); + double sum = sums.get(bucketOrd); + sum = sum + metricValue; + sums.set(bucketOrd, sum); + } + } + } + } + } } diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index aa8212e8dad69..2e5f69ad633e6 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -329,6 +329,11 @@ protected void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collec searchContext.setSearchTimedOut(true); return; } + // Resolve via star-tree + // if (queryStarTree(ctx, leafCollector)) { + // return; + // } + // catch early terminated exception and rethrow? Bits liveDocs = ctx.reader().getLiveDocs(); BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs); @@ -371,6 +376,30 @@ protected void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collec leafCollector.finish(); } + // private boolean queryStarTree(LeafReaderContext ctx, LeafCollector leafCollector) throws IOException { + // if(!(leafCollector instanceof StarTreeBucketCollector)) { + // return false; + // } + // CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(searchContext); + // if (supportedStarTree != null) { + // StarTreeValues starTreeValues = getStarTreeValues(ctx, supportedStarTree); + // assert starTreeValues != null; + // + // FixedBitSet matchingDocsBitSet = StarTreeFilter.getPredicateValueToFixedBitSetMap(starTreeValues, "@timestamp_month"); + // int numBits = matchingDocsBitSet.length(); + // + // if (numBits > 0) { + // for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits) + // ? matchingDocsBitSet.nextSetBit(bit + 1) + // : DocIdSetIterator.NO_MORE_DOCS) { + // ((StarTreeBucketCollector) leafCollector).collectStarEntry(bit, 0); + // } + // } + // return true; + // } + // return false; + // } + private Weight wrapWeight(Weight weight) { if (cancellable.isEnabled()) { return new Weight(weight.getQuery()) {