From addb3320ea8bfe24dbd7c950d29f68d32d5a927e Mon Sep 17 00:00:00 2001 From: Sandesh Kumar Date: Sun, 25 Aug 2024 02:59:43 -0700 Subject: [PATCH] Star Tree Search request/response changes Signed-off-by: Sandesh Kumar --- .../index/query/QueryShardContext.java | 95 ++++++ .../org/opensearch/search/SearchService.java | 68 ++++- .../aggregations/AggregatorFactories.java | 6 +- .../aggregations/AggregatorFactory.java | 4 + .../aggregations/metrics/AvgAggregator.java | 66 ++++ .../metrics/AvgAggregatorFactory.java | 2 +- .../aggregations/metrics/MaxAggregator.java | 44 +++ .../metrics/MaxAggregatorFactory.java | 2 +- .../aggregations/metrics/MinAggregator.java | 42 +++ .../metrics/MinAggregatorFactory.java | 2 +- .../metrics/NumericMetricsAggregator.java | 19 ++ .../aggregations/metrics/SumAggregator.java | 59 +++- .../metrics/SumAggregatorFactory.java | 2 +- .../metrics/ValueCountAggregationBuilder.java | 2 +- .../metrics/ValueCountAggregator.java | 34 +++ .../metrics/ValueCountAggregatorFactory.java | 2 +- .../aggregations/support/ValuesSource.java | 4 + .../ValuesSourceAggregatorFactory.java | 12 + .../startree/OriginalOrStarTreeQuery.java | 83 +++++ .../search/startree/StarTreeFilter.java | 289 ++++++++++++++++++ .../search/startree/StarTreeQuery.java | 116 +++++++ .../search/startree/package-info.java | 10 + .../StarTreeDocValuesFormatTests.java | 10 +- .../startree/MetricAggregatorTests.java | 212 +++++++++++++ .../aggregations/AggregatorTestCase.java | 70 +++++ 25 files changed, 1235 insertions(+), 20 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java create mode 100644 server/src/main/java/org/opensearch/search/startree/package-info.java create mode 100644 server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java diff --git a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java index 91313092d8d28..8d2421517fdad 100644 --- a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java +++ b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java @@ -56,7 +56,12 @@ import org.opensearch.index.IndexSortConfig; import org.opensearch.index.analysis.IndexAnalyzers; import org.opensearch.index.cache.bitset.BitsetFilterCache; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.fielddata.IndexFieldData; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.index.mapper.ContentPath; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; @@ -73,12 +78,22 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptFactory; import org.opensearch.script.ScriptService; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.metrics.AvgAggregatorFactory; +import org.opensearch.search.aggregations.metrics.MaxAggregatorFactory; +import org.opensearch.search.aggregations.metrics.MinAggregatorFactory; +import org.opensearch.search.aggregations.metrics.SumAggregatorFactory; +import org.opensearch.search.aggregations.metrics.ValueCountAggregatorFactory; import org.opensearch.search.aggregations.support.AggregationUsageService; +import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.lookup.SearchLookup; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import org.opensearch.transport.RemoteClusterAware; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -89,6 +104,7 @@ import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -522,6 +538,85 @@ private ParsedQuery toQuery(QueryBuilder queryBuilder, CheckedFunction>> predicateMap; + + if (queryBuilder == null) { + predicateMap = null; + } else if (queryBuilder instanceof TermQueryBuilder) { + List supportedDimensions = compositeIndexFieldInfo.getDimensions() + .stream() + .map(Dimension::getField) + .collect(Collectors.toList()); + predicateMap = getStarTreePredicates(queryBuilder, supportedDimensions); + } else { + return null; + } + + StarTreeQuery starTreeQuery = new StarTreeQuery(starTree, predicateMap); + OriginalOrStarTreeQuery originalOrStarTreeQuery = new OriginalOrStarTreeQuery(starTreeQuery, query); + return new ParsedQuery(originalOrStarTreeQuery); + } + + /** + * Parse query body to star-tree predicates + * @param queryBuilder + * @return predicates to match + */ + private Map>> getStarTreePredicates(QueryBuilder queryBuilder, List supportedDimensions) { + TermQueryBuilder tq = (TermQueryBuilder) queryBuilder; + String field = tq.fieldName(); + if (supportedDimensions.contains(field) == false) { + throw new IllegalArgumentException("unsupported field in star-tree"); + } + long inputQueryVal = Long.parseLong(tq.value().toString()); + + // Get or create the list of predicates for the given field + Map>> predicateMap = new HashMap<>(); + List> predicates = predicateMap.getOrDefault(field, new ArrayList<>()); + + // Create a predicate to match the input query value + Predicate predicate = dimVal -> dimVal == inputQueryVal; + predicates.add(predicate); + + // Put the predicates list back into the map + predicateMap.put(field, predicates); + return predicateMap; + } + + public boolean validateStarTreeMetricSuport(CompositeDataCubeFieldType compositeIndexFieldInfo, AggregatorFactory aggregatorFactory) { + String field; + Map> supportedMetrics = compositeIndexFieldInfo.getMetrics() + .stream() + .collect(Collectors.toMap(Metric::getField, Metric::getMetrics)); + + // Map to associate supported AggregatorFactory classes with their corresponding MetricStat + Map, MetricStat> aggregatorStatMap = Map.of( + SumAggregatorFactory.class, + MetricStat.SUM, + MaxAggregatorFactory.class, + MetricStat.MAX, + MinAggregatorFactory.class, + MetricStat.MIN, + ValueCountAggregatorFactory.class, + MetricStat.VALUE_COUNT, + AvgAggregatorFactory.class, + MetricStat.AVG + ); + + MetricStat metricStat = aggregatorStatMap.get(aggregatorFactory.getClass()); + if (metricStat != null) { + field = ((ValuesSourceAggregatorFactory) aggregatorFactory).getField(); + return supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat); + } + return false; + } + public Index index() { return indexSettings.getIndex(); } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index a53a7198c366f..c51345b33be3f 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -77,12 +77,16 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; import org.opensearch.index.engine.Engine; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; +import org.opensearch.index.mapper.StarTreeMapper; import org.opensearch.index.query.InnerHitContextBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.MatchNoneQueryBuilder; +import org.opensearch.index.query.ParsedQuery; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; @@ -97,11 +101,13 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.AggregationInitializationException; import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; +import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.dfs.DfsPhase; @@ -162,6 +168,7 @@ import static org.opensearch.common.unit.TimeValue.timeValueHours; import static org.opensearch.common.unit.TimeValue.timeValueMillis; import static org.opensearch.common.unit.TimeValue.timeValueMinutes; +import static org.opensearch.search.internal.SearchContext.TRACK_TOTAL_HITS_DISABLED; /** * The main search service @@ -1314,6 +1321,10 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.evaluateRequestShouldUseConcurrentSearch(); return; } + // Can be marked false for majority cases for which star-tree cannot be used + // As we increment the cases where star-tree can be used, this can be set back to true + boolean canUseStarTree = context.mapperService().isCompositeIndexPresent(); + SearchShardTarget shardTarget = context.shardTarget(); QueryShardContext queryShardContext = context.getQueryShardContext(); context.from(source.from()); @@ -1324,10 +1335,12 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.parsedQuery(queryShardContext.toQuery(source.query())); } if (source.postFilter() != null) { + canUseStarTree = false; InnerHitContextBuilder.extractInnerHits(source.postFilter(), innerHitBuilders); context.parsedPostFilter(queryShardContext.toQuery(source.postFilter())); } - if (innerHitBuilders.size() > 0) { + if (!innerHitBuilders.isEmpty()) { + canUseStarTree = false; for (Map.Entry entry : innerHitBuilders.entrySet()) { try { entry.getValue().build(context, context.innerHits()); @@ -1337,11 +1350,10 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc } } if (source.sorts() != null) { + canUseStarTree = false; try { Optional optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext()); - if (optionalSort.isPresent()) { - context.sort(optionalSort.get()); - } + optionalSort.ifPresent(context::sort); } catch (IOException e) { throw new SearchException(shardTarget, "failed to create sort elements", e); } @@ -1355,8 +1367,10 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc } if (source.trackTotalHitsUpTo() != null) { context.trackTotalHitsUpTo(source.trackTotalHitsUpTo()); + canUseStarTree = canUseStarTree && (source.trackTotalHitsUpTo() == TRACK_TOTAL_HITS_DISABLED); } if (source.minScore() != null) { + canUseStarTree = false; context.minimumScore(source.minScore()); } if (source.timeout() != null) { @@ -1496,6 +1510,50 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.profile()) { context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch())); } + + if (canUseStarTree) { + try { + setStarTreeQuery(context, queryShardContext, source); + logger.debug("can use star tree"); + } catch (IOException e) { + logger.debug("not using star tree"); + } + } + } + + private boolean setStarTreeQuery(SearchContext context, QueryShardContext queryShardContext, SearchSourceBuilder source) + throws IOException { + + if (source.aggregations() == null) { + return false; + } + + // Current implementation assumes only single star-tree is supported + CompositeDataCubeFieldType compositeMappedFieldType = (StarTreeMapper.StarTreeFieldType) context.mapperService() + .getCompositeFieldTypes() + .iterator() + .next(); + CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo( + compositeMappedFieldType.name(), + compositeMappedFieldType.getCompositeIndexType() + ); + + ParsedQuery newParsedQuery = queryShardContext.toStarTreeQuery(starTree, compositeMappedFieldType, source.query(), context.query()); + if (newParsedQuery == null) { + return false; + } + + for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) { + if (!(aggregatorFactory instanceof ValuesSourceAggregatorFactory + && aggregatorFactory.getSubFactories().getFactories().length == 0)) { + return false; + } + if (queryShardContext.validateStarTreeMetricSuport(compositeMappedFieldType, aggregatorFactory) == false) { + return false; + } + } + context.parsedQuery(newParsedQuery); + return true; } /** @@ -1655,7 +1713,7 @@ public static boolean canMatchSearchAfter( && minMax != null && primarySortField != null && primarySortField.missing() == null - && Objects.equals(trackTotalHitsUpto, SearchContext.TRACK_TOTAL_HITS_DISABLED)) { + && Objects.equals(trackTotalHitsUpto, TRACK_TOTAL_HITS_DISABLED)) { final Object searchAfterPrimary = searchAfter.fields[0]; if (primarySortField.order() == SortOrder.DESC) { if (minMax.compareMin(searchAfterPrimary) > 0) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java index eeb0c606694b0..dfcb245ef3656 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java @@ -255,7 +255,7 @@ public static Builder builder() { return new Builder(); } - private AggregatorFactories(AggregatorFactory[] factories) { + public AggregatorFactories(AggregatorFactory[] factories) { this.factories = factories; } @@ -661,4 +661,8 @@ public PipelineTree buildPipelineTree() { return new PipelineTree(subTrees, aggregators); } } + + public AggregatorFactory[] getFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java index 6cc3a78fb1e36..86fbb46a9ad3c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java @@ -127,4 +127,8 @@ protected boolean supportsConcurrentSegmentSearch() { public boolean evaluateChildFactories() { return factories.allFactoriesSupportConcurrentSearch(); } + + public AggregatorFactories getSubFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index e58466b56df2a..adafcc82d86d4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -32,11 +32,16 @@ package org.opensearch.search.aggregations.metrics; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.ScoreMode; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; import org.opensearch.common.util.LongArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; @@ -46,6 +51,8 @@ import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import java.io.IOException; import java.util.Map; @@ -93,6 +100,15 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { + StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); + return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); final CompensatedSum kahanSummation = new CompensatedSum(0, 0); @@ -126,6 +142,56 @@ public void collect(int doc, long bucket) throws IOException { }; } + private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + final BigArrays bigArrays = context.bigArrays(); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String sumMetricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues( + starTree.getField(), + fieldName, + MetricStat.SUM.getTypeName() + ); + SortedNumericDocValues values = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(sumMetricName); + + String countMetricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues( + starTree.getField(), + fieldName, + MetricStat.VALUE_COUNT.getTypeName() + ); + SortedNumericDocValues countValues = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(countMetricName); + + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + counts = bigArrays.grow(counts, bucket + 1); + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + + if (values.advanceExact(doc) && countValues.advanceExact(doc)) { + final long valueCount = values.docValueCount(); + counts.increment(bucket, countValues.nextValue()); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + + kahanSummation.reset(sum, compensation); + + for (int i = 0; i < valueCount; i++) { + double value = Double.longBitsToDouble(values.nextValue()); + kahanSummation.add(value); + } + + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); + } + } + }; + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= sums.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java index 0a09fae1eaebe..d8fe51b153819 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class AvgAggregatorFactory extends ValuesSourceAggregatorFactory { +public class AvgAggregatorFactory extends ValuesSourceAggregatorFactory { AvgAggregatorFactory( String name, diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index 8108b8a726856..bb4301c34d089 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -34,12 +34,16 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.Bits; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; import org.opensearch.index.fielddata.NumericDoubleValues; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; @@ -51,6 +55,8 @@ import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import java.io.IOException; import java.util.Arrays; @@ -120,6 +126,16 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc throw new CollectionTerminatedException(); } } + + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { + StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); + return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MAX.select(allValues); @@ -143,6 +159,34 @@ public void collect(int doc, long bucket) throws IOException { }; } + private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, "max"); + SortedNumericDocValues values = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName); + + final BigArrays bigArrays = context.bigArrays(); + final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); + return new LeafBucketCollectorBase(sub, allValues) { + + @Override + public void collect(int doc, long bucket) throws IOException { + if (bucket >= maxes.size()) { + long from = maxes.size(); + maxes = bigArrays.grow(maxes, bucket + 1); + maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY); + } + if (values.advanceExact(doc)) { + final double value = Double.longBitsToDouble(values.nextValue()); + double max = maxes.get(bucket); + max = Math.max(max, value); + maxes.set(bucket, max); + } + } + }; + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= maxes.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java index 4fe936c8b7797..0d537745126d3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class MaxAggregatorFactory extends ValuesSourceAggregatorFactory { +public class MaxAggregatorFactory extends ValuesSourceAggregatorFactory { static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index 946057e42ac88..31e0ace591582 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -34,12 +34,16 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.Bits; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; import org.opensearch.index.fielddata.NumericDoubleValues; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; @@ -51,6 +55,8 @@ import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import java.io.IOException; import java.util.Map; @@ -119,6 +125,15 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc throw new CollectionTerminatedException(); } } + + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { + StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); + return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MIN.select(allValues); @@ -138,7 +153,34 @@ public void collect(int doc, long bucket) throws IOException { mins.set(bucket, min); } } + }; + } + private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, "min"); + SortedNumericDocValues values = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName); + + final BigArrays bigArrays = context.bigArrays(); + final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); + return new LeafBucketCollectorBase(sub, allValues) { + + @Override + public void collect(int doc, long bucket) throws IOException { + if (bucket >= mins.size()) { + long from = mins.size(); + mins = bigArrays.grow(mins, bucket + 1); + mins.fill(from, mins.size(), Double.POSITIVE_INFINITY); + } + if (values.advanceExact(doc)) { + final double value = Double.longBitsToDouble(values.nextValue()); + double min = mins.get(bucket); + min = Math.min(min, value); + mins.set(bucket, min); + } + } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java index 58fbe5edefd12..2b7c8a4cc8c9c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class MinAggregatorFactory extends ValuesSourceAggregatorFactory { +public class MinAggregatorFactory extends ValuesSourceAggregatorFactory { static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java index f90e5a092385f..395bf32c7ffe8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java @@ -31,13 +31,20 @@ package org.opensearch.search.aggregations.metrics; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.Comparators; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.sort.SortOrder; import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; /** * Base class to aggregate all docs into a single numeric metric value. @@ -107,4 +114,16 @@ public BucketComparator bucketComparator(String key, SortOrder order) { return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC); } } + + protected StarTreeValues getStarTreeValues(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException { + SegmentReader reader = Lucene.segmentReader(ctx.reader()); + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) { + return null; + } + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + StarTreeValues values = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + final AtomicReference aggrVal = new AtomicReference<>(null); + + return values; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index 4b8e882cd69bc..b4af1d4768d11 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -32,10 +32,14 @@ package org.opensearch.search.aggregations.metrics; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.ScoreMode; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; @@ -45,6 +49,8 @@ import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import java.io.IOException; import java.util.Map; @@ -56,13 +62,13 @@ */ public class SumAggregator extends NumericMetricsAggregator.SingleValue { - private final ValuesSource.Numeric valuesSource; - private final DocValueFormat format; + protected final ValuesSource.Numeric valuesSource; + protected final DocValueFormat format; - private DoubleArray sums; - private DoubleArray compensations; + protected DoubleArray sums; + protected DoubleArray compensations; - SumAggregator( + public SumAggregator( String name, ValuesSourceConfig valuesSourceConfig, SearchContext context, @@ -89,6 +95,15 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } + + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { + StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); + return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); final CompensatedSum kahanSummation = new CompensatedSum(0, 0); @@ -118,6 +133,40 @@ public void collect(int doc, long bucket) throws IOException { }; } + private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, "sum"); + SortedNumericDocValues values = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName); + + final BigArrays bigArrays = context.bigArrays(); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + + if (values.advanceExact(doc)) { + final int valuesCount = values.docValueCount(); + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + + for (int i = 0; i < valuesCount; i++) { + double value = Double.longBitsToDouble(values.nextValue()); + kahanSummation.add(value); + } + + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); + } + } + }; + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= sums.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java index ef9b93920ba18..e0cd44f2672a8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class SumAggregatorFactory extends ValuesSourceAggregatorFactory { +public class SumAggregatorFactory extends ValuesSourceAggregatorFactory { SumAggregatorFactory( String name, diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregationBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregationBuilder.java index 0258a2c87cb20..4b9e96332c1b8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregationBuilder.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregationBuilder.java @@ -57,7 +57,7 @@ * @opensearch.internal */ public class ValueCountAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly { - public static final String NAME = "value_count"; + public static final String NAME = "count"; public static final ValuesSourceRegistry.RegistryKey REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>( NAME, MetricAggregatorSupplier.class diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java index 6f9be06231819..7e49a4c3a1763 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java @@ -37,6 +37,10 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.LongArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; import org.opensearch.index.fielddata.MultiGeoPointValues; import org.opensearch.index.fielddata.SortedBinaryDocValues; import org.opensearch.search.aggregations.Aggregator; @@ -46,6 +50,8 @@ import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import java.io.IOException; import java.util.Map; @@ -88,6 +94,12 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final BigArrays bigArrays = context.bigArrays(); if (valuesSource instanceof ValuesSource.Numeric) { + + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { + StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); + return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); + } + final SortedNumericDocValues values = ((ValuesSource.Numeric) valuesSource).longValues(ctx); return new LeafBucketCollectorBase(sub, values) { @@ -124,7 +136,29 @@ public void collect(int doc, long bucket) throws IOException { counts.increment(bucket, values.docValueCount()); } } + }; + } + + private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues( + starTree.getField(), + fieldName, + MetricStat.VALUE_COUNT.getTypeName() + ); + SortedNumericDocValues values = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName); + final BigArrays bigArrays = context.bigArrays(); + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + counts = bigArrays.grow(counts, bucket + 1); + if (values.advanceExact(doc)) { + counts.increment(bucket, values.nextValue()); + } + } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java index 4a04dd2e0a932..4c6fc142c182e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java @@ -51,7 +51,7 @@ * * @opensearch.internal */ -class ValueCountAggregatorFactory extends ValuesSourceAggregatorFactory { +public class ValueCountAggregatorFactory extends ValuesSourceAggregatorFactory { public static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register(ValueCountAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.ALL_CORE, ValueCountAggregator::new, true); diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java index 1f4dd429e094e..5732d545cb2d2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java @@ -625,6 +625,10 @@ public SortedNumericDocValues longValues(LeafReaderContext context) { public SortedNumericDoubleValues doubleValues(LeafReaderContext context) { return indexFieldData.load(context).getDoubleValues(); } + + public String getIndexFieldName() { + return indexFieldData.getFieldName(); + } } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java index 69a4a5d8b6703..b19e466b081f9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java @@ -102,4 +102,16 @@ protected abstract Aggregator doCreateInternal( public String getStatsSubtype() { return config.valueSourceType().typeName(); } + + public String getField() { + return config.fieldContext().field(); + } + + public String getAggregationName() { + return name; + } + + public ValuesSourceConfig getConfig() { + return config; + } } diff --git a/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java b/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java new file mode 100644 index 0000000000000..a3aef0eab3ec9 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Accountable; + +import java.io.IOException; + +/** + * Preserves star-tree queries which can be used along with original query + * Decides which star-tree query to use (or not) based on cost factors + * + * @opensearch.experimental + */ +public class OriginalOrStarTreeQuery extends Query implements Accountable { + + private final StarTreeQuery starTreeQuery; + private final Query originalQuery; + private boolean starTreeQueryUsed; + + public OriginalOrStarTreeQuery(StarTreeQuery starTreeQuery, Query originalQuery) { + this.starTreeQuery = starTreeQuery; + this.originalQuery = originalQuery; + this.starTreeQueryUsed = false; + } + + @Override + public String toString(String s) { + return ""; + } + + @Override + public void visit(QueryVisitor queryVisitor) { + + } + + @Override + public boolean equals(Object o) { + return true; + } + + @Override + public int hashCode() { + return originalQuery.hashCode(); + } + + @Override + public long ramBytesUsed() { + return 0; + } + + public boolean isStarTreeUsed() { + return starTreeQueryUsed; + } + + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + if (searcher.getIndexReader().hasDeletions() == false) { + this.starTreeQueryUsed = true; + return this.starTreeQuery.createWeight(searcher, scoreMode, boost); + } else { + return this.originalQuery.createWeight(searcher, scoreMode, boost); + } + } + + public Query getOriginalQuery() { + return originalQuery; + } + + public StarTreeQuery getStarTreeQuery() { + return starTreeQuery; + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java new file mode 100644 index 0000000000000..91ce9c06a4c60 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java @@ -0,0 +1,289 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.DocIdSetBuilder; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.function.Predicate; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +/** + * Filter operator for star tree data structure. + * + * @opensearch.experimental + */ +public class StarTreeFilter { + private static final Logger logger = LogManager.getLogger(StarTreeFilter.class); + + /** + * Helper class to wrap the result from traversing the star tree. + * */ + static class StarTreeResult { + final DocIdSetBuilder _matchedDocIds; + final Set _remainingPredicateColumns; + final int numOfMatchedDocs; + final int maxMatchedDoc; + + StarTreeResult(DocIdSetBuilder matchedDocIds, Set remainingPredicateColumns, int numOfMatchedDocs, int maxMatchedDoc) { + _matchedDocIds = matchedDocIds; + _remainingPredicateColumns = remainingPredicateColumns; + this.numOfMatchedDocs = numOfMatchedDocs; + this.maxMatchedDoc = maxMatchedDoc; + } + } + + private final StarTreeNode starTreeRoot; + + Map>> _predicateEvaluators; + + DocIdSetBuilder docsWithField; + + DocIdSetBuilder.BulkAdder adder; + Map dimValueMap; + + public StarTreeFilter(StarTreeValues starTreeAggrStructure, Map>> predicateEvaluators) { + // This filter operator does not support AND/OR/NOT operations. + starTreeRoot = starTreeAggrStructure.getRoot(); + dimValueMap = starTreeAggrStructure.getDimensionDocValuesIteratorMap(); + _predicateEvaluators = predicateEvaluators != null ? predicateEvaluators : Collections.emptyMap(); + // _groupByColumns = groupByColumns != null ? groupByColumns : Collections.emptyList(); + + // TODO : this should be the maximum number of doc values + docsWithField = new DocIdSetBuilder(Integer.MAX_VALUE); + } + + /** + *
    + *
  • First go over the star tree and try to match as many dimensions as possible + *
  • For the remaining columns, use doc values indexes to match them + *
+ */ + public DocIdSetIterator getStarTreeResult() throws IOException { + StarTreeResult starTreeResult = traverseStarTree(); + List andIterators = new ArrayList<>(); + andIterators.add(starTreeResult._matchedDocIds.build().iterator()); + DocIdSetIterator docIdSetIterator = andIterators.get(0); + // No matches, return + if (starTreeResult.maxMatchedDoc == -1) { + return docIdSetIterator; + } + int docCount = 0; + for (String remainingPredicateColumn : starTreeResult._remainingPredicateColumns) { + // TODO : set to max value of doc values + logger.debug("remainingPredicateColumn : {}, maxMatchedDoc : {} ", remainingPredicateColumn, starTreeResult.maxMatchedDoc); + DocIdSetBuilder builder = new DocIdSetBuilder(starTreeResult.maxMatchedDoc + 1); + List> compositePredicateEvaluators = _predicateEvaluators.get(remainingPredicateColumn); + SortedNumericDocValues ndv = (SortedNumericDocValues) this.dimValueMap.get(remainingPredicateColumn); + List docIds = new ArrayList<>(); + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docCount++; + int docID = docIdSetIterator.docID(); + if (ndv.advanceExact(docID)) { + final int valuesCount = ndv.docValueCount(); + long value = ndv.nextValue(); + for (Predicate compositePredicateEvaluator : compositePredicateEvaluators) { + // TODO : this might be expensive as its done against all doc values docs + if (compositePredicateEvaluator.test(value)) { + docIds.add(docID); + for (int i = 0; i < valuesCount - 1; i++) { + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docIds.add(docIdSetIterator.docID()); + } + } + break; + } + } + } + } + DocIdSetBuilder.BulkAdder adder = builder.grow(docIds.size()); + for (int docID : docIds) { + adder.add(docID); + } + docIdSetIterator = builder.build().iterator(); + } + return docIdSetIterator; + } + + /** + * Helper method to traverse the star tree, get matching documents and keep track of all the + * predicate dimensions that are not matched. + */ + private StarTreeResult traverseStarTree() throws IOException { + Set globalRemainingPredicateColumns = null; + + StarTreeNode starTree = starTreeRoot; + + List dimensionNames = new ArrayList<>(dimValueMap.keySet()); + + // Track whether we have found a leaf node added to the queue. If we have found a leaf node, and + // traversed to the + // level of the leave node, we can set globalRemainingPredicateColumns if not already set + // because we know the leaf + // node won't split further on other predicate columns. + boolean foundLeafNode = starTree.isLeaf(); + + // Use BFS to traverse the star tree + Queue queue = new ArrayDeque<>(); + queue.add(starTree); + int currentDimensionId = -1; + Set remainingPredicateColumns = new HashSet<>(_predicateEvaluators.keySet()); + if (foundLeafNode) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + + int matchedDocsCountInStarTree = 0; + int maxDocNum = -1; + + StarTreeNode starTreeNode; + List docIds = new ArrayList<>(); + while ((starTreeNode = queue.poll()) != null) { + int dimensionId = starTreeNode.getDimensionId(); + if (dimensionId > currentDimensionId) { + // Previous level finished + String dimension = dimensionNames.get(dimensionId); + remainingPredicateColumns.remove(dimension); + if (foundLeafNode && globalRemainingPredicateColumns == null) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + currentDimensionId = dimensionId; + } + + // If all predicate columns columns are matched, we can use aggregated document + if (remainingPredicateColumns.isEmpty()) { + int docId = starTreeNode.getAggregatedDocId(); + docIds.add(docId); + matchedDocsCountInStarTree++; + maxDocNum = Math.max(docId, maxDocNum); + continue; + } + + // For leaf node, because we haven't exhausted all predicate columns and group-by columns, + // we cannot use the aggregated document. + // Add the range of documents for this node to the bitmap, and keep track of the + // remaining predicate columns for this node + if (starTreeNode.isLeaf()) { + for (long i = starTreeNode.getStartDocId(); i < starTreeNode.getEndDocId(); i++) { + docIds.add((int) i); + matchedDocsCountInStarTree++; + maxDocNum = Math.max((int) i, maxDocNum); + } + continue; + } + + // For non-leaf node, proceed to next level + String childDimension = dimensionNames.get(dimensionId + 1); + + // Only read star-node when the dimension is not in the global remaining predicate columns + // because we cannot use star-node in such cases + StarTreeNode starNode = null; + if ((globalRemainingPredicateColumns == null || !globalRemainingPredicateColumns.contains(childDimension))) { + starNode = starTreeNode.getChildForDimensionValue(StarTreeUtils.ALL, true); + } + + if (remainingPredicateColumns.contains(childDimension)) { + // Have predicates on the next level, add matching nodes to the queue + + // Calculate the matching dictionary ids for the child dimension + int numChildren = starTreeNode.getNumChildren(); + + // If number of matching dictionary ids is large, use scan instead of binary search + + Iterator childrenIterator = starTreeNode.getChildrenIterator(); + + // When the star-node exists, and the number of matching doc ids is more than or equal to + // the number of non-star child nodes, check if all the child nodes match the predicate, + // and use the star-node if so + if (starNode != null) { + List matchingChildNodes = new ArrayList<>(); + boolean findLeafChildNode = false; + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + List> predicates = _predicateEvaluators.get(childDimension); + for (Predicate predicate : predicates) { + long val = childNode.getDimensionValue(); + if (predicate.test(val)) { + matchingChildNodes.add(childNode); + findLeafChildNode |= childNode.isLeaf(); + break; + } + } + } + if (matchingChildNodes.size() == numChildren - 1) { + // All the child nodes (except for the star-node) match the predicate, use the star-node + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + // Some child nodes do not match the predicate, use the matching child nodes + queue.addAll(matchingChildNodes); + foundLeafNode |= findLeafChildNode; + } + } else { + // Cannot use the star-node, use the matching child nodes + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + List> predicates = _predicateEvaluators.get(childDimension); + for (Predicate predicate : predicates) { + if (predicate.test(childNode.getDimensionValue())) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + break; + } + } + } + } + } else { + // No predicate on the next level + if (starNode != null) { + // Star-node exists, use it + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + // Star-node does not exist or cannot be used, add all non-star nodes to the queue + Iterator childrenIterator = starTreeNode.getChildrenIterator(); + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + if (childNode.getDimensionValue() != StarTreeUtils.ALL) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + } + } + } + } + } + + adder = docsWithField.grow(docIds.size()); + for (int id : docIds) { + adder.add(id); + } + return new StarTreeResult( + docsWithField, + globalRemainingPredicateColumns != null ? globalRemainingPredicateColumns : Collections.emptySet(), + matchedDocsCountInStarTree, + maxDocNum + ); + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java b/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java new file mode 100644 index 0000000000000..0c24e2f469909 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java @@ -0,0 +1,116 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.search.ConstantScoreWeight; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Accountable; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Query class for querying star tree data structure. + * + * @opensearch.experimental + */ +public class StarTreeQuery extends Query implements Accountable { + + /** + * Star tree field info + * This is used to get the star tree data structure + */ + CompositeIndexFieldInfo starTree; + + /** + * Map of field name to a list of predicates to be applied on that field + * This is used to filter the data based on the predicates + */ + Map>> compositePredicateMap; + + public StarTreeQuery(CompositeIndexFieldInfo starTree, Map>> compositePredicateMap) { + this.starTree = starTree; + this.compositePredicateMap = compositePredicateMap; + } + + @Override + public String toString(String field) { + return null; + } + + @Override + public void visit(QueryVisitor visitor) { + visitor.visitLeaf(this); + } + + @Override + public boolean equals(Object obj) { + return sameClassAs(obj); + } + + @Override + public int hashCode() { + return classHash(); + } + + @Override + public long ramBytesUsed() { + return 0; + } + + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + return new ConstantScoreWeight(this, boost) { + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + SegmentReader reader = Lucene.segmentReader(context.reader()); + + // We get the 'CompositeIndexReader' instance so that we can get StarTreeValues + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) return null; + + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + StarTreeValues starTreeValues = null; + if (compositeIndexFields != null && !compositeIndexFields.isEmpty()) { + starTreeValues = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + } else { + return null; + } + + StarTreeFilter filter = new StarTreeFilter(starTreeValues, compositePredicateMap); + DocIdSetIterator result = filter.getStarTreeResult(); + return new ConstantScoreScorer(this, score(), scoreMode, result); + } + + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + }; + } + + public CompositeIndexFieldInfo getStarTree() { + return starTree; + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/package-info.java b/server/src/main/java/org/opensearch/search/startree/package-info.java new file mode 100644 index 0000000000000..601a588e54e69 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Star Tree query classes */ +package org.opensearch.search.startree; diff --git a/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java index 366fa5c85a920..2a7cf1acaa2b6 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite99/datacube/startree/StarTreeDocValuesFormatTests.java @@ -201,7 +201,7 @@ public void testStarTreeDocValues() throws IOException { directory.close(); } - private XContentBuilder getExpandedMapping() throws IOException { + public static XContentBuilder getExpandedMapping() throws IOException { return topMapping(b -> { b.startObject("composite"); b.startObject("startree"); @@ -222,6 +222,8 @@ private XContentBuilder getExpandedMapping() throws IOException { b.startArray("stats"); b.value("sum"); b.value("value_count"); + b.value("max"); + b.value("min"); b.endArray(); b.endObject(); b.endArray(); @@ -242,13 +244,14 @@ private XContentBuilder getExpandedMapping() throws IOException { }); } - private XContentBuilder topMapping(CheckedConsumer buildFields) throws IOException { + private static XContentBuilder topMapping(CheckedConsumer buildFields) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder().startObject().startObject("_doc"); buildFields.accept(builder); return builder.endObject().endObject(); } - private void createMapperService(XContentBuilder builder) throws IOException { + public static MapperService createMapperService(XContentBuilder builder) throws IOException { + MapperService mapperService = null; IndexMetadata indexMetadata = IndexMetadata.builder("test") .settings( Settings.builder() @@ -267,5 +270,6 @@ private void createMapperService(XContentBuilder builder) throws IOException { "test" ); mapperService.merge(indexMetadata, MapperService.MergeReason.INDEX_TEMPLATE); + return mapperService; } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java new file mode 100644 index 0000000000000..515382967f4a1 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java @@ -0,0 +1,212 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.lucene99.Lucene99Codec; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.index.RandomIndexWriter; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.codec.composite.composite99.Composite99Codec; +import org.opensearch.index.codec.composite99.datacube.startree.StarTreeDocValuesFormatTests; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.NumberFieldMapper; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregatorTestCase; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.opensearch.search.aggregations.metrics.InternalAvg; +import org.opensearch.search.aggregations.metrics.InternalMax; +import org.opensearch.search.aggregations.metrics.InternalMin; +import org.opensearch.search.aggregations.metrics.InternalSum; +import org.opensearch.search.aggregations.metrics.InternalValueCount; +import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.opensearch.search.aggregations.metrics.MinAggregationBuilder; +import org.opensearch.search.aggregations.metrics.SumAggregationBuilder; +import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Predicate; + +import static org.opensearch.search.aggregations.AggregationBuilders.avg; +import static org.opensearch.search.aggregations.AggregationBuilders.count; +import static org.opensearch.search.aggregations.AggregationBuilders.max; +import static org.opensearch.search.aggregations.AggregationBuilders.min; +import static org.opensearch.search.aggregations.AggregationBuilders.sum; +import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; + +public class MetricAggregatorTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "field"; + private static final NumberFieldMapper.NumberType DEFAULT_FIELD_TYPE = NumberFieldMapper.NumberType.LONG; + private static final MappedFieldType DEFAULT_MAPPED_FIELD = new NumberFieldMapper.NumberFieldType(FIELD_NAME, DEFAULT_FIELD_TYPE); + + @Before + public void setup() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.STAR_TREE_INDEX, true).build()); + } + + @After + public void teardown() throws IOException { + FeatureFlags.initializeFeatureFlags(Settings.EMPTY); + } + + protected Codec getCodec() { + final Logger testLogger = LogManager.getLogger(MetricAggregatorTests.class); + MapperService mapperService; + try { + mapperService = StarTreeDocValuesFormatTests.createMapperService(StarTreeDocValuesFormatTests.getExpandedMapping()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new Composite99Codec(Lucene99Codec.Mode.BEST_SPEED, mapperService, testLogger); + } + + public void testStarTreeDocValues() throws IOException { + Directory directory = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(null); + conf.setCodec(getCodec()); + conf.setMergePolicy(newLogMergePolicy()); + RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf); + Document doc = new Document(); + doc.add(new SortedNumericDocValuesField("sndv", 1)); + doc.add(new SortedNumericDocValuesField("dv", 1)); + doc.add(new SortedNumericDocValuesField("field", 1)); + iw.addDocument(doc); + doc = new Document(); + doc.add(new SortedNumericDocValuesField("sndv", 1)); + doc.add(new SortedNumericDocValuesField("dv", 2)); + doc.add(new SortedNumericDocValuesField("field", 3)); + iw.addDocument(doc); + doc = new Document(); + iw.forceMerge(1); + doc.add(new SortedNumericDocValuesField("sndv", 2)); + doc.add(new SortedNumericDocValuesField("dv", 4)); + doc.add(new SortedNumericDocValuesField("field", 6)); + iw.addDocument(doc); + doc = new Document(); + doc.add(new SortedNumericDocValuesField("sndv", 3)); + doc.add(new SortedNumericDocValuesField("dv", 6)); + doc.add(new SortedNumericDocValuesField("field", 9)); + iw.addDocument(doc); + iw.forceMerge(1); + iw.close(); + + DirectoryReader ir = DirectoryReader.open(directory); + initValuesSourceRegistry(); + assertEquals(ir.leaves().size(), 1); + LeafReaderContext context = ir.leaves().get(0); + + SegmentReader reader = Lucene.segmentReader(context.reader()); + IndexSearcher indexSearcher = newSearcher(reader, true, true); + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + + CompositeIndexFieldInfo starTree = compositeIndexFields.get(0); + + SumAggregationBuilder sumAggregationBuilder = sum("_name").field(FIELD_NAME); + MaxAggregationBuilder maxAggregationBuilder = max("_name").field(FIELD_NAME); + MinAggregationBuilder minAggregationBuilder = min("_name").field(FIELD_NAME); + ValueCountAggregationBuilder valueCountAggregationBuilder = count("_name").field(FIELD_NAME); + AvgAggregationBuilder avgAggregationBuilder = avg("_name").field(FIELD_NAME); + + // match-all query + Query defaultQeury = new MatchAllDocsQuery(); + StarTreeQuery starTreeQuery = new StarTreeQuery(starTree, null); // no predicates + testCase(indexSearcher, defaultQeury, starTreeQuery, sumAggregationBuilder, verifyAggregation(InternalSum::getValue), 19); + testCase(indexSearcher, defaultQeury, starTreeQuery, maxAggregationBuilder, verifyAggregation(InternalMax::getValue), 9); + testCase(indexSearcher, defaultQeury, starTreeQuery, minAggregationBuilder, verifyAggregation(InternalMin::getValue), 1); + testCase( + indexSearcher, + defaultQeury, + starTreeQuery, + valueCountAggregationBuilder, + verifyAggregation(InternalValueCount::getValue), + 4 + ); + testCase(indexSearcher, defaultQeury, starTreeQuery, avgAggregationBuilder, verifyAggregation(InternalAvg::getValue), 4.75); + + // numeric-terms query + defaultQeury = new TermQuery(new Term("sndv", "1")); + Map>> compositePredicateMap = new HashMap<>(); + compositePredicateMap.put("sndv", List.of(dimVal -> dimVal == 1)); + starTreeQuery = new StarTreeQuery(starTree, compositePredicateMap); + testCase(indexSearcher, defaultQeury, starTreeQuery, sumAggregationBuilder, verifyAggregation(InternalSum::getValue), 4); + testCase(indexSearcher, defaultQeury, starTreeQuery, maxAggregationBuilder, verifyAggregation(InternalMax::getValue), 3); + testCase(indexSearcher, defaultQeury, starTreeQuery, minAggregationBuilder, verifyAggregation(InternalMin::getValue), 1); + testCase( + indexSearcher, + defaultQeury, + starTreeQuery, + valueCountAggregationBuilder, + verifyAggregation(InternalValueCount::getValue), + 2 + ); + testCase(indexSearcher, defaultQeury, starTreeQuery, avgAggregationBuilder, verifyAggregation(InternalAvg::getValue), 2); + + ir.close(); + directory.close(); + } + + private void testCase( + IndexSearcher searcher, + Query defaultQuery, + StarTreeQuery starTreeQuery, + T builder, + BiConsumer verify, + N expectedValue + ) throws IOException { + OriginalOrStarTreeQuery originalOrStarTreeQuery = new OriginalOrStarTreeQuery(starTreeQuery, defaultQuery); + V aggregation = searchAndReduceStarTree( + createIndexSettings(), + searcher, + originalOrStarTreeQuery, + builder, + DEFAULT_MAX_BUCKETS, + false, + DEFAULT_MAPPED_FIELD + ); + verify.accept(aggregation, expectedValue); + } + + BiConsumer verifyAggregation(Function valueExtractor) { + return (aggregation, expectedValue) -> assertEquals( + expectedValue.doubleValue(), + valueExtractor.apply(aggregation).doubleValue(), + 0f + ); + } +} diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 4abd7fbea9cff..9d816e864c50b 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -37,6 +37,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.document.LatLonDocValuesField; +import org.apache.lucene.document.LongField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.document.StoredField; @@ -141,6 +142,7 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.lookup.SearchLookup; +import org.opensearch.search.startree.StarTreeQuery; import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; import org.junit.After; @@ -650,6 +652,74 @@ protected A searchAndReduc doAssertReducedMultiBucketConsumer(internalAgg, reduceBucketConsumer); return internalAgg; } + protected A searchAndReduceStarTree( + IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + boolean hasNested, + MappedFieldType... fieldTypes + ) throws IOException { + final IndexReaderContext ctx = searcher.getTopReaderContext(); + final PipelineTree pipelines = builder.buildPipelineTree(); + List aggs = new ArrayList<>(); + if (hasNested) { + query = Queries.filtered(query, Queries.newNonNestedFilter()); + } + + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer( + maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes); + + if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) { + assertTrue(ctx instanceof LeafReaderContext); + final LeafReaderContext compCTX = (LeafReaderContext) ctx; + final int size = compCTX.leaves().size(); + final ShardSearcher[] subSearchers = new ShardSearcher[size]; + for (int searcherIDX = 0; searcherIDX < subSearchers.length; searcherIDX++) { + final LeafReaderContext leave = compCTX.leaves().get(searcherIDX); + subSearchers[searcherIDX] = new ShardSearcher(leave, compCTX); + } + for (ShardSearcher subSearcher : subSearchers) { + MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer( + maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + C a = createAggregator(query, builder, subSearcher, indexSettings, shardBucketConsumer, fieldTypes); + a.preCollection(); + Weight weight = subSearcher.createWeight(query, ScoreMode.COMPLETE, 1f); + + assertTrue(weight.getQuery() instanceof StarTreeQuery); + subSearcher.search(weight, a); + a.postCollection(); + aggs.add(a.buildTopLevel()); + } + } else { + root.preCollection(); + searcher.search(query, root); + root.postCollection(); + aggs.add(root.buildTopLevel()); + } + + MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer( + maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + root.context().bigArrays(), + getMockScriptService(), + reduceBucketConsumer, + pipelines + ); + + @SuppressWarnings("unchecked") + A internalAgg = (A) aggs.get(0).reduce(aggs, context); + doAssertReducedMultiBucketConsumer(internalAgg, reduceBucketConsumer); + return internalAgg; + } protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { InternalAggregationTestCase.assertMultiBucketConsumer(agg, bucketConsumer);