Skip to content

Commit

Permalink
Allow composite aggregation under filter or reverse_nested parent (op…
Browse files Browse the repository at this point in the history
…ensearch-project#11499)

* Allow composite aggregation under filter parent

Composite aggregations are able to run under a filter aggregation
with no change required (other than not throwing an exception).

Also cleaned up FilterAggregatorFactory a little.

Signed-off-by: Michael Froh <[email protected]>

* Add changelog entry

Signed-off-by: Michael Froh <[email protected]>

* Add support for reverse nested agg too

Signed-off-by: Michael Froh <[email protected]>

* Add unit test coverage

Signed-off-by: Michael Froh <[email protected]>

* Skip new tests in pre-3.0 mixed cluster

Signed-off-by: Michael Froh <[email protected]>

---------

Signed-off-by: Michael Froh <[email protected]>
  • Loading branch information
msfroh authored and rayshrey committed Mar 18, 2024
1 parent bd380ba commit 1416bc6
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change error message when per shard document limit is breached ([#11312](https://github.com/opensearch-project/OpenSearch/pull/11312))
- Improve boolean parsing performance ([#11308](https://github.com/opensearch-project/OpenSearch/pull/11308))
- Interpret byte array as primitive using VarHandles ([#11362](https://github.com/opensearch-project/OpenSearch/pull/11362))
- Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499))
- Automatically add scheme to discovery.ec2.endpoint ([#11512](https://github.com/opensearch-project/OpenSearch/pull/11512))
- Restore support for Java 8 for RestClient ([#11562](https://github.com/opensearch-project/OpenSearch/pull/11562))
- Add deleted doc count in _cat/shards ([#11678](https://github.com/opensearch-project/OpenSearch/pull/11678))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,134 @@ setup:
- match: { aggregations.1.2.buckets.1.key.nested: 1000 }
- match: { aggregations.1.2.buckets.1.doc_count: 1 }

---
"Composite aggregation with filtered nested parent":
- skip:
version: " - 2.99.99"
reason: fixed in 3.0.0
- do:
search:
rest_total_hits_as_int: true
index: test
body:
aggregations:
1:
nested:
path: nested
aggs:
2:
filter:
range:
nested.nested_long:
gt: 0
lt: 100
aggs:
3:
composite:
sources: [
"nested": {
"terms": {
"field": "nested.nested_long"
}
}
]

- match: {hits.total: 6}
- length: { aggregations.1.2.3.buckets: 2 }
- match: { aggregations.1.2.3.buckets.0.key.nested: 10 }
- match: { aggregations.1.2.3.buckets.0.doc_count: 2 }
- match: { aggregations.1.2.3.buckets.1.key.nested: 20 }
- match: { aggregations.1.2.3.buckets.1.doc_count: 2 }
- do:
search:
rest_total_hits_as_int: true
index: test
body:
aggregations:
1:
nested:
path: nested
aggs:
2:
filter:
range:
nested.nested_long:
gt: 0
lt: 100
aggs:
3:
composite:
after: { "nested": 10 }
sources: [
"nested": {
"terms": {
"field": "nested.nested_long"
}
}
]
- match: {hits.total: 6}
- length: { aggregations.1.2.3.buckets: 1 }
- match: { aggregations.1.2.3.buckets.0.key.nested: 20 }
- match: { aggregations.1.2.3.buckets.0.doc_count: 2 }

---
"Composite aggregation with filtered reverse nested parent":
- skip:
version: " - 2.99.99"
reason: fixed in 3.0.0
- do:
search:
rest_total_hits_as_int: true
index: test
body:
aggregations:
1:
nested:
path: nested
aggs:
2:
filter:
range:
nested.nested_long:
gt: 0
lt: 20
aggs:
3:
reverse_nested: {}
aggs:
4:
composite:
sources: [
{
"long": {
"terms": {
"field": "long"
}
}
},
{
"kw": {
"terms": {
"field": "keyword"
}
}
}
]
- match: {hits.total: 6}
- length: { aggregations.1.2.3.4.buckets: 4 }
- match: { aggregations.1.2.3.4.buckets.0.key.long: 0 }
- match: { aggregations.1.2.3.4.buckets.0.key.kw: "bar" }
- match: { aggregations.1.2.3.4.buckets.0.doc_count: 1 }
- match: { aggregations.1.2.3.4.buckets.1.key.long: 10 }
- match: { aggregations.1.2.3.4.buckets.1.key.kw: "foo" }
- match: { aggregations.1.2.3.4.buckets.1.doc_count: 1 }
- match: { aggregations.1.2.3.4.buckets.2.key.long: 20 }
- match: { aggregations.1.2.3.4.buckets.2.key.kw: "foo" }
- match: { aggregations.1.2.3.4.buckets.2.doc_count: 1 }
- match: { aggregations.1.2.3.4.buckets.3.key.long: 100 }
- match: { aggregations.1.2.3.4.buckets.3.key.kw: "bar" }
- match: { aggregations.1.2.3.4.buckets.3.doc_count: 1 }

---
"Composite aggregation with unmapped field":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.bucket.filter.FilterAggregatorFactory;
import org.opensearch.search.aggregations.bucket.nested.NestedAggregatorFactory;
import org.opensearch.search.aggregations.bucket.nested.ReverseNestedAggregatorFactory;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;

import java.io.IOException;
Expand Down Expand Up @@ -240,14 +242,16 @@ public BucketCardinality bucketCardinality() {
* this aggregator or the instance of the parent's factory that is incompatible with
* the composite aggregation.
*/
private AggregatorFactory checkParentIsNullOrNested(AggregatorFactory factory) {
private static AggregatorFactory checkParentIsSafe(AggregatorFactory factory) {
if (factory == null) {
return null;
} else if (factory instanceof NestedAggregatorFactory) {
return checkParentIsNullOrNested(factory.getParent());
} else {
return factory;
}
} else if (factory instanceof NestedAggregatorFactory
|| factory instanceof FilterAggregatorFactory
|| factory instanceof ReverseNestedAggregatorFactory) {
return checkParentIsSafe(factory.getParent());
} else {
return factory;
}
}

private static void validateSources(List<CompositeValuesSourceBuilder<?>> sources) {
Expand Down Expand Up @@ -278,7 +282,7 @@ protected AggregatorFactory doBuild(
AggregatorFactory parent,
AggregatorFactories.Builder subfactoriesBuilder
) throws IOException {
AggregatorFactory invalid = checkParentIsNullOrNested(parent);
AggregatorFactory invalid = checkParentIsSafe(parent);
if (invalid != null) {
throw new IllegalArgumentException(
"[composite] aggregation cannot be used with a parent aggregation of"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
public class FilterAggregatorFactory extends AggregatorFactory {

private Weight weight;
private Query filter;
private final Query filter;

public FilterAggregatorFactory(
String name,
Expand Down Expand Up @@ -85,7 +85,7 @@ public Weight getWeight() {
try {
weight = contextSearcher.createWeight(contextSearcher.rewrite(filter), ScoreMode.COMPLETE_NO_SCORES, 1f);
} catch (IOException e) {
throw new AggregationInitializationException("Failed to initialse filter", e);
throw new AggregationInitializationException("Failed to initialise filter", e);
}
}
return weight;
Expand All @@ -98,7 +98,7 @@ public Aggregator createInternal(
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
return new FilterAggregator(name, () -> this.getWeight(), factories, searchContext, parent, cardinality, metadata);
return new FilterAggregator(name, this::getWeight, factories, searchContext, parent, cardinality, metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.opensearch.OpenSearchParseException;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
Expand Down Expand Up @@ -2460,4 +2463,41 @@ public void testIndexSortWithDuplicate() throws Exception {
);
}
}

public void testUnderFilterAggregator() throws IOException {
executeTestCase(false, false, new MatchAllDocsQuery(), Collections.emptyList(), () -> {
FilterAggregationBuilder filterAggregatorBuilder = new FilterAggregationBuilder(
"filter_mcmilterface",
new MatchAllQueryBuilder()
);
filterAggregatorBuilder.subAggregation(
new CompositeAggregationBuilder(
"compo",
Collections.singletonList(new TermsValuesSourceBuilder("keyword").field("keyword"))
)
);
return filterAggregatorBuilder;
}, (ic) -> {});
}

public void testUnderBucketAggregator() throws IOException {
try {
executeTestCase(false, false, new MatchAllDocsQuery(), Collections.emptyList(), () -> {
TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("terms").field("keyword");
termsAggregationBuilder.subAggregation(
new CompositeAggregationBuilder(
"compo",
Collections.singletonList(new TermsValuesSourceBuilder("keyword").field("keyword"))
)
);
return termsAggregationBuilder;
}, (ic) -> {});
fail("Should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException iae) {
assertTrue(
iae.getMessage()
.contains("[composite] aggregation cannot be used with a parent aggregation of type: [TermsAggregatorFactory]")
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregatorTestCase;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.composite.InternalComposite;
Expand Down Expand Up @@ -139,12 +141,16 @@ protected void executeTestCase(
boolean useIndexSort,
Query query,
List<Map<String, List<Object>>> dataset,
Supplier<CompositeAggregationBuilder> create,
Supplier<? extends AggregationBuilder> create,
Consumer<InternalComposite> verify
) throws IOException {
Map<String, MappedFieldType> types = FIELD_TYPES.stream().collect(Collectors.toMap(MappedFieldType::name, Function.identity()));
CompositeAggregationBuilder aggregationBuilder = create.get();
Sort indexSort = useIndexSort ? buildIndexSort(aggregationBuilder.sources(), types) : null;
AggregationBuilder aggregationBuilder = create.get();
Sort indexSort = null;
if (aggregationBuilder instanceof CompositeAggregationBuilder && useIndexSort) {
CompositeAggregationBuilder cab = (CompositeAggregationBuilder) aggregationBuilder;
indexSort = buildIndexSort(cab.sources(), types);
}
IndexSettings indexSettings = createIndexSettings(indexSort);
try (Directory directory = newDirectory()) {
IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random()));
Expand Down Expand Up @@ -180,14 +186,16 @@ protected void executeTestCase(
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
InternalComposite composite = searchAndReduce(
InternalAggregation aggregation = searchAndReduce(
indexSettings,
indexSearcher,
query,
aggregationBuilder,
FIELD_TYPES.toArray(new MappedFieldType[0])
);
verify.accept(composite);
if (aggregation instanceof InternalComposite) {
verify.accept((InternalComposite) aggregation);
}
}
}
}
Expand Down

0 comments on commit 1416bc6

Please sign in to comment.