diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 4cf710232c7a0..0ec03a6f56dd9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -20,12 +20,10 @@ import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; -import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.DocValueFormat; @@ -102,14 +100,14 @@ public GlobalOrdinalsStringTermsAggregator( this.valueCount = valuesSupplier.get().getValueCount(); this.acceptedGlobalOrdinals = acceptedOrds; if (remapGlobalOrds) { - this.collectionStrategy = new RemapGlobalOrds(cardinality, excludeDeletedDocs); + this.collectionStrategy = new RemapGlobalOrds<>(this.resultStrategy, cardinality, excludeDeletedDocs); } else { this.collectionStrategy = cardinality.map(estimate -> { if (estimate > 1) { // This is a 500 class error, because we should never be able to reach it. throw new AggregationExecutionException("Dense ords don't know how to collect from many buckets"); } - return new DenseGlobalOrds(excludeDeletedDocs); + return new DenseGlobalOrds<>(this.resultStrategy, excludeDeletedDocs); }); } } @@ -193,7 +191,13 @@ public void collect(int doc, long owningBucketOrd) throws IOException { @Override public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { - return resultStrategy.buildAggregations(owningBucketOrds); + if (valueCount == 0) { // no context in this reader + return GlobalOrdinalsStringTermsAggregator.this.buildAggregations( + Math.toIntExact(owningBucketOrds.size()), + ordIdx -> resultStrategy.buildNoValuesResult(owningBucketOrds.get(ordIdx)) + ); + } + return collectionStrategy.buildAggregations(owningBucketOrds); } @Override @@ -401,8 +405,8 @@ private void mapSegmentCountsToGlobalCounts(LongUnaryOperator mapping) throws IO * The {@link GlobalOrdinalsStringTermsAggregator} uses one of these * to collect the global ordinals by calling * {@link CollectionStrategy#collectGlobalOrd} for each global ordinal - * that it hits and then calling {@link CollectionStrategy#forEach} - * once to iterate on the results. + * that it hits and then calling {@link CollectionStrategy#buildAggregations} + * to generate the results. */ abstract static class CollectionStrategy implements Releasable { /** @@ -438,15 +442,9 @@ abstract static class CollectionStrategy implements Releasable { abstract long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd); /** - * Iterate all of the buckets. Implementations take into account - * the {@link BucketCountThresholds}. In particular, - * if the {@link BucketCountThresholds#getMinDocCount()} is 0 then - * they'll make sure to iterate a bucket even if it was never - * {{@link #collectGlobalOrd collected}. - * If {@link BucketCountThresholds#getMinDocCount()} is not 0 then - * they'll skip all global ords that weren't collected. + * Create the aggregation result */ - abstract void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException; + abstract InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException; } interface BucketInfoConsumer { @@ -457,12 +455,17 @@ interface BucketInfoConsumer { * {@linkplain CollectionStrategy} that just uses the global ordinal as the * bucket ordinal. */ - class DenseGlobalOrds extends CollectionStrategy { + class DenseGlobalOrds< + R extends InternalAggregation, + B extends InternalMultiBucketAggregation.InternalBucket, + TB extends InternalMultiBucketAggregation.InternalBucket> extends CollectionStrategy { private final boolean excludeDeletedDocs; + private final ResultStrategy collectionStrategy; - DenseGlobalOrds(boolean excludeDeletedDocs) { + DenseGlobalOrds(ResultStrategy collectionStrategy, boolean excludeDeletedDocs) { this.excludeDeletedDocs = excludeDeletedDocs; + this.collectionStrategy = collectionStrategy; } @Override @@ -492,9 +495,7 @@ long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd) { return globalOrd; } - @Override - void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException { - assert owningBucketOrd == 0; + private void collect(BucketInfoConsumer consumer) throws IOException { if (excludeDeletedDocs) { forEachExcludeDeletedDocs(consumer); } else { @@ -518,7 +519,7 @@ private void forEachAllowDeletedDocs(BucketInfoConsumer consumer) throws IOExcep * Excludes deleted docs in the results by cross-checking with liveDocs. */ private void forEachExcludeDeletedDocs(BucketInfoConsumer consumer) throws IOException { - try (LongHash accepted = new LongHash(20, new BigArrays(null, null, ""))) { + try (LongHash accepted = new LongHash(20, bigArrays())) { for (LeafReaderContext ctx : searcher().getTopReaderContext().leaves()) { LeafReader reader = ctx.reader(); Bits liveDocs = reader.getLiveDocs(); @@ -550,6 +551,55 @@ private void forEachExcludeDeletedDocs(BucketInfoConsumer consumer) throws IOExc @Override public void close() {} + + @Override + InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + assert owningBucketOrds.size() == 1 && owningBucketOrds.get(0) == 0; + try ( + LongArray otherDocCount = bigArrays().newLongArray(1, true); + ObjectArray topBucketsPreOrd = collectionStrategy.buildTopBucketsPerOrd(1) + ) { + GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; + final int size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); + try (ObjectArrayPriorityQueue ordered = collectionStrategy.buildPriorityQueue(size)) { + BucketUpdater updater = collectionStrategy.bucketUpdater(0, lookupGlobalOrd); + collect(new BucketInfoConsumer() { + TB spare = null; + + @Override + public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { + otherDocCount.increment(0, docCount); + if (docCount >= bucketCountThresholds.getShardMinDocCount()) { + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = collectionStrategy.buildEmptyTemporaryBucket(); + } + updater.updateBucket(spare, globalOrd, bucketOrd, docCount); + spare = ordered.insertWithOverflow(spare); + } + } + }); + + // Get the top buckets + topBucketsPreOrd.set(0, collectionStrategy.buildBuckets((int) ordered.size())); + for (int i = (int) ordered.size() - 1; i >= 0; --i) { + checkRealMemoryCBForInternalBucket(); + B bucket = collectionStrategy.convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); + topBucketsPreOrd.get(0)[i] = bucket; + otherDocCount.increment(0, -bucket.getDocCount()); + } + } + collectionStrategy.buildSubAggs(topBucketsPreOrd); + return GlobalOrdinalsStringTermsAggregator.this.buildAggregations( + Math.toIntExact(owningBucketOrds.size()), + ordIdx -> collectionStrategy.buildResult( + owningBucketOrds.get(ordIdx), + otherDocCount.get(ordIdx), + topBucketsPreOrd.get(ordIdx) + ) + ); + } + } } /** @@ -558,13 +608,22 @@ public void close() {} * {@link DenseGlobalOrds} when collecting every ordinal, but significantly * less when collecting only a few. */ - private class RemapGlobalOrds extends CollectionStrategy { + private class RemapGlobalOrds< + R extends InternalAggregation, + B extends InternalMultiBucketAggregation.InternalBucket, + TB extends InternalMultiBucketAggregation.InternalBucket> extends CollectionStrategy { private final LongKeyedBucketOrds bucketOrds; private final boolean excludeDeletedDocs; + private final ResultStrategy collectionStrategy; - private RemapGlobalOrds(CardinalityUpperBound cardinality, boolean excludeDeletedDocs) { + private RemapGlobalOrds( + ResultStrategy collectionStrategy, + CardinalityUpperBound cardinality, + boolean excludeDeletedDocs + ) { bucketOrds = LongKeyedBucketOrds.buildForValueRange(bigArrays(), cardinality, 0, valueCount - 1); this.excludeDeletedDocs = excludeDeletedDocs; + this.collectionStrategy = collectionStrategy; } @Override @@ -596,30 +655,14 @@ long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd) { return bucketOrds.find(owningBucketOrd, globalOrd); } - @Override - void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException { + private void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException { if (excludeDeletedDocs) { - forEachExcludeDeletedDocs(owningBucketOrd, consumer); - } else { - forEachAllowDeletedDocs(owningBucketOrd, consumer); - } - } - - void forEachAllowDeletedDocs(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException { - if (bucketCountThresholds.getMinDocCount() == 0) { + forEachExcludeDeletedDocs(owningBucketOrd); + } else if (bucketCountThresholds.getMinDocCount() == 0) { for (long globalOrd = 0; globalOrd < valueCount; globalOrd++) { - if (false == acceptedGlobalOrdinals.test(globalOrd)) { - continue; - } - addBucketForMinDocCountZero(owningBucketOrd, globalOrd, consumer, null); - } - } else { - LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); - while (ordsEnum.next()) { - if (false == acceptedGlobalOrdinals.test(ordsEnum.value())) { - continue; + if (acceptedGlobalOrdinals.test(globalOrd)) { + bucketOrds.add(owningBucketOrd, globalOrd); } - consumer.accept(ordsEnum.value(), ordsEnum.ord(), bucketDocCount(ordsEnum.ord())); } } } @@ -627,9 +670,9 @@ void forEachAllowDeletedDocs(long owningBucketOrd, BucketInfoConsumer consumer) /** * Excludes deleted docs in the results by cross-checking with liveDocs. */ - void forEachExcludeDeletedDocs(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException { + private void forEachExcludeDeletedDocs(long owningBucketOrd) throws IOException { assert bucketCountThresholds.getMinDocCount() == 0; - try (LongHash accepted = new LongHash(20, new BigArrays(null, null, ""))) { + try (LongHash accepted = new LongHash(20, bigArrays())) { for (LeafReaderContext ctx : searcher().getTopReaderContext().leaves()) { LeafReader reader = ctx.reader(); Bits liveDocs = reader.getLiveDocs(); @@ -646,7 +689,8 @@ void forEachExcludeDeletedDocs(long owningBucketOrd, BucketInfoConsumer consumer if (false == acceptedGlobalOrdinals.test(globalOrd)) { continue; } - addBucketForMinDocCountZero(owningBucketOrd, globalOrd, consumer, accepted); + bucketOrds.add(owningBucketOrd, globalOrd); + accepted.add(globalOrd); } } } @@ -655,110 +699,71 @@ void forEachExcludeDeletedDocs(long owningBucketOrd, BucketInfoConsumer consumer } } - private void addBucketForMinDocCountZero( - long owningBucketOrd, - long globalOrd, - BucketInfoConsumer consumer, - @Nullable LongHash accepted - ) throws IOException { - /* - * Use `add` instead of `find` here to assign an ordinal - * even if the global ord wasn't found so we can build - * sub-aggregations without trouble even though we haven't - * hit any documents for them. This is wasteful, but - * settings minDocCount == 0 is wasteful in general..... - */ - long bucketOrd = bucketOrds.add(owningBucketOrd, globalOrd); - long docCount; - if (bucketOrd < 0) { - bucketOrd = -1 - bucketOrd; - docCount = bucketDocCount(bucketOrd); - } else { - docCount = 0; - } - assert globalOrd >= 0; - consumer.accept(globalOrd, bucketOrd, docCount); - if (accepted != null) { - accepted.add(globalOrd); - } - } - @Override public void close() { bucketOrds.close(); } - } - - /** - * Strategy for building results. - */ - abstract class ResultStrategy< - R extends InternalAggregation, - B extends InternalMultiBucketAggregation.InternalBucket, - TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable { - - private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { - if (valueCount == 0) { // no context in this reader - return GlobalOrdinalsStringTermsAggregator.this.buildAggregations( - Math.toIntExact(owningBucketOrds.size()), - ordIdx -> buildNoValuesResult(owningBucketOrds.get(ordIdx)) - ); - } + @Override + InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { try ( LongArray otherDocCount = bigArrays().newLongArray(owningBucketOrds.size(), true); - ObjectArray topBucketsPreOrd = buildTopBucketsPerOrd(owningBucketOrds.size()) + ObjectArray topBucketsPreOrd = collectionStrategy.buildTopBucketsPerOrd(owningBucketOrds.size()) ) { GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; for (long ordIdx = 0; ordIdx < topBucketsPreOrd.size(); ordIdx++) { - final int size; - if (bucketCountThresholds.getMinDocCount() == 0) { - // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns - size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); - } else { - size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize()); - } - try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { - final long finalOrdIdx = ordIdx; - final long owningBucketOrd = owningBucketOrds.get(ordIdx); - BucketUpdater updater = bucketUpdater(owningBucketOrd, lookupGlobalOrd); - collectionStrategy.forEach(owningBucketOrd, new BucketInfoConsumer() { - TB spare = null; - - @Override - public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { - otherDocCount.increment(finalOrdIdx, docCount); - if (docCount >= bucketCountThresholds.getShardMinDocCount()) { - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = buildEmptyTemporaryBucket(); - } - updater.updateBucket(spare, globalOrd, bucketOrd, docCount); - spare = ordered.insertWithOverflow(spare); - } + long owningBucketOrd = owningBucketOrds.get(ordIdx); + collectZeroDocEntriesIfNeeded(owningBucketOrds.get(ordIdx)); + int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrd), bucketCountThresholds.getShardSize()); + try (ObjectArrayPriorityQueue ordered = collectionStrategy.buildPriorityQueue(size)) { + BucketUpdater updater = collectionStrategy.bucketUpdater(owningBucketOrd, lookupGlobalOrd); + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + TB spare = null; + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCount.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; } - }); - + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = collectionStrategy.buildEmptyTemporaryBucket(); + } + updater.updateBucket(spare, ordsEnum.value(), ordsEnum.ord(), docCount); + spare = ordered.insertWithOverflow(spare); + } // Get the top buckets - topBucketsPreOrd.set(ordIdx, buildBuckets((int) ordered.size())); + topBucketsPreOrd.set(ordIdx, collectionStrategy.buildBuckets((int) ordered.size())); for (int i = (int) ordered.size() - 1; i >= 0; --i) { checkRealMemoryCBForInternalBucket(); - B bucket = convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); + B bucket = collectionStrategy.convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); topBucketsPreOrd.get(ordIdx)[i] = bucket; otherDocCount.increment(ordIdx, -bucket.getDocCount()); } } } - - buildSubAggs(topBucketsPreOrd); - + collectionStrategy.buildSubAggs(topBucketsPreOrd); return GlobalOrdinalsStringTermsAggregator.this.buildAggregations( Math.toIntExact(owningBucketOrds.size()), - ordIdx -> buildResult(owningBucketOrds.get(ordIdx), otherDocCount.get(ordIdx), topBucketsPreOrd.get(ordIdx)) + ordIdx -> collectionStrategy.buildResult( + owningBucketOrds.get(ordIdx), + otherDocCount.get(ordIdx), + topBucketsPreOrd.get(ordIdx) + ) ); } } + } + + /** + * Strategy for building results. + */ + abstract class ResultStrategy< + R extends InternalAggregation, + B extends InternalMultiBucketAggregation.InternalBucket, + TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable { + /** * Short description of the collection mechanism added to the profile * output to help with debugging. @@ -780,7 +785,7 @@ public void accept(long globalOrd, long bucketOrd, long docCount) throws IOExcep * Update fields in {@code spare} to reflect information collected for * this bucket ordinal. */ - abstract BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) throws IOException; + abstract BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd); /** * Build a {@link PriorityQueue} to sort the buckets. After we've @@ -862,7 +867,7 @@ OrdBucket buildEmptyTemporaryBucket() { } @Override - BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) throws IOException { + BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) { return (spare, globalOrd, bucketOrd, docCount) -> { spare.globalOrd = globalOrd; spare.bucketOrd = bucketOrd;