Skip to content

Commit

Permalink
Avoid extending LeafCollectors
Browse files Browse the repository at this point in the history
Signed-off-by: Sandesh Kumar <[email protected]>
  • Loading branch information
sandeshkr419 committed Dec 3, 2024
1 parent 80a8ac9 commit e1898f1
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 112 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import java.io.IOException;

public abstract class StarTreeBucketCollector extends LeafBucketCollector {

public abstract void collectStarEntry(int starTreeEntry, long bucket) throws IOException;
}
/// *
// * SPDX-License-Identifier: Apache-2.0
// *
// * The OpenSearch Contributors require contributions made to
// * this file be licensed under the Apache-2.0 license or a
// * compatible open source license.
// */
//
// package org.opensearch.search.aggregations;
//
// import java.io.IOException;
//
// public abstract class StarTreeBucketCollector extends LeafBucketCollector {
//
// public abstract void collectStarEntry(int starTreeEntry, long bucket) throws IOException;
// }
Original file line number Diff line number Diff line change
@@ -1,59 +1,59 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.search.aggregations;

import org.apache.lucene.search.Scorable;
import org.opensearch.common.lucene.ScorerAware;

import java.io.IOException;

/**
* A {@link LeafBucketCollector} that delegates all calls to the sub leaf
* aggregator and sets the scorer on its source of values if it implements
* {@link ScorerAware}.
*
* @opensearch.internal
*/
public class StarTreeLeafBucketCollectorBase extends StarTreeBucketCollector {
private final LeafBucketCollector sub;
private final ScorerAware values;

/**
* @param sub The leaf collector for sub aggregations.
* @param values The values. {@link ScorerAware#setScorer} will be called automatically on them if they implement {@link ScorerAware}.
*/
public StarTreeLeafBucketCollectorBase(LeafBucketCollector sub, Object values) {
this.sub = sub;
if (values instanceof ScorerAware) {
this.values = (ScorerAware) values;
} else {
this.values = null;
}
}

@Override
public void setScorer(Scorable s) throws IOException {
sub.setScorer(s);
if (values != null) {
values.setScorer(s);
}
}

@Override
public void collect(int doc, long bucket) throws IOException {
sub.collect(doc, bucket);
}

@Override
public void collectStarEntry(int starTreeEntry, long bucket) throws IOException {}
}
/// *
// * SPDX-License-Identifier: Apache-2.0
// *
// * The OpenSearch Contributors require contributions made to
// * this file be licensed under the Apache-2.0 license or a
// * compatible open source license.
// */
//
/// *
// * Modifications Copyright OpenSearch Contributors. See
// * GitHub history for details.
// */
// package org.opensearch.search.aggregations;
//
// import org.apache.lucene.search.Scorable;
// import org.opensearch.common.lucene.ScorerAware;
//
// import java.io.IOException;
//
/// **
// * A {@link LeafBucketCollector} that delegates all calls to the sub leaf
// * aggregator and sets the scorer on its source of values if it implements
// * {@link ScorerAware}.
// *
// * @opensearch.internal
// */
// public class StarTreeLeafBucketCollectorBase extends StarTreeBucketCollector {
// private final LeafBucketCollector sub;
// private final ScorerAware values;
//
// /**
// * @param sub The leaf collector for sub aggregations.
// * @param values The values. {@link ScorerAware#setScorer} will be called automatically on them if they implement {@link ScorerAware}.
// */
// public StarTreeLeafBucketCollectorBase(LeafBucketCollector sub, Object values) {
// this.sub = sub;
// if (values instanceof ScorerAware) {
// this.values = (ScorerAware) values;
// } else {
// this.values = null;
// }
// }
//
// @Override
// public void setScorer(Scorable s) throws IOException {
// sub.setScorer(s);
// if (values != null) {
// values.setScorer(s);
// }
// }
//
// @Override
// public void collect(int doc, long bucket) throws IOException {
// sub.collect(doc, bucket);
// }
//
// @Override
// public void collectStarEntry(int starTreeEntry, long bucket) throws IOException {}
// }
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregator;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.aggregations.support.AggregationPath;
Expand Down Expand Up @@ -130,12 +129,13 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
subCollector.collect(doc, bucketOrd);
}

public final void collectStarTreeBucket(StarTreeBucketCollector subCollector, long docCount, long bucketOrd, int entryBit)
public final void collectStarTreeBucket(long docCount, long bucketOrd)
throws IOException {
if (docCounts.increment(bucketOrd, docCount) == docCount) {
multiBucketConsumer.accept(0);
}
subCollector.collectStarEntry(entryBit, bucketOrd);
// Only collect own bucket & not sub-aggregator buckets
// subCollector.collectStarEntry(entryBit, bucketOrd);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.aggregations.metrics.StarTreeCollector;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
Expand Down Expand Up @@ -180,6 +180,8 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
if (optimized) throw new CollectionTerminatedException();

SortedNumericDocValues values = valuesSource.longValues(ctx);

// Will migrate this to a separate precompute utility
CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context);
if (supportedStarTree != null) {
StarTreeValues starTreeValues = getStarTreeValues(ctx, supportedStarTree);
Expand Down Expand Up @@ -213,15 +215,21 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
long bucketOrd = bucketOrds.add(0, dimensionValue);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectStarTreeBucket((StarTreeBucketCollector) sub, metricValue, bucketOrd, bit);
collectStarTreeBucket(metricValue, bucketOrd);
} else {
grow(bucketOrd + 1);
collectStarTreeBucket((StarTreeBucketCollector) sub, metricValue, bucketOrd, bit);
collectStarTreeBucket(metricValue, bucketOrd);
}
}
}
}
}

// Run preCompute for all sub-aggregators
for (Aggregator subAggregator : subAggregators) {
// assuming query-matching was already done and only supported query shapes are executed here
((StarTreeCollector) subAggregator).preCompute(ctx, supportedStarTree, bucketOrds);
}
throw new CollectionTerminatedException();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

import java.io.IOException;

public interface StarTreeCollector {
// public void collectStarEntry(int starTreeEntryBit, long bucket) throws IOException;

public void preCompute(LeafReaderContext ctx, CompositeIndexFieldInfo starTree, LongKeyedBucketOrds bucketOrds) throws IOException;
}
Loading

0 comments on commit e1898f1

Please sign in to comment.