From ceeafcc0937c4c8b0e2ea4ac48fa93fed872df8d Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Thu, 5 Dec 2024 16:33:11 +0100 Subject: [PATCH 1/3] Remove bucketOrd field from InternalTerms and friends (#118044) (#118074) The field bucketOrd is only used for building the aggregation but has no use after that. --- .../search/aggregations/BucketOrder.java | 8 +- .../search/aggregations/InternalOrder.java | 21 +-- .../countedterms/CountedTermsAggregator.java | 89 ++++++----- .../bucket/terms/BucketPriorityQueue.java | 8 +- .../BucketSignificancePriorityQueue.java | 6 +- .../GlobalOrdinalsStringTermsAggregator.java | 144 +++++++++++------- .../terms/InternalSignificantTerms.java | 15 +- .../bucket/terms/InternalTerms.java | 10 -- .../terms/MapStringTermsAggregator.java | 103 +++++++------ .../bucket/terms/NumericTermsAggregator.java | 115 ++++++++------ .../bucket/terms/TermsAggregator.java | 6 +- .../bucket/terms/TermsAggregatorFactory.java | 6 +- .../multiterms/InternalMultiTerms.java | 3 - .../multiterms/MultiTermsAggregator.java | 103 +++++++------ 14 files changed, 355 insertions(+), 282 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java b/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java index 2d360705f75b..c412ecb5d636 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.xcontent.ToXContentObject; @@ -20,13 +21,12 @@ import java.util.Comparator; import java.util.List; import java.util.function.BiFunction; -import java.util.function.ToLongFunction; /** * {@link Bucket} ordering strategy. Buckets can be order either as * "complete" buckets using {@link #comparator()} or against a combination * of the buckets internals with its ordinal with - * {@link #partiallyBuiltBucketComparator(ToLongFunction, Aggregator)}. + * {@link #partiallyBuiltBucketComparator(Aggregator)}. */ public abstract class BucketOrder implements ToXContentObject, Writeable { /** @@ -102,7 +102,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionExc * to validate this order because doing so checks all of the appropriate * paths. */ - partiallyBuiltBucketComparator(null, aggregator); + partiallyBuiltBucketComparator(aggregator); } /** @@ -121,7 +121,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionExc * with it all the time. *

*/ - public abstract Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator); + public abstract Comparator> partiallyBuiltBucketComparator(Aggregator aggregator); /** * Build a comparator for fully built buckets. diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java index 043fab6f4f12..74534c275d11 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java @@ -16,6 +16,7 @@ import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.search.aggregations.Aggregator.BucketComparator; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortValue; @@ -31,7 +32,6 @@ import java.util.List; import java.util.Objects; import java.util.function.BiFunction; -import java.util.function.ToLongFunction; /** * Implementations for {@link Bucket} ordering strategies. @@ -64,10 +64,10 @@ public AggregationPath path() { } @Override - public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { + public Comparator> partiallyBuiltBucketComparator(Aggregator aggregator) { try { BucketComparator bucketComparator = path.bucketComparator(aggregator, order); - return (lhs, rhs) -> bucketComparator.compare(ordinalReader.applyAsLong(lhs), ordinalReader.applyAsLong(rhs)); + return (lhs, rhs) -> bucketComparator.compare(lhs.ord, rhs.ord); } catch (IllegalArgumentException e) { throw new AggregationExecutionException.InvalidPath("Invalid aggregation order path [" + path + "]. " + e.getMessage(), e); } @@ -189,12 +189,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } @Override - public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { - List> comparators = orderElements.stream() - .map(oe -> oe.partiallyBuiltBucketComparator(ordinalReader, aggregator)) - .toList(); + public Comparator> partiallyBuiltBucketComparator(Aggregator aggregator) { + List>> comparators = new ArrayList<>(orderElements.size()); + for (BucketOrder order : orderElements) { + comparators.add(order.partiallyBuiltBucketComparator(aggregator)); + } return (lhs, rhs) -> { - for (Comparator c : comparators) { + for (Comparator> c : comparators) { int result = c.compare(lhs, rhs); if (result != 0) { return result; @@ -300,9 +301,9 @@ byte id() { } @Override - public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { + public Comparator> partiallyBuiltBucketComparator(Aggregator aggregator) { Comparator comparator = comparator(); - return comparator::compare; + return (lhs, rhs) -> comparator.compare(lhs.bucket, rhs.bucket); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java index 192b0b3d7323..310fcd4fb611 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasables; @@ -26,6 +27,7 @@ import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd; import org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue; import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds; import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms; @@ -38,7 +40,6 @@ import java.util.Arrays; import java.util.Map; import java.util.function.BiConsumer; -import java.util.function.Supplier; import static java.util.Collections.emptyList; import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS; @@ -115,51 +116,57 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size()); ObjectArray topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size()) ) { - for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { - int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); - - // as users can't control sort order, in practice we'll always sort by doc count descending - try ( - BucketPriorityQueue ordered = new BucketPriorityQueue<>( - size, - bigArrays(), - partiallyBuiltBucketComparator - ) - ) { - StringTerms.Bucket spare = null; - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); - Supplier emptyBucketBuilder = () -> new StringTerms.Bucket( - new BytesRef(), - 0, - null, - false, - 0, - format - ); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts.increment(ordIdx, docCount); - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = emptyBucketBuilder.get(); + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { + // find how many buckets we are going to collect + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), bucketCountThresholds.getShardSize()); + bucketsToCollect.set(ordIdx, size); + ordsToCollect += size; + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + // as users can't control sort order, in practice we'll always sort by doc count descending + try ( + BucketPriorityQueue ordered = new BucketPriorityQueue<>( + bucketsToCollect.get(ordIdx), + bigArrays(), + order.partiallyBuiltBucketComparator(this) + ) + ) { + BucketAndOrd spare = null; + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>(new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format)); + } + ordsEnum.readValue(spare.bucket.getTermBytes()); + spare.bucket.setDocCount(docCount); + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } + final int orderedSize = (int) ordered.size(); + final StringTerms.Bucket[] buckets = new StringTerms.Bucket[orderedSize]; + for (int i = orderedSize - 1; i >= 0; --i) { + BucketAndOrd bucketAndOrd = ordered.pop(); + buckets[i] = bucketAndOrd.bucket; + ordsArray.set(ordsCollected + i, bucketAndOrd.ord); + otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount()); + bucketAndOrd.bucket.setTermBytes(BytesRef.deepCopyOf(bucketAndOrd.bucket.getTermBytes())); + } + topBucketsPerOrd.set(ordIdx, buckets); + ordsCollected += orderedSize; } - ordsEnum.readValue(spare.getTermBytes()); - spare.setDocCount(docCount); - spare.setBucketOrd(ordsEnum.ord()); - spare = ordered.insertWithOverflow(spare); - } - - topBucketsPerOrd.set(ordIdx, new StringTerms.Bucket[(int) ordered.size()]); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - topBucketsPerOrd.get(ordIdx)[i] = ordered.pop(); - otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount()); - topBucketsPerOrd.get(ordIdx)[i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd.get(ordIdx)[i].getTermBytes())); } + assert ordsCollected == ordsArray.size(); + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, InternalTerms.Bucket::setAggregations); } } - buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations); - return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> { final BucketOrder reduceOrder; if (isKeyOrder(order) == false) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketPriorityQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketPriorityQueue.java index 7f8e5c8c885f..9550003a5bd1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketPriorityQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketPriorityQueue.java @@ -13,17 +13,17 @@ import java.util.Comparator; -public class BucketPriorityQueue extends ObjectArrayPriorityQueue { +public class BucketPriorityQueue extends ObjectArrayPriorityQueue> { - private final Comparator comparator; + private final Comparator> comparator; - public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator comparator) { + public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator> comparator) { super(size, bigArrays); this.comparator = comparator; } @Override - protected boolean lessThan(B a, B b) { + protected boolean lessThan(BucketAndOrd a, BucketAndOrd b) { return comparator.compare(a, b) > 0; // reverse, since we reverse again when adding to a list } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketSignificancePriorityQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketSignificancePriorityQueue.java index fe751c9e7918..4736f52d9362 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketSignificancePriorityQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketSignificancePriorityQueue.java @@ -12,14 +12,14 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; -public class BucketSignificancePriorityQueue extends ObjectArrayPriorityQueue { +public class BucketSignificancePriorityQueue extends ObjectArrayPriorityQueue> { public BucketSignificancePriorityQueue(int size, BigArrays bigArrays) { super(size, bigArrays); } @Override - protected boolean lessThan(SignificantTerms.Bucket o1, SignificantTerms.Bucket o2) { - return o1.getSignificanceScore() < o2.getSignificanceScore(); + protected boolean lessThan(BucketAndOrd o1, BucketAndOrd o2) { + return o1.bucket.getSignificanceScore() < o2.bucket.getSignificanceScore(); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 037870016a5f..ee472bb2050a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -20,6 +20,7 @@ import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.common.util.ObjectArray; @@ -558,10 +559,10 @@ InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOExc ) { GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; final int size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); - try (ObjectArrayPriorityQueue ordered = collectionStrategy.buildPriorityQueue(size)) { + try (ObjectArrayPriorityQueue> ordered = collectionStrategy.buildPriorityQueue(size)) { BucketUpdater updater = collectionStrategy.bucketUpdater(0, lookupGlobalOrd); collect(new BucketInfoConsumer() { - TB spare = null; + BucketAndOrd spare = null; @Override public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { @@ -569,24 +570,31 @@ public void accept(long globalOrd, long bucketOrd, long docCount) throws IOExcep if (docCount >= bucketCountThresholds.getShardMinDocCount()) { if (spare == null) { checkRealMemoryCBForInternalBucket(); - spare = collectionStrategy.buildEmptyTemporaryBucket(); + spare = new BucketAndOrd<>(collectionStrategy.buildEmptyTemporaryBucket()); } - updater.updateBucket(spare, globalOrd, bucketOrd, docCount); + spare.ord = bucketOrd; + updater.updateBucket(spare.bucket, globalOrd, docCount); spare = ordered.insertWithOverflow(spare); } } }); // Get the top buckets - topBucketsPreOrd.set(0, collectionStrategy.buildBuckets((int) ordered.size())); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - checkRealMemoryCBForInternalBucket(); - B bucket = collectionStrategy.convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); - topBucketsPreOrd.get(0)[i] = bucket; - otherDocCount.increment(0, -bucket.getDocCount()); + int orderedSize = (int) ordered.size(); + try (LongArray ordsArray = bigArrays().newLongArray(orderedSize)) { + B[] buckets = collectionStrategy.buildBuckets(orderedSize); + for (int i = orderedSize - 1; i >= 0; --i) { + checkRealMemoryCBForInternalBucket(); + BucketAndOrd bucketAndOrd = ordered.pop(); + B bucket = collectionStrategy.convertTempBucketToRealBucket(bucketAndOrd.bucket, lookupGlobalOrd); + ordsArray.set(i, bucketAndOrd.ord); + buckets[i] = bucket; + otherDocCount.increment(0, -bucket.getDocCount()); + } + topBucketsPreOrd.set(0, buckets); + collectionStrategy.buildSubAggs(topBucketsPreOrd, ordsArray); } } - collectionStrategy.buildSubAggs(topBucketsPreOrd); return GlobalOrdinalsStringTermsAggregator.this.buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> collectionStrategy.buildResult( @@ -706,39 +714,61 @@ InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOExc LongArray otherDocCount = bigArrays().newLongArray(owningBucketOrds.size(), true); ObjectArray topBucketsPreOrd = collectionStrategy.buildTopBucketsPerOrd(owningBucketOrds.size()) ) { - GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; - for (long ordIdx = 0; ordIdx < topBucketsPreOrd.size(); ordIdx++) { - long owningBucketOrd = owningBucketOrds.get(ordIdx); - collectZeroDocEntriesIfNeeded(owningBucketOrds.get(ordIdx)); - int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrd), bucketCountThresholds.getShardSize()); - try (ObjectArrayPriorityQueue ordered = collectionStrategy.buildPriorityQueue(size)) { - BucketUpdater updater = collectionStrategy.bucketUpdater(owningBucketOrd, lookupGlobalOrd); - LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); - TB spare = null; - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCount.increment(ordIdx, docCount); - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; - } - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = collectionStrategy.buildEmptyTemporaryBucket(); + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + collectZeroDocEntriesIfNeeded(owningBucketOrd); + final int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrd), bucketCountThresholds.getShardSize()); + ordsToCollect += size; + bucketsToCollect.set(ordIdx, size); + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; + for (long ordIdx = 0; ordIdx < topBucketsPreOrd.size(); ordIdx++) { + long owningBucketOrd = owningBucketOrds.get(ordIdx); + try ( + ObjectArrayPriorityQueue> ordered = collectionStrategy.buildPriorityQueue( + bucketsToCollect.get(ordIdx) + ) + ) { + BucketUpdater updater = collectionStrategy.bucketUpdater(owningBucketOrd, lookupGlobalOrd); + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + BucketAndOrd spare = null; + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCount.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>(collectionStrategy.buildEmptyTemporaryBucket()); + } + updater.updateBucket(spare.bucket, ordsEnum.value(), docCount); + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } + // Get the top buckets + int orderedSize = (int) ordered.size(); + B[] buckets = collectionStrategy.buildBuckets(orderedSize); + for (int i = orderedSize - 1; i >= 0; --i) { + checkRealMemoryCBForInternalBucket(); + BucketAndOrd bucketAndOrd = ordered.pop(); + B bucket = collectionStrategy.convertTempBucketToRealBucket(bucketAndOrd.bucket, lookupGlobalOrd); + ordsArray.set(ordsCollected + i, bucketAndOrd.ord); + buckets[i] = bucket; + otherDocCount.increment(ordIdx, -bucket.getDocCount()); + } + topBucketsPreOrd.set(ordIdx, buckets); + ordsCollected += orderedSize; } - updater.updateBucket(spare, ordsEnum.value(), ordsEnum.ord(), docCount); - spare = ordered.insertWithOverflow(spare); - } - // Get the top buckets - topBucketsPreOrd.set(ordIdx, collectionStrategy.buildBuckets((int) ordered.size())); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - checkRealMemoryCBForInternalBucket(); - B bucket = collectionStrategy.convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); - topBucketsPreOrd.get(ordIdx)[i] = bucket; - otherDocCount.increment(ordIdx, -bucket.getDocCount()); } + assert ordsCollected == ordsArray.size(); + collectionStrategy.buildSubAggs(topBucketsPreOrd, ordsArray); } } - collectionStrategy.buildSubAggs(topBucketsPreOrd); return GlobalOrdinalsStringTermsAggregator.this.buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> collectionStrategy.buildResult( @@ -787,7 +817,7 @@ abstract class ResultStrategy< * Build a {@link PriorityQueue} to sort the buckets. After we've * collected all of the buckets we'll collect all entries in the queue. */ - abstract ObjectArrayPriorityQueue buildPriorityQueue(int size); + abstract ObjectArrayPriorityQueue> buildPriorityQueue(int size); /** * Build an array to hold the "top" buckets for each ordinal. @@ -809,7 +839,7 @@ abstract class ResultStrategy< * Build the sub-aggregations into the buckets. This will usually * delegate to {@link #buildSubAggsForAllBuckets}. */ - abstract void buildSubAggs(ObjectArray topBucketsPreOrd) throws IOException; + abstract void buildSubAggs(ObjectArray topBucketsPreOrd, LongArray ordsArray) throws IOException; /** * Turn the buckets into an aggregation result. @@ -830,7 +860,7 @@ abstract class ResultStrategy< } interface BucketUpdater { - void updateBucket(TB spare, long globalOrd, long bucketOrd, long docCount) throws IOException; + void updateBucket(TB spare, long globalOrd, long docCount) throws IOException; } /** @@ -864,29 +894,30 @@ OrdBucket buildEmptyTemporaryBucket() { @Override BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) { - return (spare, globalOrd, bucketOrd, docCount) -> { + return (spare, globalOrd, docCount) -> { spare.globalOrd = globalOrd; - spare.bucketOrd = bucketOrd; spare.docCount = docCount; }; } @Override - ObjectArrayPriorityQueue buildPriorityQueue(int size) { - return new BucketPriorityQueue<>(size, bigArrays(), partiallyBuiltBucketComparator); + ObjectArrayPriorityQueue> buildPriorityQueue(int size) { + return new BucketPriorityQueue<>( + size, + bigArrays(), + order.partiallyBuiltBucketComparator(GlobalOrdinalsStringTermsAggregator.this) + ); } @Override StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp, GlobalOrdLookupFunction lookupGlobalOrd) throws IOException { BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd)); - StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format); - result.bucketOrd = temp.bucketOrd; - return result; + return new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format); } @Override - void buildSubAggs(ObjectArray topBucketsPreOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + void buildSubAggs(ObjectArray topBucketsPreOrd, LongArray ordsArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPreOrd, ordsArray, (b, aggs) -> b.aggregations = aggs); } @Override @@ -1001,8 +1032,7 @@ private long subsetSize(long owningBucketOrd) { @Override BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) { long subsetSize = subsetSize(owningBucketOrd); - return (spare, globalOrd, bucketOrd, docCount) -> { - spare.bucketOrd = bucketOrd; + return (spare, globalOrd, docCount) -> { oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes); spare.subsetDf = docCount; spare.supersetDf = backgroundFrequencies.freq(spare.termBytes); @@ -1016,7 +1046,7 @@ BucketUpdater bucketUpdater(long owningBucketOrd, } @Override - ObjectArrayPriorityQueue buildPriorityQueue(int size) { + ObjectArrayPriorityQueue> buildPriorityQueue(int size) { return new BucketSignificancePriorityQueue<>(size, bigArrays()); } @@ -1029,8 +1059,8 @@ SignificantStringTerms.Bucket convertTempBucketToRealBucket( } @Override - void buildSubAggs(ObjectArray topBucketsPreOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + void buildSubAggs(ObjectArray topBucketsPreOrd, LongArray ordsArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPreOrd, ordsArray, (b, aggs) -> b.aggregations = aggs); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java index 78ae2481f5d9..5108793b8a80 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java @@ -10,12 +10,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.ObjectArrayPriorityQueue; import org.elasticsearch.common.util.ObjectObjectPagedHashMap; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.AggregationErrors; import org.elasticsearch.search.aggregations.AggregationReduceContext; -import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorReducer; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -58,12 +58,6 @@ public interface Reader> { long subsetDf; long supersetDf; - /** - * Ordinal of the bucket while it is being built. Not used after it is - * returned from {@link Aggregator#buildAggregations(org.elasticsearch.common.util.LongArray)} and not - * serialized. - */ - transient long bucketOrd; double score; protected InternalAggregations aggregations; final transient DocValueFormat format; @@ -235,7 +229,12 @@ canLeadReduction here is essentially checking if this shard returned data. Unma public InternalAggregation get() { final SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext); final int size = (int) (reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size())); - try (BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size, reduceContext.bigArrays())) { + try (ObjectArrayPriorityQueue ordered = new ObjectArrayPriorityQueue(size, reduceContext.bigArrays()) { + @Override + protected boolean lessThan(B a, B b) { + return a.getSignificanceScore() < b.getSignificanceScore(); + } + }) { buckets.forEach(entry -> { final B b = createBucket( entry.value.subsetDf[0], diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 739f0b923eaa..de35046691b3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -38,8 +38,6 @@ public interface Reader> { B read(StreamInput in, DocValueFormat format, boolean showDocCountError) throws IOException; } - long bucketOrd; - protected long docCount; private long docCountError; protected InternalAggregations aggregations; @@ -88,14 +86,6 @@ public void setDocCount(long docCount) { this.docCount = docCount; } - public long getBucketOrd() { - return bucketOrd; - } - - public void setBucketOrd(long bucketOrd) { - this.bucketOrd = bucketOrd; - } - @Override public long getDocCountError() { return docCountError; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index b96c495d3748..026912a583ef 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -17,6 +17,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; @@ -43,6 +44,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; @@ -287,40 +289,55 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true); ObjectArray topBucketsPerOrd = buildTopBucketsPerOrd(Math.toIntExact(owningBucketOrds.size())) ) { - for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { - long owningOrd = owningBucketOrds.get(ordIdx); - collectZeroDocEntriesIfNeeded(owningOrd, excludeDeletedDocs); - int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); - - try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { - B spare = null; - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningOrd); - BucketUpdater bucketUpdater = bucketUpdater(owningOrd); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts.increment(ordIdx, docCount); - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; - } - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = buildEmptyBucket(); + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + collectZeroDocEntriesIfNeeded(owningBucketOrd, excludeDeletedDocs); + final int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrd), bucketCountThresholds.getShardSize()); + ordsToCollect += size; + bucketsToCollect.set(ordIdx, size); + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { + long owningOrd = owningBucketOrds.get(ordIdx); + try (ObjectArrayPriorityQueue> ordered = buildPriorityQueue(bucketsToCollect.get(ordIdx))) { + BucketAndOrd spare = null; + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningOrd); + BucketUpdater bucketUpdater = bucketUpdater(owningOrd); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>(buildEmptyBucket()); + } + bucketUpdater.updateBucket(spare.bucket, ordsEnum, docCount); + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } + + final int orderedSize = (int) ordered.size(); + final B[] buckets = buildBuckets(orderedSize); + for (int i = orderedSize - 1; i >= 0; --i) { + BucketAndOrd bucketAndOrd = ordered.pop(); + finalizeBucket(bucketAndOrd.bucket); + buckets[i] = bucketAndOrd.bucket; + ordsArray.set(ordsCollected + i, bucketAndOrd.ord); + otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount()); + } + topBucketsPerOrd.set(ordIdx, buckets); + ordsCollected += orderedSize; } - bucketUpdater.updateBucket(spare, ordsEnum, docCount); - spare = ordered.insertWithOverflow(spare); - } - - topBucketsPerOrd.set(ordIdx, buildBuckets((int) ordered.size())); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - topBucketsPerOrd.get(ordIdx)[i] = ordered.pop(); - otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount()); - finalizeBucket(topBucketsPerOrd.get(ordIdx)[i]); } + assert ordsCollected == ordsArray.size(); + buildSubAggs(topBucketsPerOrd, ordsArray); } } - - buildSubAggs(topBucketsPerOrd); - return MapStringTermsAggregator.this.buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)) @@ -355,7 +372,7 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro * Build a {@link PriorityQueue} to sort the buckets. After we've * collected all of the buckets we'll collect all entries in the queue. */ - abstract ObjectArrayPriorityQueue buildPriorityQueue(int size); + abstract ObjectArrayPriorityQueue> buildPriorityQueue(int size); /** * Update fields in {@code spare} to reflect information collected for @@ -382,9 +399,9 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro /** * Build the sub-aggregations into the buckets. This will usually - * delegate to {@link #buildSubAggsForAllBuckets}. + * delegate to {@link #buildSubAggsForAllBuckets(ObjectArray, LongArray, BiConsumer)}. */ - abstract void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException; + abstract void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordsArray) throws IOException; /** * Turn the buckets into an aggregation result. @@ -407,9 +424,11 @@ interface BucketUpdater */ class StandardTermsResults extends ResultStrategy { private final ValuesSource valuesSource; + private final Comparator> comparator; - StandardTermsResults(ValuesSource valuesSource) { + StandardTermsResults(ValuesSource valuesSource, Aggregator aggregator) { this.valuesSource = valuesSource; + this.comparator = order.partiallyBuiltBucketComparator(aggregator); } @Override @@ -498,8 +517,8 @@ StringTerms.Bucket buildEmptyBucket() { } @Override - ObjectArrayPriorityQueue buildPriorityQueue(int size) { - return new BucketPriorityQueue<>(size, bigArrays(), partiallyBuiltBucketComparator); + ObjectArrayPriorityQueue> buildPriorityQueue(int size) { + return new BucketPriorityQueue<>(size, bigArrays(), comparator); } @Override @@ -507,7 +526,6 @@ BucketUpdater bucketUpdater(long owningBucketOrd) { return (spare, ordsEnum, docCount) -> { ordsEnum.readValue(spare.termBytes); spare.docCount = docCount; - spare.bucketOrd = ordsEnum.ord(); }; } @@ -532,8 +550,8 @@ void finalizeBucket(StringTerms.Bucket bucket) { } @Override - void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); + void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, ordArray, (b, a) -> b.aggregations = a); } @Override @@ -625,7 +643,7 @@ SignificantStringTerms.Bucket buildEmptyBucket() { } @Override - ObjectArrayPriorityQueue buildPriorityQueue(int size) { + ObjectArrayPriorityQueue> buildPriorityQueue(int size) { return new BucketSignificancePriorityQueue<>(size, bigArrays()); } @@ -634,7 +652,6 @@ BucketUpdater bucketUpdater(long owningBucketOrd) long subsetSize = subsetSizes.get(owningBucketOrd); return (spare, ordsEnum, docCount) -> { ordsEnum.readValue(spare.termBytes); - spare.bucketOrd = ordsEnum.ord(); spare.subsetDf = docCount; spare.supersetDf = backgroundFrequencies.freq(spare.termBytes); /* @@ -667,8 +684,8 @@ void finalizeBucket(SignificantStringTerms.Bucket bucket) { } @Override - void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); + void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordsArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, a) -> b.aggregations = a); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 5d4c15d8a3b8..a54053f712f8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -14,6 +14,7 @@ import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; @@ -40,6 +41,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; @@ -167,42 +169,56 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true); ObjectArray topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.size()) ) { - for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { - final long owningBucketOrd = owningBucketOrds.get(ordIdx); - collectZeroDocEntriesIfNeeded(owningBucketOrd, excludeDeletedDocs); - long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrd); - - int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); - try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { - B spare = null; - BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); - BucketUpdater bucketUpdater = bucketUpdater(owningBucketOrd); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts.increment(ordIdx, docCount); - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; - } - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = buildEmptyBucket(); - } - bucketUpdater.updateBucket(spare, ordsEnum, docCount); - spare = ordered.insertWithOverflow(spare); - } + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + collectZeroDocEntriesIfNeeded(owningBucketOrd, excludeDeletedDocs); + int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrd), bucketCountThresholds.getShardSize()); + bucketsToCollect.set(ordIdx, size); + ordsToCollect += size; + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + try (ObjectArrayPriorityQueue> ordered = buildPriorityQueue(bucketsToCollect.get(ordIdx))) { + BucketAndOrd spare = null; + BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + BucketUpdater bucketUpdater = bucketUpdater(owningBucketOrd); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>(buildEmptyBucket()); + } + bucketUpdater.updateBucket(spare.bucket, ordsEnum, docCount); + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } + + // Get the top buckets + final int orderedSize = (int) ordered.size(); + final B[] bucketsForOrd = buildBuckets(orderedSize); + for (int b = orderedSize - 1; b >= 0; --b) { + BucketAndOrd bucketAndOrd = ordered.pop(); + bucketsForOrd[b] = bucketAndOrd.bucket; + ordsArray.set(ordsCollected + b, bucketAndOrd.ord); + otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount()); + } + topBucketsPerOrd.set(ordIdx, bucketsForOrd); + ordsCollected += orderedSize; - // Get the top buckets - B[] bucketsForOrd = buildBuckets((int) ordered.size()); - topBucketsPerOrd.set(ordIdx, bucketsForOrd); - for (int b = (int) ordered.size() - 1; b >= 0; --b) { - topBucketsPerOrd.get(ordIdx)[b] = ordered.pop(); - otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[b].getDocCount()); + } } + assert ordsCollected == ordsArray.size(); + buildSubAggs(topBucketsPerOrd, ordsArray); } } - - buildSubAggs(topBucketsPerOrd); - return NumericTermsAggregator.this.buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)) @@ -254,13 +270,13 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro * Build a {@link ObjectArrayPriorityQueue} to sort the buckets. After we've * collected all of the buckets we'll collect all entries in the queue. */ - abstract ObjectArrayPriorityQueue buildPriorityQueue(int size); + abstract ObjectArrayPriorityQueue> buildPriorityQueue(int size); /** * Build the sub-aggregations into the buckets. This will usually - * delegate to {@link #buildSubAggsForAllBuckets}. + * delegate to {@link #buildSubAggsForAllBuckets(ObjectArray, LongArray, BiConsumer)}. */ - abstract void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException; + abstract void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordsArray) throws IOException; /** * Collect extra entries for "zero" hit documents if they were requested @@ -287,9 +303,11 @@ interface BucketUpdater abstract class StandardTermsResultStrategy, B extends InternalTerms.Bucket> extends ResultStrategy { protected final boolean showTermDocCountError; + private final Comparator> comparator; - StandardTermsResultStrategy(boolean showTermDocCountError) { + StandardTermsResultStrategy(boolean showTermDocCountError, Aggregator aggregator) { this.showTermDocCountError = showTermDocCountError; + this.comparator = order.partiallyBuiltBucketComparator(aggregator); } @Override @@ -298,13 +316,13 @@ final LeafBucketCollector wrapCollector(LeafBucketCollector primary) { } @Override - final ObjectArrayPriorityQueue buildPriorityQueue(int size) { - return new BucketPriorityQueue<>(size, bigArrays(), partiallyBuiltBucketComparator); + final ObjectArrayPriorityQueue> buildPriorityQueue(int size) { + return new BucketPriorityQueue<>(size, bigArrays(), comparator); } @Override - final void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + final void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordsArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, aggs) -> b.aggregations = aggs); } @Override @@ -340,8 +358,8 @@ public final void close() {} } class LongTermsResults extends StandardTermsResultStrategy { - LongTermsResults(boolean showTermDocCountError) { - super(showTermDocCountError); + LongTermsResults(boolean showTermDocCountError, Aggregator aggregator) { + super(showTermDocCountError, aggregator); } @Override @@ -374,7 +392,6 @@ BucketUpdater bucketUpdater(long owningBucketOrd) { return (LongTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) -> { spare.term = ordsEnum.value(); spare.docCount = docCount; - spare.bucketOrd = ordsEnum.ord(); }; } @@ -424,8 +441,8 @@ LongTerms buildEmptyResult() { class DoubleTermsResults extends StandardTermsResultStrategy { - DoubleTermsResults(boolean showTermDocCountError) { - super(showTermDocCountError); + DoubleTermsResults(boolean showTermDocCountError, Aggregator aggregator) { + super(showTermDocCountError, aggregator); } @Override @@ -458,7 +475,6 @@ BucketUpdater bucketUpdater(long owningBucketOrd) { return (DoubleTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) -> { spare.term = NumericUtils.sortableLongToDouble(ordsEnum.value()); spare.docCount = docCount; - spare.bucketOrd = ordsEnum.ord(); }; } @@ -575,7 +591,6 @@ BucketUpdater bucketUpdater(long owningBucketOrd) { spare.term = ordsEnum.value(); spare.subsetDf = docCount; spare.supersetDf = backgroundFrequencies.freq(spare.term); - spare.bucketOrd = ordsEnum.ord(); // During shard-local down-selection we use subset/superset stats that are for this shard only // Back at the central reducer these properties will be updated with global stats spare.updateScore(significanceHeuristic, subsetSize, supersetSize); @@ -583,13 +598,13 @@ BucketUpdater bucketUpdater(long owningBucketOrd) { } @Override - ObjectArrayPriorityQueue buildPriorityQueue(int size) { + ObjectArrayPriorityQueue> buildPriorityQueue(int size) { return new BucketSignificancePriorityQueue<>(size, bigArrays()); } @Override - void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordsArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, aggs) -> b.aggregations = aggs); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java index 4922be7cec1b..c07c0726a4ae 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -27,7 +27,6 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Comparator; import java.util.HashSet; import java.util.Map; import java.util.Objects; @@ -190,7 +189,6 @@ public boolean equals(Object obj) { protected final DocValueFormat format; protected final BucketCountThresholds bucketCountThresholds; protected final BucketOrder order; - protected final Comparator> partiallyBuiltBucketComparator; protected final Set aggsUsedForSorting; protected final SubAggCollectionMode collectMode; @@ -209,7 +207,9 @@ public TermsAggregator( super(name, factories, context, parent, metadata); this.bucketCountThresholds = bucketCountThresholds; this.order = order; - partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(b -> b.bucketOrd, this); + if (order != null) { + order.validate(this); + } this.format = format; if ((subAggsNeedScore() && descendsFromNestedAggregator(parent)) || context.isInSortOrderExecutionRequired()) { /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 2c7b768fcdbb..da5ae37b0822 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -195,12 +195,12 @@ private static TermsAggregatorSupplier numericSupplier() { if (includeExclude != null) { longFilter = includeExclude.convertToDoubleFilter(); } - resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError); + resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError, agg); } else { if (includeExclude != null) { longFilter = includeExclude.convertToLongFilter(valuesSourceConfig.format()); } - resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError); + resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError, agg); } return new NumericTermsAggregator( name, @@ -403,7 +403,7 @@ Aggregator create( name, factories, new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSourceConfig), - a -> a.new StandardTermsResults(valuesSourceConfig.getValuesSource()), + a -> a.new StandardTermsResults(valuesSourceConfig.getValuesSource(), a), order, valuesSourceConfig.format(), bucketCountThresholds, diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/InternalMultiTerms.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/InternalMultiTerms.java index 0d42a2856a10..85510c8a989c 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/InternalMultiTerms.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/InternalMultiTerms.java @@ -37,9 +37,6 @@ public class InternalMultiTerms extends AbstractInternalTerms { - - long bucketOrd; - protected long docCount; protected InternalAggregations aggregations; private long docCountError; diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java index 1691aedf543f..5c10e2c8feeb 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; @@ -40,6 +41,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd; import org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue; import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator; @@ -72,7 +74,7 @@ class MultiTermsAggregator extends DeferableBucketAggregator { protected final List formats; protected final TermsAggregator.BucketCountThresholds bucketCountThresholds; protected final BucketOrder order; - protected final Comparator partiallyBuiltBucketComparator; + protected final Comparator> partiallyBuiltBucketComparator; protected final Set aggsUsedForSorting; protected final SubAggCollectionMode collectMode; private final List values; @@ -99,7 +101,7 @@ protected MultiTermsAggregator( super(name, factories, context, parent, metadata); this.bucketCountThresholds = bucketCountThresholds; this.order = order; - partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(b -> b.bucketOrd, this); + partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(this); this.formats = formats; this.showTermDocCountError = showTermDocCountError; if (subAggsNeedScore() && descendsFromNestedAggregator(parent) || context.isInSortOrderExecutionRequired()) { @@ -242,52 +244,67 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true); ObjectArray topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size()) ) { - for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { - final long owningBucketOrd = owningBucketOrds.get(ordIdx); - long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrd); - - int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); - try ( - ObjectArrayPriorityQueue ordered = new BucketPriorityQueue<>( - size, - bigArrays(), - partiallyBuiltBucketComparator - ) - ) { - InternalMultiTerms.Bucket spare = null; - BytesRef spareKey = null; - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts.increment(ordIdx, docCount); - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; - } - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = new InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters); - spareKey = new BytesRef(); - } - ordsEnum.readValue(spareKey); - spare.terms = unpackTerms(spareKey); - spare.docCount = docCount; - spare.bucketOrd = ordsEnum.ord(); - spare = ordered.insertWithOverflow(spare); - } + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), bucketCountThresholds.getShardSize()); + ordsToCollect += size; + bucketsToCollect.set(ordIdx, size); + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrd); + + int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); + try ( + ObjectArrayPriorityQueue> ordered = new BucketPriorityQueue<>( + size, + bigArrays(), + partiallyBuiltBucketComparator + ) + ) { + BucketAndOrd spare = null; + BytesRef spareKey = null; + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>( + new InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters) + ); + spareKey = new BytesRef(); + } + ordsEnum.readValue(spareKey); + spare.bucket.terms = unpackTerms(spareKey); + spare.bucket.docCount = docCount; + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } - // Get the top buckets - InternalMultiTerms.Bucket[] bucketsForOrd = new InternalMultiTerms.Bucket[(int) ordered.size()]; - topBucketsPerOrd.set(ordIdx, bucketsForOrd); - for (int b = (int) ordered.size() - 1; b >= 0; --b) { - InternalMultiTerms.Bucket[] buckets = topBucketsPerOrd.get(ordIdx); - buckets[b] = ordered.pop(); - otherDocCounts.increment(ordIdx, -buckets[b].getDocCount()); + // Get the top buckets + int orderedSize = (int) ordered.size(); + InternalMultiTerms.Bucket[] buckets = new InternalMultiTerms.Bucket[orderedSize]; + for (int i = orderedSize - 1; i >= 0; --i) { + BucketAndOrd bucketAndOrd = ordered.pop(); + buckets[i] = bucketAndOrd.bucket; + ordsArray.set(ordsCollected + i, bucketAndOrd.ord); + otherDocCounts.increment(ordIdx, -buckets[i].getDocCount()); + } + topBucketsPerOrd.set(ordIdx, buckets); + ordsCollected += orderedSize; + } } + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, a) -> b.aggregations = a); } } - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); - return buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildResult(otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)) From b449c8e0ec0cbc45a79aea0ce443e4f60b77103c Mon Sep 17 00:00:00 2001 From: kosabogi <105062005+kosabogi@users.noreply.github.com> Date: Thu, 5 Dec 2024 17:00:19 +0100 Subject: [PATCH 2/3] Adds warning to Create inference API page (#118073) (#118091) --- docs/reference/inference/put-inference.asciidoc | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/reference/inference/put-inference.asciidoc b/docs/reference/inference/put-inference.asciidoc index e7e25ec98b49..2986f16916f3 100644 --- a/docs/reference/inference/put-inference.asciidoc +++ b/docs/reference/inference/put-inference.asciidoc @@ -10,7 +10,6 @@ Creates an {infer} endpoint to perform an {infer} task. * For built-in models and models uploaded through Eland, the {infer} APIs offer an alternative way to use and manage trained models. However, if you do not plan to use the {infer} APIs to use these models or if you want to use non-NLP models, use the <>. ==== - [discrete] [[put-inference-api-request]] ==== {api-request-title} @@ -47,6 +46,14 @@ Refer to the service list in the <> API. In the response, look for `"state": "fully_allocated"` and ensure the `"allocation_count"` matches the `"target_allocation_count"`. +* Avoid creating multiple endpoints for the same model unless required, as each endpoint consumes significant resources. +==== + + The following services are available through the {infer} API. You can find the available task types next to the service name. Click the links to review the configuration details of the services: From 90b116f2f190ab206d221b6a1f09d15e1f4ea7a7 Mon Sep 17 00:00:00 2001 From: Mark Tozzi Date: Thu, 5 Dec 2024 11:15:10 -0500 Subject: [PATCH 3/3] Esql refactor date tests (#117923) (#118085) This refactors a bit of the type logic in the parametrized testing to pass the input values as java Instants for millisecond and nanosecond date. Mainly, this impacts verifier functions. The goal here is to ensure that the values are correctly converted based on the type they were generated as, rather than relying on the verifier function to know how to convert from a long with no additional information. This will make tests that have mixed millisecond and nanosecond inputs easier to write correctly. --- .../xpack/esql/core/util/DateUtils.java | 4 + .../expression/function/TestCaseSupplier.java | 78 +++---------------- .../scalar/convert/ToDateNanosTests.java | 17 ++-- .../scalar/convert/ToDatetimeTests.java | 14 +++- .../scalar/convert/ToDoubleTests.java | 6 +- .../scalar/convert/ToIntegerTests.java | 7 +- .../function/scalar/convert/ToLongTests.java | 11 ++- .../scalar/convert/ToStringTests.java | 11 ++- .../scalar/convert/ToUnsignedLongTests.java | 6 +- .../operator/arithmetic/AddTests.java | 22 +++--- .../operator/arithmetic/SubTests.java | 16 ++-- .../comparison/GreaterThanOrEqualTests.java | 39 ++++------ .../operator/comparison/GreaterThanTests.java | 5 +- .../comparison/LessThanOrEqualTests.java | 39 ++++------ .../operator/comparison/LessThanTests.java | 5 +- 15 files changed, 117 insertions(+), 163 deletions(-) diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/DateUtils.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/DateUtils.java index 280cf172a8a5..20f7b400e936 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/DateUtils.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/DateUtils.java @@ -174,6 +174,10 @@ public static ZonedDateTime asDateTime(long millis) { return ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis), UTC); } + public static ZonedDateTime asDateTime(Instant instant) { + return ZonedDateTime.ofInstant(instant, UTC); + } + public static long asMillis(ZonedDateTime zonedDateTime) { return zonedDateTime.toInstant().toEpochMilli(); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/TestCaseSupplier.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/TestCaseSupplier.java index 816c9ef6f352..377027b70fb5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/TestCaseSupplier.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/TestCaseSupplier.java @@ -620,70 +620,6 @@ public static void forUnaryBoolean( unary(suppliers, expectedEvaluatorToString, booleanCases(), expectedType, v -> expectedValue.apply((Boolean) v), warnings); } - /** - * Generate positive test cases for a unary function operating on an {@link DataType#DATETIME}. - * This variant defaults to maximum range of possible values - */ - public static void forUnaryDatetime( - List suppliers, - String expectedEvaluatorToString, - DataType expectedType, - Function expectedValue, - List warnings - ) { - unaryNumeric( - suppliers, - expectedEvaluatorToString, - dateCases(), - expectedType, - n -> expectedValue.apply(Instant.ofEpochMilli(n.longValue())), - warnings - ); - } - - /** - * Generate positive test cases for a unary function operating on an {@link DataType#DATETIME}. - * This variant accepts a range of values - */ - public static void forUnaryDatetime( - List suppliers, - String expectedEvaluatorToString, - DataType expectedType, - long min, - long max, - Function expectedValue, - List warnings - ) { - unaryNumeric( - suppliers, - expectedEvaluatorToString, - dateCases(min, max), - expectedType, - n -> expectedValue.apply(Instant.ofEpochMilli(n.longValue())), - warnings - ); - } - - /** - * Generate positive test cases for a unary function operating on an {@link DataType#DATE_NANOS}. - */ - public static void forUnaryDateNanos( - List suppliers, - String expectedEvaluatorToString, - DataType expectedType, - Function expectedValue, - List warnings - ) { - unaryNumeric( - suppliers, - expectedEvaluatorToString, - dateNanosCases(), - expectedType, - n -> expectedValue.apply(DateUtils.toInstant((long) n)), - warnings - ); - } - /** * Generate positive test cases for a unary function operating on an {@link DataType#GEO_POINT}. */ @@ -1912,11 +1848,19 @@ public List multiRowData() { } /** - * @return the data value being supplied, casting unsigned longs into BigIntegers correctly + * @return the data value being supplied, casting to java objects when appropriate */ public Object getValue() { - if (type == DataType.UNSIGNED_LONG && data instanceof Long l) { - return NumericUtils.unsignedLongAsBigInteger(l); + if (data instanceof Long l) { + if (type == DataType.UNSIGNED_LONG) { + return NumericUtils.unsignedLongAsBigInteger(l); + } + if (type == DataType.DATETIME) { + return Instant.ofEpochMilli(l); + } + if (type == DataType.DATE_NANOS) { + return DateUtils.toInstant(l); + } } return data; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDateNanosTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDateNanosTests.java index e91a5cc1ebca..8d1c2443c1bf 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDateNanosTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDateNanosTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; @@ -32,14 +33,20 @@ public static Iterable parameters() { final String read = "Attribute[channel=0]"; final List suppliers = new ArrayList<>(); - TestCaseSupplier.forUnaryDateNanos(suppliers, read, DataType.DATE_NANOS, DateUtils::toLong, List.of()); - TestCaseSupplier.forUnaryDatetime( + TestCaseSupplier.unary( + suppliers, + read, + TestCaseSupplier.dateNanosCases(), + DataType.DATE_NANOS, + v -> DateUtils.toLong((Instant) v), + List.of() + ); + TestCaseSupplier.unary( suppliers, "ToDateNanosFromDatetimeEvaluator[field=" + read + "]", + TestCaseSupplier.dateCases(0, DateUtils.MAX_NANOSECOND_INSTANT.toEpochMilli()), DataType.DATE_NANOS, - 0, - DateUtils.MAX_NANOSECOND_INSTANT.toEpochMilli(), - i -> DateUtils.toNanoSeconds(i.toEpochMilli()), + i -> DateUtils.toNanoSeconds(((Instant) i).toEpochMilli()), List.of() ); TestCaseSupplier.forUnaryLong( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDatetimeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDatetimeTests.java index 2852b92ba156..43b889baf530 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDatetimeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDatetimeTests.java @@ -37,12 +37,20 @@ public static Iterable parameters() { final String read = "Attribute[channel=0]"; final List suppliers = new ArrayList<>(); - TestCaseSupplier.forUnaryDatetime(suppliers, read, DataType.DATETIME, Instant::toEpochMilli, emptyList()); - TestCaseSupplier.forUnaryDateNanos( + TestCaseSupplier.unary( + suppliers, + read, + TestCaseSupplier.dateCases(), + DataType.DATETIME, + v -> ((Instant) v).toEpochMilli(), + emptyList() + ); + TestCaseSupplier.unary( suppliers, "ToDatetimeFromDateNanosEvaluator[field=" + read + "]", + TestCaseSupplier.dateNanosCases(), DataType.DATETIME, - i -> DateUtils.toMilliSeconds(DateUtils.toLong(i)), + i -> DateUtils.toMilliSeconds(DateUtils.toLong((Instant) i)), emptyList() ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDoubleTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDoubleTests.java index d5153019c1e4..b68306d6cac8 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDoubleTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDoubleTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -49,11 +50,12 @@ public static Iterable parameters() { ); TestCaseSupplier.forUnaryBoolean(suppliers, evaluatorName.apply("Boolean"), DataType.DOUBLE, b -> b ? 1d : 0d, List.of()); - TestCaseSupplier.forUnaryDatetime( + TestCaseSupplier.unary( suppliers, evaluatorName.apply("Long"), + TestCaseSupplier.dateCases(), DataType.DOUBLE, - i -> (double) i.toEpochMilli(), + i -> (double) ((Instant) i).toEpochMilli(), List.of() ); // random strings that don't look like a double diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToIntegerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToIntegerTests.java index eb81d48e0c5b..6a3f7022c9d3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToIntegerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToIntegerTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -48,7 +49,7 @@ public static Iterable parameters() { evaluatorName.apply("Long"), dateCases(0, Integer.MAX_VALUE), DataType.INTEGER, - l -> ((Long) l).intValue(), + l -> Long.valueOf(((Instant) l).toEpochMilli()).intValue(), List.of() ); // datetimes that fall outside Integer's range @@ -60,7 +61,9 @@ public static Iterable parameters() { l -> null, l -> List.of( "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.", - "Line -1:-1: org.elasticsearch.xpack.esql.core.InvalidArgumentException: [" + l + "] out of [integer] range" + "Line -1:-1: org.elasticsearch.xpack.esql.core.InvalidArgumentException: [" + + ((Instant) l).toEpochMilli() + + "] out of [integer] range" ) ); // random strings that don't look like an Integer diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToLongTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToLongTests.java index 4c2cf14af41e..c7101ab730ab 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToLongTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToLongTests.java @@ -43,8 +43,15 @@ public static Iterable parameters() { TestCaseSupplier.forUnaryBoolean(suppliers, evaluatorName.apply("Boolean"), DataType.LONG, b -> b ? 1L : 0L, List.of()); // datetimes - TestCaseSupplier.forUnaryDatetime(suppliers, read, DataType.LONG, Instant::toEpochMilli, List.of()); - TestCaseSupplier.forUnaryDateNanos(suppliers, read, DataType.LONG, DateUtils::toLong, List.of()); + TestCaseSupplier.unary(suppliers, read, TestCaseSupplier.dateCases(), DataType.LONG, v -> ((Instant) v).toEpochMilli(), List.of()); + TestCaseSupplier.unary( + suppliers, + read, + TestCaseSupplier.dateNanosCases(), + DataType.LONG, + v -> DateUtils.toLong((Instant) v), + List.of() + ); // random strings that don't look like a long TestCaseSupplier.forUnaryStrings( suppliers, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToStringTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToStringTests.java index 0b101efa073d..3b30e4b353ae 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToStringTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToStringTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; @@ -81,18 +82,20 @@ public static Iterable parameters() { b -> new BytesRef(b.toString()), List.of() ); - TestCaseSupplier.forUnaryDatetime( + TestCaseSupplier.unary( suppliers, "ToStringFromDatetimeEvaluator[field=" + read + "]", + TestCaseSupplier.dateCases(), DataType.KEYWORD, - i -> new BytesRef(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i.toEpochMilli())), + i -> new BytesRef(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(((Instant) i).toEpochMilli())), List.of() ); - TestCaseSupplier.forUnaryDateNanos( + TestCaseSupplier.unary( suppliers, "ToStringFromDateNanosEvaluator[field=" + read + "]", + TestCaseSupplier.dateNanosCases(), DataType.KEYWORD, - i -> new BytesRef(DateFieldMapper.DEFAULT_DATE_TIME_NANOS_FORMATTER.formatNanos(DateUtils.toLong(i))), + i -> new BytesRef(DateFieldMapper.DEFAULT_DATE_TIME_NANOS_FORMATTER.formatNanos(DateUtils.toLong((Instant) i))), List.of() ); TestCaseSupplier.forUnaryGeoPoint( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToUnsignedLongTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToUnsignedLongTests.java index d8122aa73f81..ca48bb029f22 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToUnsignedLongTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToUnsignedLongTests.java @@ -19,6 +19,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -58,11 +59,12 @@ public static Iterable parameters() { ); // datetimes - TestCaseSupplier.forUnaryDatetime( + TestCaseSupplier.unary( suppliers, evaluatorName.apply("Long"), + TestCaseSupplier.dateCases(), DataType.UNSIGNED_LONG, - instant -> BigInteger.valueOf(instant.toEpochMilli()), + instant -> BigInteger.valueOf(((Instant) instant).toEpochMilli()), List.of() ); // random strings that don't look like an unsigned_long diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/arithmetic/AddTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/arithmetic/AddTests.java index abfb634d5f30..aa4c037e5e96 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/arithmetic/AddTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/arithmetic/AddTests.java @@ -159,7 +159,7 @@ public static Iterable parameters() { }; BiFunction> warnings = (lhs, rhs) -> { try { - addDatesAndTemporalAmount(lhs.data(), rhs.data(), AddTests::addMillis); + addDatesAndTemporalAmount(lhs.getValue(), rhs.getValue(), AddTests::addMillis); return List.of(); } catch (ArithmeticException e) { return List.of( @@ -193,6 +193,7 @@ public static Iterable parameters() { BinaryOperator nanosResult = (lhs, rhs) -> { try { + assert (lhs instanceof Instant) || (rhs instanceof Instant); return addDatesAndTemporalAmount(lhs, rhs, AddTests::addNanos); } catch (ArithmeticException e) { return null; @@ -327,29 +328,28 @@ private static String addErrorMessageString(boolean includeOrdinal, List adder) { + private static Object addDatesAndTemporalAmount(Object lhs, Object rhs, ToLongBiFunction adder) { // this weird casting dance makes the expected value lambda symmetric - Long date; + Instant date; TemporalAmount period; - if (lhs instanceof Long) { - date = (Long) lhs; + assert (lhs instanceof Instant) || (rhs instanceof Instant); + if (lhs instanceof Instant) { + date = (Instant) lhs; period = (TemporalAmount) rhs; } else { - date = (Long) rhs; + date = (Instant) rhs; period = (TemporalAmount) lhs; } return adder.applyAsLong(date, period); } - private static long addMillis(Long date, TemporalAmount period) { + private static long addMillis(Instant date, TemporalAmount period) { return asMillis(asDateTime(date).plus(period)); } - private static long addNanos(Long date, TemporalAmount period) { + private static long addNanos(Instant date, TemporalAmount period) { return DateUtils.toLong( - Instant.from( - ZonedDateTime.ofInstant(DateUtils.toInstant(date), org.elasticsearch.xpack.esql.core.util.DateUtils.UTC).plus(period) - ) + Instant.from(ZonedDateTime.ofInstant(date, org.elasticsearch.xpack.esql.core.util.DateUtils.UTC).plus(period)) ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/arithmetic/SubTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/arithmetic/SubTests.java index 1338299b3a12..bce5dea30f84 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/arithmetic/SubTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/arithmetic/SubTests.java @@ -277,25 +277,23 @@ protected Expression build(Source source, List args) { return new Sub(source, args.get(0), args.get(1)); } - private static Object subtractDatesAndTemporalAmount(Object lhs, Object rhs, ToLongBiFunction subtract) { + private static Object subtractDatesAndTemporalAmount(Object lhs, Object rhs, ToLongBiFunction subtract) { // this weird casting dance makes the expected value lambda symmetric - Long date; + Instant date; TemporalAmount period; - if (lhs instanceof Long) { - date = (Long) lhs; + if (lhs instanceof Instant) { + date = (Instant) lhs; period = (TemporalAmount) rhs; } else { - date = (Long) rhs; + date = (Instant) rhs; period = (TemporalAmount) lhs; } return subtract.applyAsLong(date, period); } - private static long subtractNanos(Long date, TemporalAmount period) { + private static long subtractNanos(Instant date, TemporalAmount period) { return DateUtils.toLong( - Instant.from( - ZonedDateTime.ofInstant(DateUtils.toInstant(date), org.elasticsearch.xpack.esql.core.util.DateUtils.UTC).minus(period) - ) + Instant.from(ZonedDateTime.ofInstant(date, org.elasticsearch.xpack.esql.core.util.DateUtils.UTC).minus(period)) ); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/GreaterThanOrEqualTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/GreaterThanOrEqualTests.java index a4f1a19e135e..395a574028f6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/GreaterThanOrEqualTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/GreaterThanOrEqualTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; @@ -106,33 +107,19 @@ public static Iterable parameters() { ) ); // Datetime - suppliers.addAll( - TestCaseSupplier.forBinaryNotCasting( - "GreaterThanOrEqualLongsEvaluator", - "lhs", - "rhs", - (l, r) -> ((Number) l).longValue() >= ((Number) r).longValue(), - DataType.BOOLEAN, - TestCaseSupplier.dateCases(), - TestCaseSupplier.dateCases(), - List.of(), - false - ) - ); + suppliers.addAll(TestCaseSupplier.forBinaryNotCasting("GreaterThanOrEqualLongsEvaluator", "lhs", "rhs", (lhs, rhs) -> { + if (lhs instanceof Instant l && rhs instanceof Instant r) { + return l.isAfter(r) || l.equals(r); + } + throw new UnsupportedOperationException("Got some weird types"); + }, DataType.BOOLEAN, TestCaseSupplier.dateCases(), TestCaseSupplier.dateCases(), List.of(), false)); - suppliers.addAll( - TestCaseSupplier.forBinaryNotCasting( - "GreaterThanOrEqualLongsEvaluator", - "lhs", - "rhs", - (l, r) -> ((Number) l).longValue() >= ((Number) r).longValue(), - DataType.BOOLEAN, - TestCaseSupplier.dateNanosCases(), - TestCaseSupplier.dateNanosCases(), - List.of(), - false - ) - ); + suppliers.addAll(TestCaseSupplier.forBinaryNotCasting("GreaterThanOrEqualLongsEvaluator", "lhs", "rhs", (lhs, rhs) -> { + if (lhs instanceof Instant l && rhs instanceof Instant r) { + return l.isAfter(r) || l.equals(r); + } + throw new UnsupportedOperationException("Got some weird types"); + }, DataType.BOOLEAN, TestCaseSupplier.dateNanosCases(), TestCaseSupplier.dateNanosCases(), List.of(), false)); suppliers.addAll( TestCaseSupplier.stringCases( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/GreaterThanTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/GreaterThanTests.java index 86a4676e3500..b56ecd7392ba 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/GreaterThanTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/GreaterThanTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; @@ -111,7 +112,7 @@ public static Iterable parameters() { "GreaterThanLongsEvaluator", "lhs", "rhs", - (l, r) -> ((Number) l).longValue() > ((Number) r).longValue(), + (l, r) -> ((Instant) l).isAfter((Instant) r), DataType.BOOLEAN, TestCaseSupplier.dateCases(), TestCaseSupplier.dateCases(), @@ -125,7 +126,7 @@ public static Iterable parameters() { "GreaterThanLongsEvaluator", "lhs", "rhs", - (l, r) -> ((Number) l).longValue() > ((Number) r).longValue(), + (l, r) -> ((Instant) l).isAfter((Instant) r), DataType.BOOLEAN, TestCaseSupplier.dateNanosCases(), TestCaseSupplier.dateNanosCases(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/LessThanOrEqualTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/LessThanOrEqualTests.java index 5793f26ecd44..60062f071c18 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/LessThanOrEqualTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/LessThanOrEqualTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; @@ -106,33 +107,19 @@ public static Iterable parameters() { ) ); // Datetime - suppliers.addAll( - TestCaseSupplier.forBinaryNotCasting( - "LessThanOrEqualLongsEvaluator", - "lhs", - "rhs", - (l, r) -> ((Number) l).longValue() <= ((Number) r).longValue(), - DataType.BOOLEAN, - TestCaseSupplier.dateCases(), - TestCaseSupplier.dateCases(), - List.of(), - false - ) - ); + suppliers.addAll(TestCaseSupplier.forBinaryNotCasting("LessThanOrEqualLongsEvaluator", "lhs", "rhs", (lhs, rhs) -> { + if (lhs instanceof Instant l && rhs instanceof Instant r) { + return l.isBefore(r) || l.equals(r); + } + throw new UnsupportedOperationException("Got some weird types"); + }, DataType.BOOLEAN, TestCaseSupplier.dateCases(), TestCaseSupplier.dateCases(), List.of(), false)); - suppliers.addAll( - TestCaseSupplier.forBinaryNotCasting( - "LessThanOrEqualLongsEvaluator", - "lhs", - "rhs", - (l, r) -> ((Number) l).longValue() <= ((Number) r).longValue(), - DataType.BOOLEAN, - TestCaseSupplier.dateNanosCases(), - TestCaseSupplier.dateNanosCases(), - List.of(), - false - ) - ); + suppliers.addAll(TestCaseSupplier.forBinaryNotCasting("LessThanOrEqualLongsEvaluator", "lhs", "rhs", (lhs, rhs) -> { + if (lhs instanceof Instant l && rhs instanceof Instant r) { + return l.isBefore(r) || l.equals(r); + } + throw new UnsupportedOperationException("Got some weird types"); + }, DataType.BOOLEAN, TestCaseSupplier.dateNanosCases(), TestCaseSupplier.dateNanosCases(), List.of(), false)); suppliers.addAll( TestCaseSupplier.stringCases( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/LessThanTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/LessThanTests.java index 0d114b496492..30812cf8e538 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/LessThanTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/LessThanTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; @@ -111,7 +112,7 @@ public static Iterable parameters() { "LessThanLongsEvaluator", "lhs", "rhs", - (l, r) -> ((Number) l).longValue() < ((Number) r).longValue(), + (l, r) -> ((Instant) l).isBefore((Instant) r), DataType.BOOLEAN, TestCaseSupplier.dateNanosCases(), TestCaseSupplier.dateNanosCases(), @@ -125,7 +126,7 @@ public static Iterable parameters() { "LessThanLongsEvaluator", "lhs", "rhs", - (l, r) -> ((Number) l).longValue() < ((Number) r).longValue(), + (l, r) -> ((Instant) l).isBefore((Instant) r), DataType.BOOLEAN, TestCaseSupplier.dateCases(), TestCaseSupplier.dateCases(),