Commit 5a9a704

Use BucketCountThresholds in InternalTerms and InternalAggregations and do not apply shard level thresholds at slice level for Concurrent Segment Search

Signed-off-by: Jay Deng <[email protected]>
jed326 authored and Jay Deng committed Aug 8, 2023
1 parent 6eb87b5 commit 5a9a704
Showing 36 changed files with 350 additions and 345 deletions.

CHANGELOG.md (3 changes: 2 additions & 1 deletion)
@@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))
- [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005))
- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047))
- Change shard_size and shard_min_doc_count evaluation to happen in shard level reduce phase ([#9085](https://github.com/opensearch-project/OpenSearch/pull/9085))

### Deprecated

@@ -129,4 +130,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
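
The changelog entry above captures the behavioral change in this commit: shard_size and shard_min_doc_count are no longer applied while each slice collects buckets, but once, in the shard-level reduce across slices. For orientation, the four values bundled into TermsAggregator.BucketCountThresholds correspond to the public terms aggregation parameters. A minimal request-building sketch, using the existing public TermsAggregationBuilder API (the aggregation name, field, and values are made up for illustration):

import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

// Sketch only: which user-facing parameter each BucketCountThresholds value carries.
TermsAggregationBuilder genres = AggregationBuilders.terms("genres")  // hypothetical aggregation name
    .field("genre")           // hypothetical field
    .size(10)                 // requiredSize: buckets returned in the final response
    .shardSize(25)            // shardSize: buckets kept per shard, now enforced in the shard-level reduce
    .minDocCount(1)           // minDocCount: enforced in the final (coordinator-level) reduce
    .shardMinDocCount(0);     // shardMinDocCount: now enforced in the shard-level reduce, not per slice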

@@ -57,6 +57,7 @@
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.query.QuerySearchResult;
@@ -170,15 +171,13 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) {
"terms",
BucketOrder.key(true),
BucketOrder.count(false),
topNSize,
1,
Collections.emptyMap(),
DocValueFormat.RAW,
numShards,
true,
0,
buckets,
0
0,
new TermsAggregator.BucketCountThresholds(1, 0, topNSize, numShards)
);
}

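The hunk above shows the shape of the refactor repeated throughout this commit: the separate requiredSize, minDocCount, and shardSize constructor arguments are dropped and a single TermsAggregator.BucketCountThresholds object is passed instead. A minimal sketch of the new call; the argument order (minDocCount, shardMinDocCount, requiredSize, shardSize) is inferred from the call sites in this diff, and topNSize, numShards, and buckets are the benchmark's own variables:

// Sketch: the thresholds object now carries what used to be standalone StringTerms arguments.
TermsAggregator.BucketCountThresholds thresholds = new TermsAggregator.BucketCountThresholds(
    1,          // minDocCount (previously a standalone constructor argument)
    0,          // shardMinDocCount
    topNSize,   // requiredSize (previously a standalone constructor argument)
    numShards   // shardSize (previously a standalone constructor argument)
);
StringTerms terms = new StringTerms(
    "terms",
    BucketOrder.key(true),
    BucketOrder.count(false),
    Collections.emptyMap(),
    DocValueFormat.RAW,
    true,        // showTermDocCountError
    0,           // otherDocCount
    buckets,
    0,           // docCountError
    thresholds   // replaces requiredSize, minDocCount, and shardSize
);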

@@ -51,6 +51,7 @@
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;

import java.util.ArrayList;
import java.util.List;
@@ -86,15 +87,13 @@ private StringTerms newTerms(boolean withNested) {
"test",
BucketOrder.key(true),
BucketOrder.key(true),
buckets,
1,
null,
DocValueFormat.RAW,
buckets,
false,
100000,
resultBuckets,
0
0,
new TermsAggregator.BucketCountThresholds(1, 0, buckets, buckets)
);
}

@@ -66,7 +66,7 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce iff multiple slices
// were created to execute this request and it used concurrent segment search path
// TODO: Add the check for flag that the request was executed using concurrent search
if (collectors.size() > 1) {
if (collectors.size() >= 1) {
// using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all
// documents are collected across shards for an aggregation
return new AggregationReduceableSearchResult(
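The functional change in this file is the guard: the slice-level reduce previously ran only when more than one slice collector existed, and now runs whenever at least one exists. Because this commit stops applying shard_size and shard_min_doc_count while individual slices collect, this reduce is where those thresholds are enforced for the shard, so it must run even for a single slice. A hedged sketch of the intent; only reduce, collectors, and AggregationReduceableSearchResult appear in the diff, and reduceSliceResults is a placeholder name, not a real method:

// Sketch of the guard's intent, not the actual method body.
if (collectors.size() >= 1) {
    // Shard-level reduce across slice results: shard_size and shard_min_doc_count
    // are now enforced here instead of while each slice collects.
    return new AggregationReduceableSearchResult(reduceSliceResults(collectors));  // reduceSliceResults is hypothetical
}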

@@ -72,20 +72,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {
}

protected StringTerms buildEmptyTermsAggregation() {
return new StringTerms(
name,
order,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
0,
emptyList(),
0
);
return new StringTerms(name, order, order, metadata(), format, showTermDocCountError, 0, emptyList(), 0, bucketCountThresholds);
}

protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subsetSize, SignificanceHeuristic significanceHeuristic) {
@@ -95,14 +82,13 @@ protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subs
int supersetSize = topReader.numDocs();
return new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
subsetSize,
supersetSize,
significanceHeuristic,
emptyList()
emptyList(),
bucketCountThresholds
);
}
}

@@ -130,29 +130,25 @@ public DoubleTerms(
String name,
BucketOrder reduceOrder,
BucketOrder order,
int requiredSize,
long minDocCount,
Map<String, Object> metadata,
DocValueFormat format,
int shardSize,
boolean showTermDocCountError,
long otherDocCount,
List<Bucket> buckets,
long docCountError
long docCountError,
TermsAggregator.BucketCountThresholds bucketCountThresholds
) {
super(
name,
reduceOrder,
order,
requiredSize,
minDocCount,
metadata,
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

@@ -174,15 +170,13 @@ public DoubleTerms create(List<Bucket> buckets) {
name,
reduceOrder,
order,
requiredSize,
minDocCount,
metadata,
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

@@ -204,15 +198,13 @@ protected DoubleTerms create(String name, List<Bucket> buckets, BucketOrder redu
name,
reduceOrder,
order,
requiredSize,
minDocCount,
getMetadata(),
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

@@ -603,6 +603,15 @@ abstract class ResultStrategy<
TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable {

private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
int requiredSizeLocal;
long minDocCountLocal;
if (context.isConcurrentSegmentSearchEnabled()) {
requiredSizeLocal = Integer.MAX_VALUE;
minDocCountLocal = 0;
} else {
requiredSizeLocal = bucketCountThresholds.getShardSize();
minDocCountLocal = bucketCountThresholds.getShardMinDocCount();
}
if (valueCount == 0) { // no context in this reader
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
@@ -615,11 +624,11 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
long[] otherDocCount = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
final int size;
if (bucketCountThresholds.getMinDocCount() == 0) {
if (minDocCountLocal == 0) {
// if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
size = (int) Math.min(valueCount, requiredSizeLocal);
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
size = (int) Math.min(maxBucketOrd(), requiredSizeLocal);
}
PriorityQueue<TB> ordered = buildPriorityQueue(size);
final int finalOrdIdx = ordIdx;
@@ -630,7 +639,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
@Override
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
otherDocCount[finalOrdIdx] += docCount;
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
if (docCount >= minDocCountLocal) {
if (spare == null) {
spare = buildEmptyTemporaryBucket();
}
@@ -799,15 +808,13 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
name,
reduceOrder,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
otherDocCount,
Arrays.asList(topBuckets),
0
0,
bucketCountThresholds
);
}

@@ -924,14 +931,13 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOE
SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) {
return new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
subsetSize(owningBucketOrd),
supersetSize,
significanceHeuristic,
Arrays.asList(topBuckets)
Arrays.asList(topBuckets),
bucketCountThresholds
);
}

@@ -64,16 +63,15 @@ public abstract class InternalMappedSignificantTerms<

protected InternalMappedSignificantTerms(
String name,
int requiredSize,
long minDocCount,
Map<String, Object> metadata,
DocValueFormat format,
long subsetSize,
long supersetSize,
SignificanceHeuristic significanceHeuristic,
List<B> buckets
List<B> buckets,
TermsAggregator.BucketCountThresholds bucketCountThresholds
) {
super(name, requiredSize, minDocCount, metadata);
super(name, bucketCountThresholds, metadata);
this.format = format;
this.buckets = buckets;
this.subsetSize = subsetSize;

@@ -52,7 +52,6 @@
*/
public abstract class InternalMappedTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket<B>> extends InternalTerms<A, B> {
protected final DocValueFormat format;
protected final int shardSize;
protected final boolean showTermDocCountError;
protected final long otherDocCount;
protected final List<B> buckets;
@@ -64,19 +63,16 @@ protected InternalMappedTerms(
String name,
BucketOrder reduceOrder,
BucketOrder order,
int requiredSize,
long minDocCount,
Map<String, Object> metadata,
DocValueFormat format,
int shardSize,
boolean showTermDocCountError,
long otherDocCount,
List<B> buckets,
long docCountError
long docCountError,
TermsAggregator.BucketCountThresholds bucketCountThresholds
) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata);
super(name, reduceOrder, order, bucketCountThresholds, metadata);
this.format = format;
this.shardSize = shardSize;
this.showTermDocCountError = showTermDocCountError;
this.otherDocCount = otherDocCount;
this.docCountError = docCountError;

@@ -220,7 +220,6 @@ public int compare(List<Object> thisObjects, List<Object> thatObjects) {
}
}

private final int shardSize;
private final boolean showTermDocCountError;
private final long otherDocCount;
private final List<DocValueFormat> termFormats;
@@ -233,18 +232,16 @@ public InternalMultiTerms(
String name,
BucketOrder reduceOrder,
BucketOrder order,
int requiredSize,
long minDocCount,
Map<String, Object> metadata,
int shardSize,
boolean showTermDocCountError,
long otherDocCount,
long docCountError,
List<DocValueFormat> formats,
List<Bucket> buckets
List<Bucket> buckets,
TermsAggregator.BucketCountThresholds bucketCountThresholds
) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata);
this.shardSize = shardSize;
super(name, reduceOrder, order, bucketCountThresholds, metadata);
this.shardSize = bucketCountThresholds.getShardSize();
this.showTermDocCountError = showTermDocCountError;
this.otherDocCount = otherDocCount;
this.termFormats = formats;
@@ -278,15 +275,13 @@ public InternalMultiTerms create(List<Bucket> buckets) {
name,
reduceOrder,
order,
requiredSize,
minDocCount,
metadata,
shardSize,
showTermDocCountError,
otherDocCount,
docCountError,
termFormats,
buckets
buckets,
bucketCountThresholds
);
}

@@ -357,15 +352,13 @@ protected InternalMultiTerms create(
name,
reduceOrder,
order,
requiredSize,
minDocCount,
metadata,
shardSize,
showTermDocCountError,
otherDocCount,
docCountError,
termFormats,
buckets
buckets,
bucketCountThresholds
);
}
