Skip to content

Commit

Permalink
Introduce LocalBucketCountThresholds for local size and min_doc_count…
Browse files Browse the repository at this point in the history
… values

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Aug 9, 2023
1 parent aac2559 commit 3b55340
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
return buildAggregationResult(internalAggregations);
}

public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
return new AggregationReduceableSearchResult(internalAggregations);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public Collector newCollector() throws IOException {
}

@Override
public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
// were created so that we can apply shard level bucket count thresholds in the reduce phase.
return new AggregationReduceableSearchResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
Expand Down Expand Up @@ -161,14 +162,16 @@ public boolean isSliceLevel() {
return this.isSliceLevel;
}

// For slice level partial reduce we will apply shard level `shard_size` and `shard_min_doc_count` limits whereas for coordinator
// level partial reduce it will use top level `size` and `min_doc_count`
public int getRequiredSizeLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) {
return isSliceLevel() ? bucketCountThresholds.getShardSize() : bucketCountThresholds.getRequiredSize();
}

public long getMinDocCountLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) {
return isSliceLevel() ? bucketCountThresholds.getShardMinDocCount() : bucketCountThresholds.getMinDocCount();
/**
* For slice level partial reduce we will apply shard level `shard_size` and `shard_min_doc_count` limits
* whereas for coordinator level partial reduce it will use top level `size` and `min_doc_count`
*/
public LocalBucketCountThresholds asLocalBucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) {
if (isSliceLevel()) {
return new LocalBucketCountThresholds(bucketCountThresholds.getShardMinDocCount(), bucketCountThresholds.getShardSize());
} else {
return new LocalBucketCountThresholds(bucketCountThresholds.getMinDocCount(), bucketCountThresholds.getRequiredSize());
}
}

public BigArrays bigArrays() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public Collector newCollector() throws IOException {
}

@Override
public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
// were created so that we can apply shard level bucket count thresholds in the reduce phase.
return new AggregationReduceableSearchResult(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket;

import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;

/**
* BucketCountThresholds type that holds the local (either shard level or request level) bucket count thresholds in minDocCount and requireSize fields.
* Similar to {@link TermsAggregator.BucketCountThresholds} however only provides getters for the local members and no setters.
*
* @opensearch.internal
*/
public class LocalBucketCountThresholds {

private final long minDocCount;
private final int requiredSize;

public LocalBucketCountThresholds(long localminDocCount, int localRequiredSize) {
this.minDocCount = localminDocCount;
this.requiredSize = localRequiredSize;
}

public int getRequiredSize() {
return requiredSize;
}

public long getMinDocCount() {
return minDocCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.opensearch.search.aggregations.support.ValuesSource;
Expand Down Expand Up @@ -603,6 +604,7 @@ abstract class ResultStrategy<
TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable {

private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds);
if (valueCount == 0) { // no context in this reader
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
Expand All @@ -615,11 +617,11 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
long[] otherDocCount = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
final int size;
if (context.getMinDocCountLocal(bucketCountThresholds) == 0) {
if (localBucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
size = (int) Math.min(valueCount, context.getRequiredSizeLocal(bucketCountThresholds));
size = (int) Math.min(valueCount, localBucketCountThresholds.getRequiredSize());
} else {
size = (int) Math.min(maxBucketOrd(), context.getRequiredSizeLocal(bucketCountThresholds));
size = (int) Math.min(maxBucketOrd(), localBucketCountThresholds.getRequiredSize());
}
PriorityQueue<TB> ordered = buildPriorityQueue(size);
final int finalOrdIdx = ordIdx;
Expand All @@ -630,7 +632,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
@Override
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
otherDocCount[finalOrdIdx] += docCount;
if (docCount >= context.getMinDocCountLocal(bucketCountThresholds)) {
if (docCount >= localBucketCountThresholds.getMinDocCount()) {
if (spare == null) {
spare = buildEmptyTemporaryBucket();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.InternalMultiBucketAggregation;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;

import java.io.IOException;
Expand Down Expand Up @@ -233,7 +234,7 @@ protected final void doWriteTo(StreamOutput out) throws IOException {

@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {

LocalBucketCountThresholds localBucketCountThresholds = reduceContext.asLocalBucketCountThresholds(bucketCountThresholds);
long globalSubsetSize = 0;
long globalSupersetSize = 0;
// Compute the overall result set size and the corpus size using the
Expand Down Expand Up @@ -278,7 +279,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
boolean isCoordinatorPartialReduce = reduceContext.isFinalReduce() == false && reduceContext.isSliceLevel() == false;
// Do not apply size threshold on coordinator partial reduce
final int size = !isCoordinatorPartialReduce
? Math.min(reduceContext.getRequiredSizeLocal(bucketCountThresholds), buckets.size())
? Math.min(localBucketCountThresholds.getRequiredSize(), buckets.size())
: buckets.size();
BucketSignificancePriorityQueue<B> ordered = new BucketSignificancePriorityQueue<>(size);
for (Map.Entry<String, List<B>> entry : buckets.entrySet()) {
Expand All @@ -287,7 +288,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
b.updateScore(heuristic);
// For concurrent search case we do not apply bucket count thresholds in buildAggregation and instead is done here during
// reduce. However, the bucket score is only evaluated at the final coordinator reduce.
boolean meetsThresholds = (b.subsetDf >= reduceContext.getMinDocCountLocal(bucketCountThresholds))
boolean meetsThresholds = (b.subsetDf >= localBucketCountThresholds.getMinDocCount())
&& (((b.score > 0) || reduceContext.isSliceLevel()));
if (isCoordinatorPartialReduce || meetsThresholds) {
B removed = ordered.insertWithOverflow(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.KeyComparable;
import org.opensearch.search.aggregations.bucket.IteratorAndCurrent;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation;

import java.io.IOException;
Expand Down Expand Up @@ -388,6 +389,7 @@ private List<B> reduceLegacy(List<InternalAggregation> aggregations, ReduceConte
}

public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
LocalBucketCountThresholds localBucketCountThresholds = reduceContext.asLocalBucketCountThresholds(bucketCountThresholds);
long sumDocCountError = 0;
long otherDocCount = 0;
InternalTerms<A, B> referenceTerms = null;
Expand Down Expand Up @@ -448,7 +450,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
}
final B[] list;
if (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) {
final int size = Math.min(reduceContext.getRequiredSizeLocal(bucketCountThresholds), reducedBuckets.size());
final int size = Math.min(localBucketCountThresholds.getRequiredSize(), reducedBuckets.size());
// final comparator
final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator());
for (B bucket : reducedBuckets) {
Expand All @@ -458,7 +460,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
final long finalSumDocCountError = sumDocCountError;
bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError);
}
if (bucket.getDocCount() >= reduceContext.getMinDocCountLocal(bucketCountThresholds)) {
if (bucket.getDocCount() >= localBucketCountThresholds.getMinDocCount()) {
B removed = ordered.insertWithOverflow(bucket);
if (removed != null) {
otherDocCount += removed.getDocCount();
Expand All @@ -477,8 +479,8 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
} else {
// we can prune the list on partial reduce if the aggregation is ordered by key
// and not filtered (minDocCount == 0)
int size = isKeyOrder(order) && reduceContext.getMinDocCountLocal(bucketCountThresholds) == 0
? Math.min(reduceContext.getRequiredSizeLocal(bucketCountThresholds), reducedBuckets.size())
int size = isKeyOrder(order) && localBucketCountThresholds.getMinDocCount() == 0
? Math.min(localBucketCountThresholds.getRequiredSize(), reducedBuckets.size())
: reducedBuckets.size();
list = createBucketsArray(size);
for (int i = 0; i < size; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.opensearch.search.aggregations.support.ValuesSource;
Expand Down Expand Up @@ -244,11 +245,12 @@ abstract class ResultStrategy<R extends InternalAggregation, B extends InternalM
Releasable {

private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds);
B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
int size = (int) Math.min(bucketOrds.size(), context.getRequiredSizeLocal(bucketCountThresholds));
int size = (int) Math.min(bucketOrds.size(), localBucketCountThresholds.getRequiredSize());

PriorityQueue<B> ordered = buildPriorityQueue(size);
B spare = null;
Expand All @@ -257,7 +259,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < context.getMinDocCountLocal(bucketCountThresholds)) {
if (docCount < localBucketCountThresholds.getMinDocCount()) {
continue;
}
if (spare == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.support.AggregationPath;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.SearchContext;
Expand Down Expand Up @@ -118,13 +119,14 @@ public MultiTermsAggregator(

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds);
InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][];
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);

int size = (int) Math.min(bucketsInOrd, context.getRequiredSizeLocal(bucketCountThresholds));
int size = (int) Math.min(bucketsInOrd, localBucketCountThresholds.getRequiredSize());
PriorityQueue<InternalMultiTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
InternalMultiTerms.Bucket spare = null;
BytesRef dest = null;
Expand All @@ -136,7 +138,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < context.getMinDocCountLocal(bucketCountThresholds)) {
if (docCount < localBucketCountThresholds.getMinDocCount()) {
continue;
}
if (spare == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForLong;
Expand Down Expand Up @@ -173,21 +174,22 @@ abstract class ResultStrategy<R extends InternalAggregation, B extends InternalM
implements
Releasable {
private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds);
B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);

int size = (int) Math.min(bucketsInOrd, context.getRequiredSizeLocal(bucketCountThresholds));
int size = (int) Math.min(bucketsInOrd, localBucketCountThresholds.getRequiredSize());
PriorityQueue<B> ordered = buildPriorityQueue(size);
B spare = null;
BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < context.getMinDocCountLocal(bucketCountThresholds)) {
if (docCount < localBucketCountThresholds.getMinDocCount()) {
continue;
}
if (spare == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public abstract class TermsAggregator extends DeferableBucketAggregator {
*/
public static class BucketCountThresholds implements Writeable, ToXContentFragment {
private long minDocCount;
protected long shardMinDocCount;
private long shardMinDocCount;
private int requiredSize;
protected int shardSize;
private int shardSize;

public BucketCountThresholds(long minDocCount, long shardMinDocCount, int requiredSize, int shardSize) {
this.minDocCount = minDocCount;
Expand Down
Loading

0 comments on commit 3b55340

Please sign in to comment.