diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java index 804f179a0990e..a647f82548e33 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java @@ -143,7 +143,7 @@ private static Weight[] createFilterForAggregations( final long interval = intervalOpt.getAsLong(); // afterKey is the last bucket key in previous response, while the bucket key // is the start of the bucket values, so add the interval - if (afterKey != 0) { + if (afterKey != -1) { low = afterKey + interval; } // Calculate the number of buckets using range and interval @@ -248,7 +248,7 @@ public static class ValueSourceContext { * @param missing whether missing value/bucket is set * @param hasScript whether script is used * @param fieldType null if the field doesn't exist - * @param afterKey for composite aggregation, the key of the last bucket in the previous response + * @param afterKey used to paginate for composite aggregation, pass in -1 if not used */ public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fieldType, long afterKey) { this.missing = missing; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 23e875645d344..e391422d7a9a2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -165,7 +165,7 @@ final class CompositeAggregator extends BucketsAggregator { this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); this.rawAfterKey = rawAfterKey; - // Try fast filter optimization + // Try fast filter optimization when the only source is date histogram if (sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource) { RoundingValuesSource dateHistogramSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); @@ -258,7 +258,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I ); } - // Fast filters optimization + // Build results from fast filters optimization if (bucketOrds != null) { Map bucketMap = new HashMap<>(); for (InternalComposite.InternalBucket internalBucket : buckets) { @@ -386,9 +386,7 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException } break; } - sortFields.add(indexSortField); - if (sourceConfig.valuesSource() instanceof RoundingValuesSource) { // the rounding "squashes" many values together, that breaks the ordering of sub-values, // so we ignore the subsequent sources even if they match the index sort. @@ -516,7 +514,6 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t .build(); Weight weight = context.searcher().createWeight(context.searcher().rewrite(newQuery), ScoreMode.COMPLETE_NO_SCORES, 1f); Scorer scorer = weight.scorer(ctx); - if (scorer != null) { DocIdSetIterator docIt = scorer.iterator(); final LeafBucketCollector inner = queue.getLeafCollector( @@ -524,7 +521,6 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length) ); inner.setScorer(scorer); - final Bits liveDocs = ctx.reader().getLiveDocs(); while (docIt.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { if (liveDocs == null || liveDocs.get(docIt.docID())) { @@ -548,7 +544,6 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket Sort indexSortPrefix = buildIndexSortPrefix(ctx); int sortPrefixLen = computeSortPrefixLen(indexSortPrefix); - // are there index sort enabled? sortPrefixLen SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0 ? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) : null; @@ -602,6 +597,8 @@ public void collect(int doc, long bucket) throws IOException { try { long docCount = docCountProvider.getDocCount(doc); if (queue.addIfCompetitive(indexSortPrefix, docCount)) { + // one doc may contain multiple values, we iterate over and collect one by one + // so the same doc can appear multiple times here if (builder != null && lastDoc != doc) { builder.add(doc); lastDoc = doc; @@ -626,18 +623,14 @@ private void runDeferredCollections() throws IOException { Query query = context.query(); weight = context.searcher().createWeight(context.searcher().rewrite(query), ScoreMode.COMPLETE, 1f); } - deferredCollectors.preCollection(); - for (Entry entry : entries) { DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator(); if (docIdSetIterator == null) { continue; } - final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.context); final LeafBucketCollector collector = queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector)); - DocIdSetIterator scorerIt = null; if (needsScores) { Scorer scorer = weight.scorer(entry.context); @@ -646,7 +639,6 @@ private void runDeferredCollections() throws IOException { subCollector.setScorer(scorer); } } - int docID; while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { if (needsScores) { @@ -658,7 +650,6 @@ private void runDeferredCollections() throws IOException { collector.collect(docID); } } - deferredCollectors.postCollection(); } @@ -670,7 +661,7 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec @Override public void collect(int doc, long zeroBucket) throws IOException { assert zeroBucket == 0; - Integer slot = queue.compareCurrent(); + Integer slot = queue.getCurrentSlot(); if (slot != null) { // The candidate key is a top bucket. // We can defer the collection of this document/bucket to the sub collector diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index e078ffca1597c..2c4d451322bca 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -47,6 +47,8 @@ /** * A specialized {@link PriorityQueue} implementation for composite buckets. + * Can think of this as a max heap that holds the top small buckets slots in order. + * Each slot holds the values of the composite bucket key it represents. * * @opensearch.internal */ @@ -77,7 +79,7 @@ public int hashCode() { private final BigArrays bigArrays; private final int maxSize; - private final Map map; + private final Map map; // to quickly find the slot for a value private final SingleDimensionValuesSource[] arrays; private LongArray docCounts; @@ -119,10 +121,10 @@ boolean isFull() { } /** - * Compares the current candidate with the values in the queue and returns + * Try to get the slot of the current/candidate values in the queue and returns * the slot if the candidate is already in the queue or null if the candidate is not present. */ - Integer compareCurrent() { + Integer getCurrentSlot() { return map.get(new Slot(CANDIDATE_SLOT)); } @@ -165,13 +167,11 @@ int compare(int slot1, int slot2) { assert slot2 != CANDIDATE_SLOT; for (int i = 0; i < arrays.length; i++) { final int cmp; - if (slot1 == CANDIDATE_SLOT) { cmp = arrays[i].compareCurrent(slot2); } else { cmp = arrays[i].compare(slot1, slot2); } - if (cmp != 0) { return cmp > 0 ? i + 1 : -(i + 1); } @@ -255,13 +255,11 @@ LeafBucketCollector getLeafCollector(Comparable forceLeadSourceValue, LeafReader while (last > 0) { collector = arrays[last--].getLeafCollector(context, collector); } - if (forceLeadSourceValue != null) { collector = arrays[last].getLeafCollector(forceLeadSourceValue, context, collector); } else { collector = arrays[last].getLeafCollector(context, collector); } - return collector; } @@ -283,9 +281,9 @@ boolean addIfCompetitive(long inc) { * * @throws CollectionTerminatedException if the current collection can be terminated early due to index sorting. */ - boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading indexSortSourcePrefix can only be -1 + boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // checks if the candidate key is competitive - Integer curSlot = compareCurrent(); + Integer curSlot = getCurrentSlot(); if (curSlot != null) { // this key is already in the top N, skip it docCounts.increment(curSlot, inc); @@ -296,25 +294,23 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading int cmp = compareCurrentWithAfter(); if (cmp <= 0) { if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) { - // the leading index sort is in the reverse order of the leading source + // the leading index sort is and the leading source order are both reversed, // so we can early terminate when we reach a document that is smaller // than the after key (collected on a previous page). throw new CollectionTerminatedException(); } - // key was collected on a previous page, skip it (>= afterKey). + // the key was collected on a previous page, skip it. return false; } } - if (size() >= maxSize) { // TODO reading when queue is full, can check competitiveness - // the tree map is full, check if the candidate key should be kept // TODO reading queue contain topN largest composite - // key/bucket/slot + // the heap is full, check if the candidate key larger than max heap top + if (size() >= maxSize) { int cmp = compare(CANDIDATE_SLOT, top()); - if (cmp > 0) { // TODO reading current large than queue - if (cmp <= indexSortSourcePrefix) { // TODO reading the way of comparing current and queue uses sorted fields - // index sort guarantees that there is no key greater or equal than the - // current one in the subsequent documents so we can early terminate. // TODO reading how to get the topN smallest items - // using heap? + if (cmp > 0) { + if (cmp <= indexSortSourcePrefix) { + // index sort guarantees the following documents will have a key larger than the current candidate, + // so we can early terminate. throw new CollectionTerminatedException(); } // the candidate key is not competitive, skip it. @@ -330,9 +326,9 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading // and we recycle the deleted slot newSlot = slot; } else { - newSlot = size(); // TODO reading seems we don't care the number of slot here? + newSlot = size(); } - // move the candidate key to its new slot + // move the candidate key to its new slot by copy its values to the new slot copyCurrent(newSlot, inc); map.put(new Slot(newSlot), newSlot); add(newSlot); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java index 628fab55b5411..dc130eb54c0ea 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java @@ -77,7 +77,6 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade } lowerBucket = (Long) lowerValue; } - long upperBucket = Long.MAX_VALUE; Comparable upperValue = queue.getUpperValueLeadSource(); if (upperValue != null) { @@ -148,7 +147,8 @@ public void visit(int docID, byte[] packedValue) throws IOException { } long bucket = bucketFunction.applyAsLong(packedValue); - if (first == false && bucket != lastBucket) { // process previous bucket when new bucket appears + // process previous bucket when new bucket appears + if (first == false && bucket != lastBucket) { final DocIdSet docIdSet = bucketDocsBuilder.build(); if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) && // lower bucket is inclusive diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java index 95d3ecad31669..9442529bf9342 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java @@ -75,10 +75,8 @@ protected boolean processBucket( ) throws IOException { final int[] topCompositeCollected = new int[1]; final boolean[] hasCollected = new boolean[1]; - final DocCountProvider docCountProvider = new DocCountProvider(); docCountProvider.setLeafReaderContext(context); - final LeafBucketCollector queueCollector = new LeafBucketCollector() { int lastDoc = -1; @@ -94,7 +92,7 @@ public void collect(int doc, long bucket) throws IOException { long docCount = docCountProvider.getDocCount(doc); if (queue.addIfCompetitive(docCount)) { topCompositeCollected[0]++; - if (adder != null && doc != lastDoc) { // TODO reading why doc can be == lastDoc? + if (adder != null && doc != lastDoc) { if (remainingBits == 0) { // the cost approximation was lower than the real size, we need to grow the adder // by some numbers (128) to ensure that we can add the extra documents @@ -108,7 +106,6 @@ public void collect(int doc, long bucket) throws IOException { } } }; - final Bits liveDocs = context.reader().getLiveDocs(); final LeafBucketCollector collector = queue.getLeafCollector(leadSourceBucket, context, queueCollector); while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { @@ -116,7 +113,6 @@ public void collect(int doc, long bucket) throws IOException { collector.collect(iterator.docID()); } } - if (queue.isFull() && hasCollected[0] && topCompositeCollected[0] == 0) { return true; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 0ef546e98a5ba..5a0b6bffb4f38 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -162,7 +162,7 @@ private AutoDateHistogramAggregator( valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null, valuesSourceConfig.fieldType(), - 0 + -1 ); FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( parent(), diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 11fff40e0542b..74ed12b8d759d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -121,7 +121,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null, valuesSourceConfig.fieldType(), - 0 + -1 ); FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( parent, diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index ef845c8435295..b581e552fec4f 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -2241,21 +2241,20 @@ private , V extends Comparable> void testRandomTerms( Function transformKey ) throws IOException { int numTerms = randomIntBetween(10, 500); - List terms = new ArrayList<>(); + List terms = new ArrayList<>(); // possible values for the terms for (int i = 0; i < numTerms; i++) { terms.add(randomSupplier.get()); } int numDocs = randomIntBetween(100, 200); List>> dataset = new ArrayList<>(); - - Set valuesSet = new HashSet<>(); - Map, AtomicLong> expectedDocCounts = new HashMap<>(); + Set valuesSet = new HashSet<>(); // how many different values + Map, AtomicLong> expectedDocCounts = new HashMap<>(); // how many docs for each value for (int i = 0; i < numDocs; i++) { int numValues = randomIntBetween(1, 5); Set values = new HashSet<>(); for (int j = 0; j < numValues; j++) { int rand = randomIntBetween(0, terms.size() - 1); - if (values.add(terms.get(rand))) { + if (values.add(terms.get(rand))) { // values are unique for one doc AtomicLong count = expectedDocCounts.computeIfAbsent(terms.get(rand), (k) -> new AtomicLong(0)); count.incrementAndGet(); valuesSet.add(terms.get(rand)); @@ -2263,9 +2262,8 @@ private , V extends Comparable> void testRandomTerms( } dataset.add(Collections.singletonMap(field, new ArrayList<>(values))); } - List expected = new ArrayList<>(valuesSet); + List expected = new ArrayList<>(valuesSet); // how many buckets expected Collections.sort(expected); - List> seen = new ArrayList<>(); AtomicBoolean finish = new AtomicBoolean(false); int size = randomIntBetween(1, expected.size());