Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Deng committed Aug 3, 2023
1 parent 4ad4182 commit 95e8eab
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -617,9 +617,9 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
final int size;
if (bucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets than maxBucketOrd() returns
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
size = context.isConcurrentSegmentSearchEnabled() ? (int) valueCount : (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
size = context.isConcurrentSegmentSearchEnabled() ? (int) maxBucketOrd() : (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
}
PriorityQueue<TB> ordered = buildPriorityQueue(size);
final int finalOrdIdx = ordIdx;
Expand All @@ -630,7 +630,8 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
@Override
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
otherDocCount[finalOrdIdx] += docCount;
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
// Don't evaluate shard_min_doc_count at the slice level for concurrent segment search
if (context.isConcurrentSegmentSearchEnabled() || docCount >= bucketCountThresholds.getShardMinDocCount()) {
if (spare == null) {
spare = buildEmptyTemporaryBucket();
}
Expand Down Expand Up @@ -795,7 +796,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
} else {
reduceOrder = order;
}
return new StringTerms(
StringTerms stringTerms = new StringTerms(
name,
reduceOrder,
order,
Expand All @@ -809,6 +810,8 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
Arrays.asList(topBuckets),
0
);
stringTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
return stringTerms;
}

@Override
Expand Down Expand Up @@ -922,7 +925,7 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOE

@Override
SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) {
return new SignificantStringTerms(
SignificantStringTerms significantStringTerms = new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
Expand All @@ -933,6 +936,9 @@ SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, Sig
significanceHeuristic,
Arrays.asList(topBuckets)
);
significantStringTerms.setShardSize(bucketCountThresholds.getShardSize());
significantStringTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
return significantStringTerms;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,15 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)

protected final int requiredSize;
protected final long minDocCount;
protected int shardSize;
protected long shardMinDocCount;

protected InternalSignificantTerms(String name, int requiredSize, long minDocCount, Map<String, Object> metadata) {
super(name, metadata);
this.requiredSize = requiredSize;
this.minDocCount = minDocCount;
shardSize = 0;
shardMinDocCount = 0;
}

/**
Expand All @@ -222,8 +226,25 @@ protected final void doWriteTo(StreamOutput out) throws IOException {
@Override
public abstract List<B> getBuckets();

public int getShardSize() {
return shardSize;
}

public void setShardSize(int shardSize) {
this.shardSize = shardSize;
}

public long getShardMinDocCount() {
return shardMinDocCount;
}

public void setShardMinDocCount(long shardMinDocCount) {
this.shardMinDocCount = shardMinDocCount;
}

@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
// TODO: Refactor this into separate helper methods for the slice-level and shard-level reduce phases
long globalSubsetSize = 0;
long globalSupersetSize = 0;
// Compute the overall result set size and the corpus size using the
Expand Down Expand Up @@ -265,21 +286,43 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
}
}
SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext);
final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size());
// Apply shard_size limit at slice level reduce
final int size;
if (reduceContext.isFinalReduce()) {
size = Math.min(requiredSize, buckets.size());
} else if (reduceContext.isSliceLevel()) {
size = Math.min(getShardSize(), buckets.size());
} else {
size = buckets.size();
}
BucketSignificancePriorityQueue<B> ordered = new BucketSignificancePriorityQueue<>(size);
for (Map.Entry<String, List<B>> entry : buckets.entrySet()) {
List<B> sameTermBuckets = entry.getValue();
final B b = reduceBucket(sameTermBuckets, reduceContext);
b.updateScore(heuristic);
if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) {
B removed = ordered.insertWithOverflow(b);
if (removed == null) {
reduceContext.consumeBucketsAndMaybeBreak(1);
// TODO: simplify this duplicated branching between the slice-level and shard-level reduce paths
if (reduceContext.isSliceLevel()) {
if ((b.score > 0) && (b.subsetDf >= getShardMinDocCount())) {
B removed = ordered.insertWithOverflow(b);
if (removed == null) {
reduceContext.consumeBucketsAndMaybeBreak(1);
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
}
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
}
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) {
B removed = ordered.insertWithOverflow(b);
if (removed == null) {
reduceContext.consumeBucketsAndMaybeBreak(1);
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
}
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
}
}
}
B[] list = createBucketsArray(ordered.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public int hashCode() {
protected final BucketOrder order;
protected final int requiredSize;
protected final long minDocCount;
protected long shardMinDocCount;

/**
* Creates a new {@link InternalTerms}
Expand All @@ -246,6 +247,7 @@ protected InternalTerms(
this.order = order;
this.requiredSize = requiredSize;
this.minDocCount = minDocCount;
this.shardMinDocCount = 0;
}

/**
Expand Down Expand Up @@ -329,17 +331,29 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
pq.add(new IteratorAndCurrent(terms.getBuckets().iterator()));
}
}
List<B> reducedBuckets = new ArrayList<>();
;
final BucketPriorityQueue<B> reducedBuckets;
// list of buckets coming from different shards that have the same key
List<B> currentBuckets = new ArrayList<>();

// Apply shard_size parameter at the slice reduce level if it is > 0
if (reduceContext.isSliceLevel() && getShardSize() > 0) {
final int size = Math.min(getShardSize(), aggregations.size());
reducedBuckets = new BucketPriorityQueue<>(size, order.comparator());
} else {
reducedBuckets = new BucketPriorityQueue<>(requiredSize, order.comparator());
}

B lastBucket = null;
while (pq.size() > 0) {
final IteratorAndCurrent<B> top = pq.top();
assert lastBucket == null || cmp.compare(top.current(), lastBucket) >= 0;
if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final B reduced = reduceBucket(currentBuckets, reduceContext);
reducedBuckets.add(reduced);
if (!reduceContext.isSliceLevel() || reduced.getDocCount() >= getShardMinDocCount()) {
reducedBuckets.insertWithOverflow(reduced);
}
currentBuckets.clear();
}
lastBucket = top.current();
Expand All @@ -355,9 +369,16 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {

if (currentBuckets.isEmpty() == false) {
final B reduced = reduceBucket(currentBuckets, reduceContext);
reducedBuckets.add(reduced);
if (!reduceContext.isSliceLevel() || reduced.getDocCount() >= getShardMinDocCount()) {
reducedBuckets.insertWithOverflow(reduced);
}
}
return reducedBuckets;

// Shards must return buckets sorted by key
List<B> result = new ArrayList<>();
reducedBuckets.forEach(result::add);
result.sort(cmp);
return result;
}

private List<B> reduceLegacy(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
Expand Down Expand Up @@ -521,6 +542,14 @@ protected B reduceBucket(List<B> buckets, ReduceContext context) {
return createBucket(docCount, aggs, docCountError, buckets.get(0));
}

protected void setShardMinDocCount(long shardMinDocCount) {
this.shardMinDocCount = shardMinDocCount;
}

protected long getShardMinDocCount() {
return shardMinDocCount;
}

protected abstract void setDocCountError(long docCountError);

protected abstract int getShardSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,23 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());

// Do not apply shard_size at the slice level for concurrent segment search
int size;
if (context.isConcurrentSegmentSearchEnabled()) {
size = (int) bucketOrds.size();
} else {
size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
}
PriorityQueue<B> ordered = buildPriorityQueue(size);
B spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
// Don't evaluate shard_min_doc_count at the slice level for concurrent segment search
if (!context.isConcurrentSegmentSearchEnabled() && docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
}
if (spare == null) {
Expand Down Expand Up @@ -450,7 +457,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
} else {
reduceOrder = order;
}
return new StringTerms(
StringTerms stringTerms = new StringTerms(
name,
reduceOrder,
order,
Expand All @@ -464,6 +471,8 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
Arrays.asList(topBuckets),
0
);
stringTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
return stringTerms;
}

@Override
Expand Down Expand Up @@ -570,7 +579,7 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPerOrd) throws IOE

@Override
SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) {
return new SignificantStringTerms(
SignificantStringTerms significantStringTerms = new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
Expand All @@ -581,6 +590,9 @@ SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, Sig
significanceHeuristic,
Arrays.asList(topBuckets)
);
significantStringTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
significantStringTerms.setShardSize(bucketCountThresholds.getShardSize());
return significantStringTerms;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,13 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);

int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
// Do not apply shard_size at the slice level for concurrent segment search
int size;
if (context.isConcurrentSegmentSearchEnabled()) {
size = (int) bucketsInOrd;
} else {
size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
}
PriorityQueue<InternalMultiTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
InternalMultiTerms.Bucket spare = null;
BytesRef dest = null;
Expand All @@ -136,7 +142,8 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
// Don't evaluate shard_min_doc_count at the slice level for concurrent segment search
if (!context.isConcurrentSegmentSearchEnabled() && docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
}
if (spare == null) {
Expand Down Expand Up @@ -178,7 +185,7 @@ InternalMultiTerms buildResult(long owningBucketOrd, long otherDocCount, Interna
} else {
reduceOrder = order;
}
return new InternalMultiTerms(
InternalMultiTerms internalMultiTerms = new InternalMultiTerms(
name,
reduceOrder,
order,
Expand All @@ -192,6 +199,8 @@ InternalMultiTerms buildResult(long owningBucketOrd, long otherDocCount, Interna
formats,
List.of(topBuckets)
);
internalMultiTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
return internalMultiTerms;
}

@Override
Expand Down
Loading

0 comments on commit 95e8eab

Please sign in to comment.