Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not evaluate shard_size and shard_min_doc_count at segment slice level #9085

Merged
merged 4 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))
- [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005))
- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047))
- Change shard_size and shard_min_doc_count evaluation to happen in shard level reduce phase ([#9085](https://github.com/opensearch-project/OpenSearch/pull/9085))

### Deprecated

Expand All @@ -129,4 +130,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.query.QuerySearchResult;
Expand Down Expand Up @@ -170,15 +171,14 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) {
"terms",
BucketOrder.key(true),
BucketOrder.count(false),
topNSize,
1,
Collections.emptyMap(),
DocValueFormat.RAW,
numShards,
true,
0,
buckets,
0
0,
new TermsAggregator.BucketCountThresholds(1, 0, topNSize, numShards)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -86,15 +87,14 @@ private StringTerms newTerms(boolean withNested) {
"test",
BucketOrder.key(true),
BucketOrder.key(true),
buckets,
1,
null,
DocValueFormat.RAW,
buckets,
false,
100000,
resultBuckets,
0
0,
new TermsAggregator.BucketCountThresholds(1, 0, buckets, buckets)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* aggregation operators
*/
class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
private final SearchContext context;
protected final SearchContext context;
private final CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider;
private final String collectorReason;

Expand Down Expand Up @@ -63,18 +63,11 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
}

final InternalAggregations internalAggregations = InternalAggregations.from(internals);
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce iff multiple slices
// were created to execute this request and it used concurrent segment search path
// TODO: Add the check for flag that the request was executed using concurrent search
if (collectors.size() > 1) {
// using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all
// documents are collected across shards for an aggregation
return new AggregationReduceableSearchResult(
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
} else {
return new AggregationReduceableSearchResult(internalAggregations);
}
return buildAggregationResult(internalAggregations);
}

protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
return new AggregationReduceableSearchResult(internalAggregations);
}

static Collector createCollector(SearchContext context, List<Aggregator> collectors, String reason) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.search.profile.query.CollectorResult;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;

/**
Expand All @@ -38,4 +39,13 @@ public Collector newCollector() throws IOException {
return super.newCollector();
}
}

@Override
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
// were created so that we can apply shard level bucket count thresholds in the reduce phase.
return new AggregationReduceableSearchResult(
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.opensearch.search.aggregations.support.AggregationPath;
Expand Down Expand Up @@ -160,6 +162,18 @@ public boolean isSliceLevel() {
return this.isSliceLevel;
}

/**
* For slice level partial reduce we will apply shard level `shard_size` and `shard_min_doc_count` limits
* whereas for coordinator level partial reduce it will use top level `size` and `min_doc_count`
*/
public LocalBucketCountThresholds asLocalBucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) {
if (isSliceLevel()) {
return new LocalBucketCountThresholds(bucketCountThresholds.getShardMinDocCount(), bucketCountThresholds.getShardSize());
} else {
return new LocalBucketCountThresholds(bucketCountThresholds.getMinDocCount(), bucketCountThresholds.getRequiredSize());
}
}

public BigArrays bigArrays() {
return bigArrays;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.search.profile.query.CollectorResult;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;

/**
Expand All @@ -38,4 +39,13 @@ public Collector newCollector() throws IOException {
return super.newCollector();
}
}

@Override
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
// were created so that we can apply shard level bucket count thresholds in the reduce phase.
return new AggregationReduceableSearchResult(
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket;

import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;

/**
* BucketCountThresholds type that holds the local (either shard level or request level) bucket count thresholds in minDocCount and requireSize fields.
* Similar to {@link TermsAggregator.BucketCountThresholds} however only provides getters for the local members and no setters.
*
* @opensearch.internal
*/
public class LocalBucketCountThresholds {

private final long minDocCount;
private final int requiredSize;

public LocalBucketCountThresholds(long localminDocCount, int localRequiredSize) {
this.minDocCount = localminDocCount;
this.requiredSize = localRequiredSize;
}

public int getRequiredSize() {
return requiredSize;
}

public long getMinDocCount() {
return minDocCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,14 @@ protected StringTerms buildEmptyTermsAggregation() {
name,
order,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
0,
emptyList(),
0
0,
bucketCountThresholds
);
}

Expand All @@ -95,14 +94,13 @@ protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subs
int supersetSize = topReader.numDocs();
return new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
subsetSize,
supersetSize,
significanceHeuristic,
emptyList()
emptyList(),
bucketCountThresholds
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,29 +130,27 @@ public DoubleTerms(
String name,
BucketOrder reduceOrder,
BucketOrder order,
int requiredSize,
long minDocCount,
Map<String, Object> metadata,
DocValueFormat format,
int shardSize,
boolean showTermDocCountError,
long otherDocCount,
List<Bucket> buckets,
long docCountError
long docCountError,
TermsAggregator.BucketCountThresholds bucketCountThresholds
) {
super(
name,
reduceOrder,
order,
requiredSize,
minDocCount,
metadata,
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

Expand All @@ -174,15 +172,14 @@ public DoubleTerms create(List<Bucket> buckets) {
name,
reduceOrder,
order,
requiredSize,
minDocCount,
metadata,
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

Expand All @@ -204,15 +201,14 @@ protected DoubleTerms create(String name, List<Bucket> buckets, BucketOrder redu
name,
reduceOrder,
order,
requiredSize,
minDocCount,
getMetadata(),
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.opensearch.search.aggregations.support.ValuesSource;
Expand Down Expand Up @@ -603,6 +604,7 @@ abstract class ResultStrategy<
TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable {

private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds);
if (valueCount == 0) { // no context in this reader
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
Expand All @@ -615,11 +617,11 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
long[] otherDocCount = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
final int size;
if (bucketCountThresholds.getMinDocCount() == 0) {
if (localBucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
size = (int) Math.min(valueCount, localBucketCountThresholds.getRequiredSize());
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
size = (int) Math.min(maxBucketOrd(), localBucketCountThresholds.getRequiredSize());
}
PriorityQueue<TB> ordered = buildPriorityQueue(size);
final int finalOrdIdx = ordIdx;
Expand All @@ -630,7 +632,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
@Override
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
otherDocCount[finalOrdIdx] += docCount;
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
if (docCount >= localBucketCountThresholds.getMinDocCount()) {
if (spare == null) {
spare = buildEmptyTemporaryBucket();
}
Expand Down Expand Up @@ -799,15 +801,14 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
name,
reduceOrder,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
otherDocCount,
Arrays.asList(topBuckets),
0
0,
bucketCountThresholds
);
}

Expand Down Expand Up @@ -924,14 +925,13 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOE
SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) {
return new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
subsetSize(owningBucketOrd),
supersetSize,
significanceHeuristic,
Arrays.asList(topBuckets)
Arrays.asList(topBuckets),
bucketCountThresholds
);
}

Expand Down
Loading