Skip to content

Commit

Permalink
Fix aggs result of NestedAggregator with sub NestedAggregator (opense…
Browse files Browse the repository at this point in the history
…arch-project#13324)

Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei authored Jun 21, 2024
1 parent bcccedb commit f5dbbb0
Show file tree
Hide file tree
Showing 4 changed files with 399 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155))
- Fixed rest-high-level client searchTemplate & mtermVectors endpoints to have a leading slash ([#14465](https://github.com/opensearch-project/OpenSearch/pull/14465))
- Write shard level metadata blob when snapshotting searchable snapshot indexes ([#13190](https://github.com/opensearch-project/OpenSearch/pull/13190))
- Fix aggs result of NestedAggregator with sub NestedAggregator ([#13324](https://github.com/opensearch-project/OpenSearch/pull/13324))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.BitSet;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.ParseField;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ObjectMapper;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -88,12 +90,25 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA
) throws IOException {
super(name, factories, context, parent, cardinality, metadata);

Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
Query parentFilter = isParent(parentObjectMapper, childObjectMapper, context.mapperService())
? parentObjectMapper.nestedTypeFilter()
: Queries.newNonNestedFilter();
this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter);
this.childFilter = childObjectMapper.nestedTypeFilter();
this.collectsFromSingleBucket = cardinality.map(estimate -> estimate < 2);
}

private boolean isParent(ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper, MapperService mapperService) {
if (parentObjectMapper == null) {
return false;
}
ObjectMapper parent;
do {
parent = childObjectMapper.getParentObjectMapper(mapperService);
} while (parent != null && parent != parentObjectMapper);
return parentObjectMapper == parent;
}

@Override
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(ctx);
Expand All @@ -107,20 +122,17 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L
if (collectsFromSingleBucket) {
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
public void collect(int parentAggDoc, long bucket) throws IOException {
// parentAggDoc can be 0 when aggregation:
if (parentDocs == null || childDocs == null) {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, parentAggDoc);
int currentParentDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
}
}
Expand All @@ -130,6 +142,43 @@ public void collect(int parentDoc, long bucket) throws IOException {
}
}

/**
* In one case, it's talking about the parent doc (from the Lucene block-join standpoint),
* while in the other case, it's talking about a child doc ID (from the block-join standpoint)
* from the parent aggregation, where we're trying to aggregate over a sibling of that child.
* So, we need to map from that document to its parent, then join to the appropriate sibling.
*
* @param parentAggDoc the parent aggregation's current doc
* (which may or may not be a block-level parent doc)
* @return a tuple consisting of the current block-level parent doc (the parent of the
* parameter doc), and the next matching child doc (hopefully under this parent)
* for the aggregation (according to the child doc iterator).
*/
static Tuple<Integer, Integer> getParentAndChildId(BitSet parentDocs, DocIdSetIterator childDocs, int parentAggDoc) throws IOException {
int currentParentAggDoc;
int prevParentDoc = parentDocs.prevSetBit(parentAggDoc);
if (prevParentDoc == -1) {
currentParentAggDoc = parentDocs.nextSetBit(0);
} else if (prevParentDoc == parentAggDoc) {
// parentAggDoc is the parent of that child, and is belongs to parentDocs
currentParentAggDoc = parentAggDoc;
if (currentParentAggDoc == 0) {
prevParentDoc = -1;
} else {
prevParentDoc = parentDocs.prevSetBit(currentParentAggDoc - 1);
}
} else {
// parentAggDoc is the sibling of that child, and it means the block-join parent
currentParentAggDoc = parentDocs.nextSetBit(prevParentDoc + 1);
}

int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
return Tuple.tuple(currentParentAggDoc, childDocId);
}

@Override
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {
super.preGetSubLeafCollectors(ctx);
Expand Down Expand Up @@ -191,9 +240,8 @@ public void setScorer(Scorable scorer) throws IOException {

@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// parentAggDoc can be 0 when aggregation:
if (parentDocs == null || childDocs == null) {
return;
}

Expand All @@ -214,11 +262,9 @@ void processBufferedChildBuckets() throws IOException {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(currentParentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, currentParentDoc);
int currentParentDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
cachedScorer.doc = childDocId;
Expand Down
Loading

0 comments on commit f5dbbb0

Please sign in to comment.