From 9b77ab19e5d89cd43d2ba8bb8dba21fc2b806d24 Mon Sep 17 00:00:00 2001 From: Sandesh Kumar Date: Mon, 21 Oct 2024 12:34:58 -0700 Subject: [PATCH] [Star Tree] [Search] Support for metric aggregations with/without term query (#15289) --------- Signed-off-by: Sandesh Kumar --- CHANGELOG.md | 1 + .../datacube/DateDimension.java | 5 + .../compositeindex/datacube/Dimension.java | 3 + .../datacube/NumericDimension.java | 6 + .../datacube/ReadDimension.java | 6 + .../startree/utils/StarTreeQueryHelper.java | 248 ++++++++++++++ .../SortedNumericStarTreeValuesIterator.java | 8 + .../org/opensearch/search/SearchService.java | 26 +- .../aggregations/AggregatorFactories.java | 4 + .../aggregations/AggregatorFactory.java | 4 + .../aggregations/metrics/AvgAggregator.java | 74 ++++ .../metrics/AvgAggregatorFactory.java | 9 +- .../aggregations/metrics/MaxAggregator.java | 34 ++ .../metrics/MaxAggregatorFactory.java | 9 +- .../metrics/MetricAggregatorFactory.java | 37 ++ .../aggregations/metrics/MinAggregator.java | 34 +- .../metrics/MinAggregatorFactory.java | 9 +- .../aggregations/metrics/SumAggregator.java | 30 ++ .../metrics/SumAggregatorFactory.java | 9 +- .../metrics/ValueCountAggregator.java | 26 +- .../metrics/ValueCountAggregatorFactory.java | 9 +- .../aggregations/support/ValuesSource.java | 4 + .../ValuesSourceAggregatorFactory.java | 4 + .../search/internal/SearchContext.java | 11 +- .../search/startree/StarTreeFilter.java | 228 +++++++++++++ .../search/startree/StarTreeQueryContext.java | 79 +++++ .../search/startree/package-info.java | 10 + .../StarTreeDocValuesFormatTests.java | 11 +- .../search/SearchServiceStarTreeTests.java | 160 +++++++++ .../startree/MetricAggregatorTests.java | 317 +++++++++++++++++ .../startree/StarTreeFilterTests.java | 319 ++++++++++++++++++ .../aggregations/AggregatorTestCase.java | 142 ++++++++ 32 files changed, 1853 insertions(+), 23 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java create mode 100644 server/src/main/java/org/opensearch/search/aggregations/metrics/MetricAggregatorFactory.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeQueryContext.java create mode 100644 server/src/main/java/org/opensearch/search/startree/package-info.java create mode 100644 server/src/test/java/org/opensearch/search/SearchServiceStarTreeTests.java create mode 100644 server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java create mode 100644 server/src/test/java/org/opensearch/search/aggregations/startree/StarTreeFilterTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index a31b2ca43b14c..a8bfe04ceeee7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383)) - Add method to return dynamic SecureTransportParameters from SecureTransportSettingsProvider interface ([#16387](https://github.com/opensearch-project/OpenSearch/pull/16387) - Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641)) +- [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858)) diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java index ee6d5b4680c73..8feb9ccd27dbd 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube; +import org.apache.lucene.index.DocValuesType; import org.opensearch.common.Rounding; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.time.DateUtils; @@ -169,4 +170,8 @@ public int compare(DateTimeUnitRounding unit1, DateTimeUnitRounding unit2) { public static List getSortedDateTimeUnits(List dateTimeUnits) { return dateTimeUnits.stream().sorted(new DateTimeUnitComparator()).collect(Collectors.toList()); } + + public DocValuesType getDocValuesType() { + return DocValuesType.SORTED_NUMERIC; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/Dimension.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/Dimension.java index cfa8d3a2a8164..3d71b38881693 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/Dimension.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/Dimension.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube; +import org.apache.lucene.index.DocValuesType; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.xcontent.ToXContent; @@ -42,4 +43,6 @@ public interface Dimension extends ToXContent { * Returns the list of dimension fields that represent the dimension */ List getSubDimensionNames(); + + DocValuesType getDocValuesType(); } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/NumericDimension.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/NumericDimension.java index acc14f5f05c68..f1d1b15337f4a 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/NumericDimension.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/NumericDimension.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube; +import org.apache.lucene.index.DocValuesType; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.mapper.CompositeDataCubeFieldType; @@ -71,4 +72,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(field); } + + @Override + public DocValuesType getDocValuesType() { + return DocValuesType.SORTED_NUMERIC; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java index be3667f10b6da..0e2ec086abc0a 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube; +import org.apache.lucene.index.DocValuesType; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.mapper.CompositeDataCubeFieldType; @@ -69,4 +70,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(field); } + + @Override + public DocValuesType getDocValuesType() { + return DocValuesType.SORTED_NUMERIC; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java new file mode 100644 index 0000000000000..e538be5d5bece --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java @@ -0,0 +1,248 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.FixedBitSet; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.LeafBucketCollector; +import org.opensearch.search.aggregations.LeafBucketCollectorBase; +import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory; +import org.opensearch.search.aggregations.support.ValuesSource; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.StarTreeFilter; +import org.opensearch.search.startree.StarTreeQueryContext; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Helper class for building star-tree query + * + * @opensearch.internal + * @opensearch.experimental + */ +public class StarTreeQueryHelper { + + /** + * Checks if the search context can be supported by star-tree + */ + public static boolean isStarTreeSupported(SearchContext context) { + return context.aggregations() != null && context.mapperService().isCompositeIndexPresent() && context.parsedPostFilter() == null; + } + + /** + * Gets StarTreeQueryContext from the search context and source builder. + * Returns null if the query and aggregation cannot be supported. + */ + public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context, SearchSourceBuilder source) throws IOException { + // Current implementation assumes only single star-tree is supported + CompositeDataCubeFieldType compositeMappedFieldType = (CompositeDataCubeFieldType) context.mapperService() + .getCompositeFieldTypes() + .iterator() + .next(); + CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo( + compositeMappedFieldType.name(), + compositeMappedFieldType.getCompositeIndexType() + ); + + for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) { + MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory); + if (metricStat == null) { + return null; + } + } + + // need to cache star tree values only for multiple aggregations + boolean cacheStarTreeValues = context.aggregations().factories().getFactories().length > 1; + int cacheSize = cacheStarTreeValues ? context.indexShard().segments(false).size() : -1; + + return StarTreeQueryHelper.tryCreateStarTreeQueryContext(starTree, compositeMappedFieldType, source.query(), cacheSize); + } + + /** + * Uses query builder and composite index info to form star-tree query context + */ + private static StarTreeQueryContext tryCreateStarTreeQueryContext( + CompositeIndexFieldInfo compositeIndexFieldInfo, + CompositeDataCubeFieldType compositeFieldType, + QueryBuilder queryBuilder, + int cacheStarTreeValuesSize + ) { + Map queryMap; + if (queryBuilder == null || queryBuilder instanceof MatchAllQueryBuilder) { + queryMap = null; + } else if (queryBuilder instanceof TermQueryBuilder) { + // TODO: Add support for keyword fields + if (compositeFieldType.getDimensions().stream().anyMatch(d -> d.getDocValuesType() != DocValuesType.SORTED_NUMERIC)) { + // return null for non-numeric fields + return null; + } + + List supportedDimensions = compositeFieldType.getDimensions() + .stream() + .map(Dimension::getField) + .collect(Collectors.toList()); + queryMap = getStarTreePredicates(queryBuilder, supportedDimensions); + if (queryMap == null) { + return null; + } + } else { + return null; + } + return new StarTreeQueryContext(compositeIndexFieldInfo, queryMap, cacheStarTreeValuesSize); + } + + /** + * Parse query body to star-tree predicates + * @param queryBuilder to match star-tree supported query shape + * @return predicates to match + */ + private static Map getStarTreePredicates(QueryBuilder queryBuilder, List supportedDimensions) { + TermQueryBuilder tq = (TermQueryBuilder) queryBuilder; + String field = tq.fieldName(); + if (!supportedDimensions.contains(field)) { + return null; + } + long inputQueryVal = Long.parseLong(tq.value().toString()); + + // Create a map with the field and the value + Map predicateMap = new HashMap<>(); + predicateMap.put(field, inputQueryVal); + return predicateMap; + } + + private static MetricStat validateStarTreeMetricSupport( + CompositeDataCubeFieldType compositeIndexFieldInfo, + AggregatorFactory aggregatorFactory + ) { + if (aggregatorFactory instanceof MetricAggregatorFactory && aggregatorFactory.getSubFactories().getFactories().length == 0) { + String field; + Map> supportedMetrics = compositeIndexFieldInfo.getMetrics() + .stream() + .collect(Collectors.toMap(Metric::getField, Metric::getMetrics)); + + MetricStat metricStat = ((MetricAggregatorFactory) aggregatorFactory).getMetricStat(); + field = ((MetricAggregatorFactory) aggregatorFactory).getField(); + + if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat)) { + return metricStat; + } + } + return null; + } + + public static CompositeIndexFieldInfo getSupportedStarTree(SearchContext context) { + StarTreeQueryContext starTreeQueryContext = context.getStarTreeQueryContext(); + return (starTreeQueryContext != null) ? starTreeQueryContext.getStarTree() : null; + } + + public static StarTreeValues getStarTreeValues(LeafReaderContext context, CompositeIndexFieldInfo starTree) throws IOException { + SegmentReader reader = Lucene.segmentReader(context.reader()); + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) { + return null; + } + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + return (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + } + + /** + * Get the star-tree leaf collector + * This collector computes the aggregation prematurely and invokes an early termination collector + */ + public static LeafBucketCollector getStarTreeLeafCollector( + SearchContext context, + ValuesSource.Numeric valuesSource, + LeafReaderContext ctx, + LeafBucketCollector sub, + CompositeIndexFieldInfo starTree, + String metric, + Consumer valueConsumer, + Runnable finalConsumer + ) throws IOException { + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + assert starTreeValues != null; + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, metric); + + assert starTreeValues != null; + SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues.getMetricValuesIterator( + metricName + ); + // Obtain a FixedBitSet of matched star tree document IDs + FixedBitSet filteredValues = getStarTreeFilteredValues(context, ctx, starTreeValues); + assert filteredValues != null; + + int numBits = filteredValues.length(); // Get the number of the filtered values (matching docs) + if (numBits > 0) { + // Iterate over the filtered values + for (int bit = filteredValues.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits) + ? filteredValues.nextSetBit(bit + 1) + : DocIdSetIterator.NO_MORE_DOCS) { + // Advance to the entryId in the valuesIterator + if (valuesIterator.advanceExact(bit) == false) { + continue; // Skip if no more entries + } + + // Iterate over the values for the current entryId + for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) { + long value = valuesIterator.nextValue(); + valueConsumer.accept(value); // Apply the consumer operation (e.g., max, sum) + } + } + } + + // Call the final consumer after processing all entries + finalConsumer.run(); + + // Return a LeafBucketCollector that terminates collection + return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) { + @Override + public void collect(int doc, long bucket) { + throw new CollectionTerminatedException(); + } + }; + } + + /** + * Get the filtered values for the star-tree query + * Cache the results in case of multiple aggregations (if cache is initialized) + * @return FixedBitSet of matched document IDs + */ + public static FixedBitSet getStarTreeFilteredValues(SearchContext context, LeafReaderContext ctx, StarTreeValues starTreeValues) + throws IOException { + FixedBitSet result = context.getStarTreeQueryContext().getStarTreeValues(ctx); + if (result == null) { + result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap()); + context.getStarTreeQueryContext().setStarTreeValues(ctx, result); + } + return result; + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/iterator/SortedNumericStarTreeValuesIterator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/iterator/SortedNumericStarTreeValuesIterator.java index 27afdf1479b4e..4b4bfa6a915eb 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/iterator/SortedNumericStarTreeValuesIterator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/iterator/SortedNumericStarTreeValuesIterator.java @@ -29,4 +29,12 @@ public SortedNumericStarTreeValuesIterator(DocIdSetIterator docIdSetIterator) { public long nextValue() throws IOException { return ((SortedNumericDocValues) docIdSetIterator).nextValue(); } + + public int entryValueCount() throws IOException { + return ((SortedNumericDocValues) docIdSetIterator).docValueCount(); + } + + public boolean advanceExact(int target) throws IOException { + return ((SortedNumericDocValues) docIdSetIterator).advanceExact(target); + } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 49042219b100f..b20f8222d6b7a 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -78,6 +78,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.engine.Engine; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; @@ -138,6 +139,7 @@ import org.opensearch.search.sort.SortAndFormats; import org.opensearch.search.sort.SortBuilder; import org.opensearch.search.sort.SortOrder; +import org.opensearch.search.startree.StarTreeQueryContext; import org.opensearch.search.suggest.Suggest; import org.opensearch.search.suggest.completion.CompletionSuggestion; import org.opensearch.tasks.TaskResourceTrackingService; @@ -165,6 +167,7 @@ import static org.opensearch.common.unit.TimeValue.timeValueHours; import static org.opensearch.common.unit.TimeValue.timeValueMillis; import static org.opensearch.common.unit.TimeValue.timeValueMinutes; +import static org.opensearch.search.internal.SearchContext.TRACK_TOTAL_HITS_DISABLED; /** * The main search service @@ -1359,6 +1362,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.evaluateRequestShouldUseConcurrentSearch(); return; } + SearchShardTarget shardTarget = context.shardTarget(); QueryShardContext queryShardContext = context.getQueryShardContext(); context.from(source.from()); @@ -1372,7 +1376,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc InnerHitContextBuilder.extractInnerHits(source.postFilter(), innerHitBuilders); context.parsedPostFilter(queryShardContext.toQuery(source.postFilter())); } - if (innerHitBuilders.size() > 0) { + if (!innerHitBuilders.isEmpty()) { for (Map.Entry entry : innerHitBuilders.entrySet()) { try { entry.getValue().build(context, context.innerHits()); @@ -1384,9 +1388,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.sorts() != null) { try { Optional optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext()); - if (optionalSort.isPresent()) { - context.sort(optionalSort.get()); - } + optionalSort.ifPresent(context::sort); } catch (IOException e) { throw new SearchException(shardTarget, "failed to create sort elements", e); } @@ -1541,6 +1543,20 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.profile()) { context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch())); } + + if (this.indicesService.getCompositeIndexSettings() != null + && this.indicesService.getCompositeIndexSettings().isStarTreeIndexCreationEnabled() + && StarTreeQueryHelper.isStarTreeSupported(context)) { + try { + StarTreeQueryContext starTreeQueryContext = StarTreeQueryHelper.getStarTreeQueryContext(context, source); + if (starTreeQueryContext != null) { + context.starTreeQueryContext(starTreeQueryContext); + logger.debug("can use star tree"); + } else { + logger.debug("cannot use star tree"); + } + } catch (IOException ignored) {} + } } /** @@ -1700,7 +1716,7 @@ public static boolean canMatchSearchAfter( && minMax != null && primarySortField != null && primarySortField.missing() == null - && Objects.equals(trackTotalHitsUpto, SearchContext.TRACK_TOTAL_HITS_DISABLED)) { + && Objects.equals(trackTotalHitsUpto, TRACK_TOTAL_HITS_DISABLED)) { final Object searchAfterPrimary = searchAfter.fields[0]; if (primarySortField.order() == SortOrder.DESC) { if (minMax.compareMin(searchAfterPrimary) > 0) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java index eeb0c606694b0..720a24da1d9d4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java @@ -661,4 +661,8 @@ public PipelineTree buildPipelineTree() { return new PipelineTree(subTrees, aggregators); } } + + public AggregatorFactory[] getFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java index 6cc3a78fb1e36..86fbb46a9ad3c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java @@ -127,4 +127,8 @@ protected boolean supportsConcurrentSegmentSearch() { public boolean evaluateChildFactories() { return factories.allFactoriesSupportConcurrentSearch(); } + + public AggregatorFactories getSubFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index e58466b56df2a..2970c5ca851e7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -32,11 +32,21 @@ package org.opensearch.search.aggregations.metrics; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.NumericUtils; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; import org.opensearch.common.util.LongArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; @@ -50,6 +60,9 @@ import java.io.IOException; import java.util.Map; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getStarTreeFilteredValues; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree; + /** * Aggregate all docs into an average * @@ -93,6 +106,14 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } + CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); + if (supportedStarTree != null) { + return getStarTreeLeafCollector(ctx, sub, supportedStarTree); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); final CompensatedSum kahanSummation = new CompensatedSum(0, 0); @@ -126,6 +147,59 @@ public void collect(int doc, long bucket) throws IOException { }; } + public LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree); + assert starTreeValues != null; + + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String sumMetricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues( + starTree.getField(), + fieldName, + MetricStat.SUM.getTypeName() + ); + String countMetricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues( + starTree.getField(), + fieldName, + MetricStat.VALUE_COUNT.getTypeName() + ); + + final CompensatedSum kahanSummation = new CompensatedSum(sums.get(0), 0); + SortedNumericStarTreeValuesIterator sumValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues + .getMetricValuesIterator(sumMetricName); + SortedNumericStarTreeValuesIterator countValueIterator = (SortedNumericStarTreeValuesIterator) starTreeValues + .getMetricValuesIterator(countMetricName); + FixedBitSet matchedDocIds = getStarTreeFilteredValues(context, ctx, starTreeValues); + assert matchedDocIds != null; + + int numBits = matchedDocIds.length(); // Get the length of the FixedBitSet + if (numBits > 0) { + // Iterate over the FixedBitSet + for (int bit = matchedDocIds.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = bit + 1 < numBits + ? matchedDocIds.nextSetBit(bit + 1) + : DocIdSetIterator.NO_MORE_DOCS) { + // Advance to the bit (entryId) in the valuesIterator + if ((sumValuesIterator.advanceExact(bit) && countValueIterator.advanceExact(bit)) == false) { + continue; // Skip if no more entries + } + + // Iterate over the values for the current entryId + for (int i = 0; i < sumValuesIterator.entryValueCount(); i++) { + kahanSummation.add(NumericUtils.sortableLongToDouble(sumValuesIterator.nextValue())); + counts.increment(0, countValueIterator.nextValue()); // Apply the consumer operation (e.g., max, sum) + } + } + } + + sums.set(0, kahanSummation.value()); + return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) { + @Override + public void collect(int doc, long bucket) { + throw new CollectionTerminatedException(); + } + }; + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= sums.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java index 0a09fae1eaebe..57389f19b4577 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java @@ -32,13 +32,13 @@ package org.opensearch.search.aggregations.metrics; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.CoreValuesSourceType; -import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.SearchContext; @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class AvgAggregatorFactory extends ValuesSourceAggregatorFactory { +class AvgAggregatorFactory extends MetricAggregatorFactory { AvgAggregatorFactory( String name, @@ -65,6 +65,11 @@ class AvgAggregatorFactory extends ValuesSourceAggregatorFactory { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); } + @Override + public MetricStat getMetricStat() { + return MetricStat.AVG; + } + static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( AvgAggregationBuilder.REGISTRY_KEY, diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index 8108b8a726856..257109bca54bb 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -37,9 +37,13 @@ import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.NumericUtils; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.fielddata.NumericDoubleValues; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; @@ -55,8 +59,11 @@ import java.io.IOException; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree; + /** * Aggregate all docs into a max value * @@ -120,6 +127,16 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc throw new CollectionTerminatedException(); } } + + CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); + if (supportedStarTree != null) { + return getStarTreeCollector(ctx, sub, supportedStarTree); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MAX.select(allValues); @@ -143,6 +160,23 @@ public void collect(int doc, long bucket) throws IOException { }; } + public LeafBucketCollector getStarTreeCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + AtomicReference max = new AtomicReference<>(maxes.get(0)); + return StarTreeQueryHelper.getStarTreeLeafCollector( + context, + valuesSource, + ctx, + sub, + starTree, + MetricStat.MAX.getTypeName(), + value -> { + max.set(Math.max(max.get(), (NumericUtils.sortableLongToDouble(value)))); + }, + () -> maxes.set(0, max.get()) + ); + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= maxes.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java index 4fe936c8b7797..c0ee471c87f29 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java @@ -32,13 +32,13 @@ package org.opensearch.search.aggregations.metrics; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.CoreValuesSourceType; -import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.SearchContext; @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class MaxAggregatorFactory extends ValuesSourceAggregatorFactory { +class MaxAggregatorFactory extends MetricAggregatorFactory { static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( @@ -74,6 +74,11 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); } + @Override + public MetricStat getMetricStat() { + return MetricStat.MAX; + } + @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { return new MaxAggregator(name, config, searchContext, parent, metadata); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MetricAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MetricAggregatorFactory.java new file mode 100644 index 0000000000000..0ac630cf051d3 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MetricAggregatorFactory.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.metrics; + +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.opensearch.search.aggregations.support.ValuesSourceConfig; + +import java.io.IOException; +import java.util.Map; + +/** + * Extending ValuesSourceAggregatorFactory for aggregation factories supported by star-tree implementation + */ +public abstract class MetricAggregatorFactory extends ValuesSourceAggregatorFactory { + public MetricAggregatorFactory( + String name, + ValuesSourceConfig config, + QueryShardContext queryShardContext, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metadata + ) throws IOException { + super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); + } + + public abstract MetricStat getMetricStat(); +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index 946057e42ac88..a9f20bdeb5fd5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -37,9 +37,13 @@ import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.NumericUtils; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.fielddata.NumericDoubleValues; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; @@ -54,8 +58,11 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree; + /** * Aggregate all docs into a min value * @@ -119,6 +126,15 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc throw new CollectionTerminatedException(); } } + + CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); + if (supportedStarTree != null) { + return getStarTreeCollector(ctx, sub, supportedStarTree); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MIN.select(allValues); @@ -138,10 +154,26 @@ public void collect(int doc, long bucket) throws IOException { mins.set(bucket, min); } } - }; } + public LeafBucketCollector getStarTreeCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + AtomicReference min = new AtomicReference<>(mins.get(0)); + return StarTreeQueryHelper.getStarTreeLeafCollector( + context, + valuesSource, + ctx, + sub, + starTree, + MetricStat.MIN.getTypeName(), + value -> { + min.set(Math.min(min.get(), (NumericUtils.sortableLongToDouble(value)))); + }, + () -> mins.set(0, min.get()) + ); + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= mins.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java index 58fbe5edefd12..44c0d9d7d11eb 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java @@ -32,13 +32,13 @@ package org.opensearch.search.aggregations.metrics; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.CoreValuesSourceType; -import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.SearchContext; @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class MinAggregatorFactory extends ValuesSourceAggregatorFactory { +class MinAggregatorFactory extends MetricAggregatorFactory { static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( @@ -74,6 +74,11 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); } + @Override + public MetricStat getMetricStat() { + return MetricStat.MIN; + } + @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { return new MinAggregator(name, config, searchContext, parent, metadata); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index 4b8e882cd69bc..3d237a94c5699 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -33,9 +33,13 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.NumericUtils; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; @@ -49,6 +53,8 @@ import java.io.IOException; import java.util.Map; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree; + /** * Aggregate all docs into a single sum value * @@ -89,6 +95,15 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } + + CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); + if (supportedStarTree != null) { + return getStarTreeCollector(ctx, sub, supportedStarTree); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); final CompensatedSum kahanSummation = new CompensatedSum(0, 0); @@ -118,6 +133,21 @@ public void collect(int doc, long bucket) throws IOException { }; } + public LeafBucketCollector getStarTreeCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + final CompensatedSum kahanSummation = new CompensatedSum(sums.get(0), 0); + return StarTreeQueryHelper.getStarTreeLeafCollector( + context, + valuesSource, + ctx, + sub, + starTree, + MetricStat.SUM.getTypeName(), + value -> kahanSummation.add(NumericUtils.sortableLongToDouble(value)), + () -> sums.set(0, kahanSummation.value()) + ); + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= sums.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java index ef9b93920ba18..e2e25a8c25a87 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java @@ -32,13 +32,13 @@ package org.opensearch.search.aggregations.metrics; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.CoreValuesSourceType; -import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.SearchContext; @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class SumAggregatorFactory extends ValuesSourceAggregatorFactory { +class SumAggregatorFactory extends MetricAggregatorFactory { SumAggregatorFactory( String name, @@ -65,6 +65,11 @@ class SumAggregatorFactory extends ValuesSourceAggregatorFactory { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); } + @Override + public MetricStat getMetricStat() { + return MetricStat.SUM; + } + static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( SumAggregationBuilder.REGISTRY_KEY, diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java index 6f9be06231819..a156ec49983fa 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java @@ -37,6 +37,9 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.LongArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.fielddata.MultiGeoPointValues; import org.opensearch.index.fielddata.SortedBinaryDocValues; import org.opensearch.search.aggregations.Aggregator; @@ -50,6 +53,8 @@ import java.io.IOException; import java.util.Map; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree; + /** * A field data based aggregator that counts the number of values a specific field has within the aggregation context. *

@@ -88,6 +93,12 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final BigArrays bigArrays = context.bigArrays(); if (valuesSource instanceof ValuesSource.Numeric) { + + CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); + if (supportedStarTree != null) { + return getStarTreeCollector(ctx, sub, supportedStarTree); + } + final SortedNumericDocValues values = ((ValuesSource.Numeric) valuesSource).longValues(ctx); return new LeafBucketCollectorBase(sub, values) { @@ -124,10 +135,23 @@ public void collect(int doc, long bucket) throws IOException { counts.increment(bucket, values.docValueCount()); } } - }; } + public LeafBucketCollector getStarTreeCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + return StarTreeQueryHelper.getStarTreeLeafCollector( + context, + (ValuesSource.Numeric) valuesSource, + ctx, + sub, + starTree, + MetricStat.VALUE_COUNT.getTypeName(), + value -> counts.increment(0, value), + () -> {} + ); + } + @Override public double metric(long owningBucketOrd) { return (valuesSource == null || owningBucketOrd >= counts.size()) ? 0 : counts.get(owningBucketOrd); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java index 4a04dd2e0a932..0c82279484461 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java @@ -32,13 +32,13 @@ package org.opensearch.search.aggregations.metrics; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.CoreValuesSourceType; -import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.SearchContext; @@ -51,7 +51,7 @@ * * @opensearch.internal */ -class ValueCountAggregatorFactory extends ValuesSourceAggregatorFactory { +class ValueCountAggregatorFactory extends MetricAggregatorFactory { public static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register(ValueCountAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.ALL_CORE, ValueCountAggregator::new, true); @@ -68,6 +68,11 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); } + @Override + public MetricStat getMetricStat() { + return MetricStat.VALUE_COUNT; + } + @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { return new ValueCountAggregator(name, config, searchContext, parent, metadata); diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java index 1f4dd429e094e..5732d545cb2d2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java @@ -625,6 +625,10 @@ public SortedNumericDocValues longValues(LeafReaderContext context) { public SortedNumericDoubleValues doubleValues(LeafReaderContext context) { return indexFieldData.load(context).getDoubleValues(); } + + public String getIndexFieldName() { + return indexFieldData.getFieldName(); + } } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java index 69a4a5d8b6703..d862b2c2784de 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java @@ -102,4 +102,8 @@ protected abstract Aggregator doCreateInternal( public String getStatsSubtype() { return config.valueSourceType().typeName(); } + + public String getField() { + return config.fieldContext().field(); + } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 4b1b720a1aed7..641efdec015eb 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -76,6 +76,7 @@ import org.opensearch.search.query.ReduceableSearchResult; import org.opensearch.search.rescore.RescoreContext; import org.opensearch.search.sort.SortAndFormats; +import org.opensearch.search.startree.StarTreeQueryContext; import org.opensearch.search.suggest.SuggestionSearchContext; import java.util.Collection; @@ -124,8 +125,8 @@ public List toInternalAggregations(Collection co private final List releasables = new CopyOnWriteArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(false); private InnerHitsContext innerHitsContext; - private volatile boolean searchTimedOut; + private StarTreeQueryContext starTreeQueryContext; protected SearchContext() {} @@ -535,4 +536,12 @@ public boolean keywordIndexOrDocValuesEnabled() { return false; } + public SearchContext starTreeQueryContext(StarTreeQueryContext starTreeQueryContext) { + this.starTreeQueryContext = starTreeQueryContext; + return this; + } + + public StarTreeQueryContext getStarTreeQueryContext() { + return this.starTreeQueryContext; + } } diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java new file mode 100644 index 0000000000000..f7fa210691678 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java @@ -0,0 +1,228 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.DocIdSetBuilder; +import org.apache.lucene.util.FixedBitSet; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNodeType; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.StarTreeValuesIterator; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +/** + * Filter operator for star tree data structure. + * + * @opensearch.experimental + * @opensearch.internal + */ +public class StarTreeFilter { + private static final Logger logger = LogManager.getLogger(StarTreeFilter.class); + + /** + * First go over the star tree and try to match as many dimensions as possible + * For the remaining columns, use star-tree doc values to match them + */ + public static FixedBitSet getStarTreeResult(StarTreeValues starTreeValues, Map predicateEvaluators) throws IOException { + Map queryMap = predicateEvaluators != null ? predicateEvaluators : Collections.emptyMap(); + StarTreeResult starTreeResult = traverseStarTree(starTreeValues, queryMap); + + // Initialize FixedBitSet with size maxMatchedDoc + 1 + FixedBitSet bitSet = new FixedBitSet(starTreeResult.maxMatchedDoc + 1); + SortedNumericStarTreeValuesIterator starTreeValuesIterator = new SortedNumericStarTreeValuesIterator( + starTreeResult.matchedDocIds.build().iterator() + ); + + // No matches, return an empty FixedBitSet + if (starTreeResult.maxMatchedDoc == -1) { + return bitSet; + } + + // Set bits in FixedBitSet for initially matched documents + while (starTreeValuesIterator.nextEntry() != NO_MORE_DOCS) { + bitSet.set(starTreeValuesIterator.entryId()); + } + + // Temporary FixedBitSet reused for filtering + FixedBitSet tempBitSet = new FixedBitSet(starTreeResult.maxMatchedDoc + 1); + + // Process remaining predicate columns to further filter the results + for (String remainingPredicateColumn : starTreeResult.remainingPredicateColumns) { + logger.debug("remainingPredicateColumn : {}, maxMatchedDoc : {} ", remainingPredicateColumn, starTreeResult.maxMatchedDoc); + + SortedNumericStarTreeValuesIterator ndv = (SortedNumericStarTreeValuesIterator) starTreeValues.getDimensionValuesIterator( + remainingPredicateColumn + ); + + long queryValue = queryMap.get(remainingPredicateColumn); // Get the query value directly + + // Clear the temporary bit set before reuse + tempBitSet.clear(0, starTreeResult.maxMatchedDoc + 1); + + if (bitSet.length() > 0) { + // Iterate over the current set of matched document IDs + for (int entryId = bitSet.nextSetBit(0); entryId != DocIdSetIterator.NO_MORE_DOCS; entryId = (entryId + 1 < bitSet.length()) + ? bitSet.nextSetBit(entryId + 1) + : DocIdSetIterator.NO_MORE_DOCS) { + if (ndv.advance(entryId) != StarTreeValuesIterator.NO_MORE_ENTRIES) { + final int valuesCount = ndv.entryValueCount(); + for (int i = 0; i < valuesCount; i++) { + long value = ndv.nextValue(); + // Compare the value with the query value + if (value == queryValue) { + tempBitSet.set(entryId); // Set bit for the matching entryId + break; // No need to check other values for this entryId + } + } + } + } + } + + // Perform intersection of the current matches with the temp results for this predicate + bitSet.and(tempBitSet); + } + + return bitSet; // Return the final FixedBitSet with all matches + } + + /** + * Helper method to traverse the star tree, get matching documents and keep track of all the + * predicate dimensions that are not matched. + */ + private static StarTreeResult traverseStarTree(StarTreeValues starTreeValues, Map queryMap) throws IOException { + DocIdSetBuilder docsWithField = new DocIdSetBuilder(starTreeValues.getStarTreeDocumentCount()); + DocIdSetBuilder.BulkAdder adder; + Set globalRemainingPredicateColumns = null; + StarTreeNode starTree = starTreeValues.getRoot(); + List dimensionNames = starTreeValues.getStarTreeField() + .getDimensionsOrder() + .stream() + .map(Dimension::getField) + .collect(Collectors.toList()); + boolean foundLeafNode = starTree.isLeaf(); + assert foundLeafNode == false; // root node is never leaf + Queue queue = new ArrayDeque<>(); + queue.add(starTree); + int currentDimensionId = -1; + Set remainingPredicateColumns = new HashSet<>(queryMap.keySet()); + int matchedDocsCountInStarTree = 0; + int maxDocNum = -1; + StarTreeNode starTreeNode; + List docIds = new ArrayList<>(); + + while ((starTreeNode = queue.poll()) != null) { + int dimensionId = starTreeNode.getDimensionId(); + if (dimensionId > currentDimensionId) { + String dimension = dimensionNames.get(dimensionId); + remainingPredicateColumns.remove(dimension); + if (foundLeafNode && globalRemainingPredicateColumns == null) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + currentDimensionId = dimensionId; + } + + if (remainingPredicateColumns.isEmpty()) { + int docId = starTreeNode.getAggregatedDocId(); + docIds.add(docId); + matchedDocsCountInStarTree++; + maxDocNum = Math.max(docId, maxDocNum); + continue; + } + + if (starTreeNode.isLeaf()) { + for (long i = starTreeNode.getStartDocId(); i < starTreeNode.getEndDocId(); i++) { + docIds.add((int) i); + matchedDocsCountInStarTree++; + maxDocNum = Math.max((int) i, maxDocNum); + } + continue; + } + + String childDimension = dimensionNames.get(dimensionId + 1); + StarTreeNode starNode = null; + if (globalRemainingPredicateColumns == null || !globalRemainingPredicateColumns.contains(childDimension)) { + starNode = starTreeNode.getChildStarNode(); + } + + if (remainingPredicateColumns.contains(childDimension)) { + long queryValue = queryMap.get(childDimension); // Get the query value directly from the map + StarTreeNode matchingChild = starTreeNode.getChildForDimensionValue(queryValue); + if (matchingChild != null) { + queue.add(matchingChild); + foundLeafNode |= matchingChild.isLeaf(); + } + } else { + if (starNode != null) { + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + Iterator childrenIterator = starTreeNode.getChildrenIterator(); + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + if (childNode.getStarTreeNodeType() != StarTreeNodeType.STAR.getValue()) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + } + } + } + } + } + + adder = docsWithField.grow(docIds.size()); + for (int id : docIds) { + adder.add(id); + } + return new StarTreeResult( + docsWithField, + globalRemainingPredicateColumns != null ? globalRemainingPredicateColumns : Collections.emptySet(), + matchedDocsCountInStarTree, + maxDocNum + ); + } + + /** + * Helper class to wrap the result from traversing the star tree. + * */ + private static class StarTreeResult { + public final DocIdSetBuilder matchedDocIds; + public final Set remainingPredicateColumns; + public final int numOfMatchedDocs; + public final int maxMatchedDoc; + + public StarTreeResult( + DocIdSetBuilder matchedDocIds, + Set remainingPredicateColumns, + int numOfMatchedDocs, + int maxMatchedDoc + ) { + this.matchedDocIds = matchedDocIds; + this.remainingPredicateColumns = remainingPredicateColumns; + this.numOfMatchedDocs = numOfMatchedDocs; + this.maxMatchedDoc = maxMatchedDoc; + } + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeQueryContext.java b/server/src/main/java/org/opensearch/search/startree/StarTreeQueryContext.java new file mode 100644 index 0000000000000..cda3a25b30e53 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeQueryContext.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.util.FixedBitSet; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; + +import java.util.Map; + +/** + * Query class for querying star tree data structure. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class StarTreeQueryContext { + + /** + * Star tree field info + * This is used to get the star tree data structure + */ + private final CompositeIndexFieldInfo starTree; + + /** + * Map of field name to a value to be queried for that field + * This is used to filter the data based on the query + */ + private final Map queryMap; + + /** + * Cache for leaf results + * This is used to cache the results for each leaf reader context + * to avoid reading the filtered values from the leaf reader context multiple times + */ + private final FixedBitSet[] starTreeValues; + + public StarTreeQueryContext(CompositeIndexFieldInfo starTree, Map queryMap, int numSegmentsCache) { + this.starTree = starTree; + this.queryMap = queryMap; + if (numSegmentsCache > -1) { + starTreeValues = new FixedBitSet[numSegmentsCache]; + } else { + starTreeValues = null; + } + } + + public CompositeIndexFieldInfo getStarTree() { + return starTree; + } + + public Map getQueryMap() { + return queryMap; + } + + public FixedBitSet[] getStarTreeValues() { + return starTreeValues; + } + + public FixedBitSet getStarTreeValues(LeafReaderContext ctx) { + if (starTreeValues != null) { + return starTreeValues[ctx.ord]; + } + return null; + } + + public void setStarTreeValues(LeafReaderContext ctx, FixedBitSet values) { + if (starTreeValues != null) { + starTreeValues[ctx.ord] = values; + } + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/package-info.java b/server/src/main/java/org/opensearch/search/startree/package-info.java new file mode 100644 index 0000000000000..601a588e54e69 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Star Tree query classes */ +package org.opensearch.search.startree; diff --git a/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeDocValuesFormatTests.java index d35fc6b111c9f..f081cadc1362c 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeDocValuesFormatTests.java @@ -109,7 +109,7 @@ protected Codec getCodec() { final Logger testLogger = LogManager.getLogger(StarTreeDocValuesFormatTests.class); try { - createMapperService(getExpandedMapping()); + mapperService = createMapperService(getExpandedMapping()); } catch (IOException e) { throw new RuntimeException(e); } @@ -307,7 +307,7 @@ public void testStarTreeDocValuesWithDeletions() throws IOException { directory.close(); } - private XContentBuilder getExpandedMapping() throws IOException { + public static XContentBuilder getExpandedMapping() throws IOException { return topMapping(b -> { b.startObject("composite"); b.startObject("startree"); @@ -361,13 +361,13 @@ private XContentBuilder getExpandedMapping() throws IOException { }); } - private XContentBuilder topMapping(CheckedConsumer buildFields) throws IOException { + public static XContentBuilder topMapping(CheckedConsumer buildFields) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder().startObject().startObject("_doc"); buildFields.accept(builder); return builder.endObject().endObject(); } - private void createMapperService(XContentBuilder builder) throws IOException { + public static MapperService createMapperService(XContentBuilder builder) throws IOException { Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) @@ -377,7 +377,7 @@ private void createMapperService(XContentBuilder builder) throws IOException { .build(); IndexMetadata indexMetadata = IndexMetadata.builder("test").settings(settings).putMapping(builder.toString()).build(); IndicesModule indicesModule = new IndicesModule(Collections.emptyList()); - mapperService = MapperTestUtils.newMapperServiceWithHelperAnalyzer( + MapperService mapperService = MapperTestUtils.newMapperServiceWithHelperAnalyzer( new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), createTempDir(), settings, @@ -385,5 +385,6 @@ private void createMapperService(XContentBuilder builder) throws IOException { "test" ); mapperService.merge(indexMetadata, MapperService.MergeReason.INDEX_TEMPLATE); + return mapperService; } } diff --git a/server/src/test/java/org/opensearch/search/SearchServiceStarTreeTests.java b/server/src/test/java/org/opensearch/search/SearchServiceStarTreeTests.java new file mode 100644 index 0000000000000..0c88154ca2b38 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/SearchServiceStarTreeTests.java @@ -0,0 +1,160 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.action.OriginalIndices; +import org.opensearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.Strings; +import org.opensearch.index.IndexService; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite912.datacube.startree.StarTreeDocValuesFormatTests; +import org.opensearch.index.compositeindex.CompositeIndexSettings; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeIndexSettings; +import org.opensearch.index.mapper.CompositeMappedFieldType; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.internal.AliasFilter; +import org.opensearch.search.internal.ReaderContext; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.search.startree.StarTreeQueryContext; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.io.IOException; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; + +public class SearchServiceStarTreeTests extends OpenSearchSingleNodeTestCase { + + public void testParseQueryToOriginalOrStarTreeQuery() throws IOException { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.STAR_TREE_INDEX, true).build()); + setStarTreeIndexSetting("true"); + + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.getKey(), true) + .build(); + CreateIndexRequestBuilder builder = client().admin() + .indices() + .prepareCreate("test") + .setSettings(settings) + .setMapping(StarTreeDocValuesFormatTests.getExpandedMapping()); + createIndex("test", builder); + + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test")); + IndexShard indexShard = indexService.getShard(0); + ShardSearchRequest request = new ShardSearchRequest( + OriginalIndices.NONE, + new SearchRequest().allowPartialSearchResults(true), + indexShard.shardId(), + 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, + -1, + null, + null + ); + + // Case 1: No query or aggregations, should not use star tree + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + assertStarTreeContext(request, sourceBuilder, null, -1); + + // Case 2: MatchAllQuery present but no aggregations, should not use star tree + sourceBuilder = new SearchSourceBuilder().query(new MatchAllQueryBuilder()); + assertStarTreeContext(request, sourceBuilder, null, -1); + + // Case 3: MatchAllQuery and aggregations present, should use star tree + sourceBuilder = new SearchSourceBuilder().size(0) + .query(new MatchAllQueryBuilder()) + .aggregation(AggregationBuilders.max("test").field("field")); + CompositeIndexFieldInfo expectedStarTree = new CompositeIndexFieldInfo( + "startree", + CompositeMappedFieldType.CompositeFieldType.STAR_TREE + ); + Map expectedQueryMap = null; + assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, expectedQueryMap, -1), -1); + + // Case 4: MatchAllQuery and aggregations present, but postFilter specified, should not use star tree + sourceBuilder = new SearchSourceBuilder().size(0) + .query(new MatchAllQueryBuilder()) + .aggregation(AggregationBuilders.max("test").field("field")) + .postFilter(new MatchAllQueryBuilder()); + assertStarTreeContext(request, sourceBuilder, null, -1); + + // Case 5: TermQuery and single aggregation, should use star tree, but not initialize query cache + sourceBuilder = new SearchSourceBuilder().size(0) + .query(new TermQueryBuilder("sndv", 1)) + .aggregation(AggregationBuilders.max("test").field("field")); + expectedQueryMap = Map.of("sndv", 1L); + assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, expectedQueryMap, -1), -1); + + // Case 6: TermQuery and multiple aggregations present, should use star tree & initialize cache + sourceBuilder = new SearchSourceBuilder().size(0) + .query(new TermQueryBuilder("sndv", 1)) + .aggregation(AggregationBuilders.max("test").field("field")) + .aggregation(AggregationBuilders.sum("test2").field("field")); + expectedQueryMap = Map.of("sndv", 1L); + assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, expectedQueryMap, 0), 0); + + // Case 7: No query, metric aggregations present, should use star tree + sourceBuilder = new SearchSourceBuilder().size(0).aggregation(AggregationBuilders.max("test").field("field")); + assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, null, -1), -1); + + setStarTreeIndexSetting(null); + } + + private void setStarTreeIndexSetting(String value) throws IOException { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING.getKey(), value).build()) + .execute(); + } + + private void assertStarTreeContext( + ShardSearchRequest request, + SearchSourceBuilder sourceBuilder, + StarTreeQueryContext expectedContext, + int expectedCacheUsage + ) throws IOException { + request.source(sourceBuilder); + SearchService searchService = getInstanceFromNode(SearchService.class); + try (ReaderContext reader = searchService.createOrGetReaderContext(request, false)) { + SearchContext context = searchService.createContext(reader, request, null, true); + StarTreeQueryContext actualContext = context.getStarTreeQueryContext(); + + if (expectedContext == null) { + assertThat(context.getStarTreeQueryContext(), nullValue()); + } else { + assertThat(actualContext, notNullValue()); + assertEquals(expectedContext.getStarTree().getType(), actualContext.getStarTree().getType()); + assertEquals(expectedContext.getStarTree().getField(), actualContext.getStarTree().getField()); + assertEquals(expectedContext.getQueryMap(), actualContext.getQueryMap()); + if (expectedCacheUsage > -1) { + assertEquals(expectedCacheUsage, actualContext.getStarTreeValues().length); + } else { + assertNull(actualContext.getStarTreeValues()); + } + } + searchService.doStop(); + } + } +} diff --git a/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java new file mode 100644 index 0000000000000..0327bd9990784 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java @@ -0,0 +1,317 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.startree; + +import com.carrotsearch.randomizedtesting.RandomizedTest; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.lucene912.Lucene912Codec; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.index.RandomIndexWriter; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.codec.composite.composite912.Composite912Codec; +import org.opensearch.index.codec.composite912.datacube.startree.StarTreeDocValuesFormatTests; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.NumericDimension; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.NumberFieldMapper; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregatorTestCase; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.opensearch.search.aggregations.metrics.InternalAvg; +import org.opensearch.search.aggregations.metrics.InternalMax; +import org.opensearch.search.aggregations.metrics.InternalMin; +import org.opensearch.search.aggregations.metrics.InternalSum; +import org.opensearch.search.aggregations.metrics.InternalValueCount; +import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.opensearch.search.aggregations.metrics.MinAggregationBuilder; +import org.opensearch.search.aggregations.metrics.SumAggregationBuilder; +import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import static org.opensearch.search.aggregations.AggregationBuilders.avg; +import static org.opensearch.search.aggregations.AggregationBuilders.count; +import static org.opensearch.search.aggregations.AggregationBuilders.max; +import static org.opensearch.search.aggregations.AggregationBuilders.min; +import static org.opensearch.search.aggregations.AggregationBuilders.sum; +import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; + +public class MetricAggregatorTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "field"; + private static final NumberFieldMapper.NumberType DEFAULT_FIELD_TYPE = NumberFieldMapper.NumberType.LONG; + private static final MappedFieldType DEFAULT_MAPPED_FIELD = new NumberFieldMapper.NumberFieldType(FIELD_NAME, DEFAULT_FIELD_TYPE); + + @Before + public void setup() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.STAR_TREE_INDEX, true).build()); + } + + @After + public void teardown() throws IOException { + FeatureFlags.initializeFeatureFlags(Settings.EMPTY); + } + + protected Codec getCodec() { + final Logger testLogger = LogManager.getLogger(MetricAggregatorTests.class); + MapperService mapperService; + try { + mapperService = StarTreeDocValuesFormatTests.createMapperService(StarTreeDocValuesFormatTests.getExpandedMapping()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new Composite912Codec(Lucene912Codec.Mode.BEST_SPEED, mapperService, testLogger); + } + + public void testStarTreeDocValues() throws IOException { + Directory directory = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(null); + conf.setCodec(getCodec()); + conf.setMergePolicy(newLogMergePolicy()); + RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf); + + Random random = RandomizedTest.getRandom(); + int totalDocs = 100; + final String SNDV = "sndv"; + final String DV = "dv"; + int val; + + List docs = new ArrayList<>(); + // Index 100 random documents + for (int i = 0; i < totalDocs; i++) { + Document doc = new Document(); + if (random.nextBoolean()) { + val = random.nextInt(10) - 5; // Random long between -5 and 4 + doc.add(new SortedNumericDocValuesField(SNDV, val)); + } + if (random.nextBoolean()) { + val = random.nextInt(20) - 10; // Random long between -10 and 9 + doc.add(new SortedNumericDocValuesField(DV, val)); + } + if (random.nextBoolean()) { + val = random.nextInt(50); // Random long between 0 and 49 + doc.add(new SortedNumericDocValuesField(FIELD_NAME, val)); + } + iw.addDocument(doc); + docs.add(doc); + } + + if (randomBoolean()) { + iw.forceMerge(1); + } + iw.close(); + + DirectoryReader ir = DirectoryReader.open(directory); + initValuesSourceRegistry(); + LeafReaderContext context = ir.leaves().get(0); + + SegmentReader reader = Lucene.segmentReader(context.reader()); + IndexSearcher indexSearcher = newSearcher(reader, false, false); + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + CompositeIndexFieldInfo starTree = compositeIndexFields.get(0); + + SumAggregationBuilder sumAggregationBuilder = sum("_name").field(FIELD_NAME); + MaxAggregationBuilder maxAggregationBuilder = max("_name").field(FIELD_NAME); + MinAggregationBuilder minAggregationBuilder = min("_name").field(FIELD_NAME); + ValueCountAggregationBuilder valueCountAggregationBuilder = count("_name").field(FIELD_NAME); + AvgAggregationBuilder avgAggregationBuilder = avg("_name").field(FIELD_NAME); + + List supportedDimensions = new LinkedList<>(); + supportedDimensions.add(new NumericDimension(SNDV)); + supportedDimensions.add(new NumericDimension(DV)); + + Query query = new MatchAllDocsQuery(); + // match-all query + QueryBuilder queryBuilder = null; // no predicates + testCase( + indexSearcher, + query, + queryBuilder, + sumAggregationBuilder, + starTree, + supportedDimensions, + verifyAggregation(InternalSum::getValue) + ); + testCase( + indexSearcher, + query, + queryBuilder, + maxAggregationBuilder, + starTree, + supportedDimensions, + verifyAggregation(InternalMax::getValue) + ); + testCase( + indexSearcher, + query, + queryBuilder, + minAggregationBuilder, + starTree, + supportedDimensions, + verifyAggregation(InternalMin::getValue) + ); + testCase( + indexSearcher, + query, + queryBuilder, + valueCountAggregationBuilder, + starTree, + supportedDimensions, + verifyAggregation(InternalValueCount::getValue) + ); + testCase( + indexSearcher, + query, + queryBuilder, + avgAggregationBuilder, + starTree, + supportedDimensions, + verifyAggregation(InternalAvg::getValue) + ); + + // Numeric-terms query + for (int cases = 0; cases < 100; cases++) { + String queryField; + long queryValue; + if (randomBoolean()) { + queryField = SNDV; + queryValue = random.nextInt(10); + } else { + queryField = DV; + queryValue = random.nextInt(20) - 15; + } + + query = SortedNumericDocValuesField.newSlowExactQuery(queryField, queryValue); + queryBuilder = new TermQueryBuilder(queryField, queryValue); + + testCase( + indexSearcher, + query, + queryBuilder, + sumAggregationBuilder, + starTree, + supportedDimensions, + verifyAggregation(InternalSum::getValue) + ); + testCase( + indexSearcher, + query, + queryBuilder, + maxAggregationBuilder, + starTree, + supportedDimensions, + verifyAggregation(InternalMax::getValue) + ); + testCase( + indexSearcher, + query, + queryBuilder, + minAggregationBuilder, + starTree, + supportedDimensions, + verifyAggregation(InternalMin::getValue) + ); + testCase( + indexSearcher, + query, + queryBuilder, + valueCountAggregationBuilder, + starTree, + supportedDimensions, + verifyAggregation(InternalValueCount::getValue) + ); + testCase( + indexSearcher, + query, + queryBuilder, + avgAggregationBuilder, + starTree, + supportedDimensions, + verifyAggregation(InternalAvg::getValue) + ); + } + + ir.close(); + directory.close(); + } + + BiConsumer verifyAggregation(Function valueExtractor) { + return (expectedAggregation, actualAggregation) -> assertEquals( + valueExtractor.apply(expectedAggregation).doubleValue(), + valueExtractor.apply(actualAggregation).doubleValue(), + 0.0f + ); + } + + private void testCase( + IndexSearcher searcher, + Query query, + QueryBuilder queryBuilder, + T aggBuilder, + CompositeIndexFieldInfo starTree, + List supportedDimensions, + BiConsumer verify + ) throws IOException { + V starTreeAggregation = searchAndReduceStarTree( + createIndexSettings(), + searcher, + query, + queryBuilder, + aggBuilder, + starTree, + supportedDimensions, + DEFAULT_MAX_BUCKETS, + false, + DEFAULT_MAPPED_FIELD + ); + V expectedAggregation = searchAndReduceStarTree( + createIndexSettings(), + searcher, + query, + queryBuilder, + aggBuilder, + null, + null, + DEFAULT_MAX_BUCKETS, + false, + DEFAULT_MAPPED_FIELD + ); + verify.accept(expectedAggregation, starTreeAggregation); + } +} diff --git a/server/src/test/java/org/opensearch/search/aggregations/startree/StarTreeFilterTests.java b/server/src/test/java/org/opensearch/search/aggregations/startree/StarTreeFilterTests.java new file mode 100644 index 0000000000000..f8eb71a40319a --- /dev/null +++ b/server/src/test/java/org/opensearch/search/aggregations/startree/StarTreeFilterTests.java @@ -0,0 +1,319 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.lucene912.Lucene912Codec; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.index.RandomIndexWriter; +import org.apache.lucene.util.FixedBitSet; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.codec.composite.composite912.Composite912Codec; +import org.opensearch.index.codec.composite912.datacube.startree.StarTreeDocValuesFormatTests; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.search.aggregations.AggregatorTestCase; +import org.opensearch.search.startree.StarTreeFilter; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.opensearch.index.codec.composite912.datacube.startree.StarTreeDocValuesFormatTests.topMapping; + +public class StarTreeFilterTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "field"; + private static final String SNDV = "sndv"; + private static final String SDV = "sdv"; + private static final String DV = "dv"; + + @Before + public void setup() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.STAR_TREE_INDEX, true).build()); + } + + @After + public void teardown() throws IOException { + FeatureFlags.initializeFeatureFlags(Settings.EMPTY); + } + + protected Codec getCodec(int maxLeafDoc, boolean skipStarNodeCreationForSDVDimension) { + final Logger testLogger = LogManager.getLogger(StarTreeFilterTests.class); + MapperService mapperService; + try { + mapperService = StarTreeDocValuesFormatTests.createMapperService( + getExpandedMapping(maxLeafDoc, skipStarNodeCreationForSDVDimension) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new Composite912Codec(Lucene912Codec.Mode.BEST_SPEED, mapperService, testLogger); + } + + public void testStarTreeFilterWithNoDocsInSVDField() throws IOException { + testStarTreeFilter(5, true); + } + + public void testStarTreeFilterWithDocsInSVDFieldButNoStarNode() throws IOException { + testStarTreeFilter(10, false); + } + + private void testStarTreeFilter(int maxLeafDoc, boolean skipStarNodeCreationForSDVDimension) throws IOException { + Directory directory = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(null); + conf.setCodec(getCodec(maxLeafDoc, skipStarNodeCreationForSDVDimension)); + conf.setMergePolicy(newLogMergePolicy()); + RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf); + int totalDocs = 100; + + List docs = new ArrayList<>(); + for (int i = 0; i < totalDocs; i++) { + Document doc = new Document(); + doc.add(new SortedNumericDocValuesField(SNDV, i)); + doc.add(new SortedNumericDocValuesField(DV, 2 * i)); + doc.add(new SortedNumericDocValuesField(FIELD_NAME, 3 * i)); + if (skipStarNodeCreationForSDVDimension) { + // adding SDV field only star node creation is skipped for SDV dimension + doc.add(new SortedNumericDocValuesField(SDV, 4 * i)); + } + iw.addDocument(doc); + docs.add(doc); + } + iw.forceMerge(1); + iw.close(); + + DirectoryReader ir = DirectoryReader.open(directory); + initValuesSourceRegistry(); + LeafReaderContext context = ir.leaves().get(0); + SegmentReader reader = Lucene.segmentReader(context.reader()); + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + + long starTreeDocCount, docCount; + + // assert that all documents are included if no filters are given + starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(), context); + docCount = getDocCount(docs, Map.of()); + assertEquals(totalDocs, starTreeDocCount); + assertEquals(docCount, starTreeDocCount); + + // single filter - matches docs + starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SNDV, 0L), context); + docCount = getDocCount(docs, Map.of(SNDV, 0L)); + assertEquals(1, docCount); + assertEquals(docCount, starTreeDocCount); + + // single filter on 3rd field in ordered dimension - matches docs + starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(DV, 0L), context); + docCount = getDocCount(docs, Map.of(DV, 0L)); + assertEquals(1, docCount); + assertEquals(docCount, starTreeDocCount); + + // single filter - does not match docs + starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SNDV, 101L), context); + docCount = getDocCount(docs, Map.of(SNDV, 101L)); + assertEquals(0, docCount); + assertEquals(docCount, starTreeDocCount); + + // single filter on 3rd field in ordered dimension - does not match docs + starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(DV, -101L), context); + docCount = getDocCount(docs, Map.of(SNDV, -101L)); + assertEquals(0, docCount); + assertEquals(docCount, starTreeDocCount); + + // multiple filters - matches docs + starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SNDV, 0L, DV, 0L), context); + docCount = getDocCount(docs, Map.of(SNDV, 0L, DV, 0L)); + assertEquals(1, docCount); + assertEquals(docCount, starTreeDocCount); + + // no document should match the filter + starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SNDV, 0L, DV, -11L), context); + docCount = getDocCount(docs, Map.of(SNDV, 0L, DV, -11L)); + assertEquals(0, docCount); + assertEquals(docCount, starTreeDocCount); + + // Only the first filter should match some documents, second filter matches none + starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SNDV, 0L, DV, -100L), context); + docCount = getDocCount(docs, Map.of(SNDV, 0L, DV, -100L)); + assertEquals(0, docCount); + assertEquals(docCount, starTreeDocCount); + + // non-dimension fields in filter - should throw IllegalArgumentException + expectThrows( + IllegalArgumentException.class, + () -> getDocCountFromStarTree(starTreeDocValuesReader, Map.of(FIELD_NAME, 0L), context) + ); + + if (skipStarNodeCreationForSDVDimension == true) { + // Documents are not indexed + starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SDV, 4L), context); + docCount = getDocCount(docs, Map.of(SDV, 4L)); + assertEquals(1, docCount); + assertEquals(docCount, starTreeDocCount); + } else { + // Documents are indexed + starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SDV, 4L), context); + docCount = getDocCount(docs, Map.of(SDV, 4L)); + assertEquals(0, docCount); + assertEquals(docCount, starTreeDocCount); + } + + ir.close(); + directory.close(); + } + + // Counts the documents having field SNDV & applied filters + private long getDocCount(List documents, Map filters) { + long count = 0; + for (Document doc : documents) { + // Check if SNDV field is present + IndexableField sndvField = doc.getField(SNDV); + if (sndvField == null) continue; // Skip if SNDV is not present + + // Apply filters if provided + if (!filters.isEmpty()) { + boolean matches = filters.entrySet().stream().allMatch(entry -> { + IndexableField field = doc.getField(entry.getKey()); + return field != null && field.numericValue().longValue() == entry.getValue(); + }); + if (!matches) continue; + } + + // Increment count if the document passes all conditions + count++; + } + return count; + } + + // Returns count of documents in the star tree having field SNDV & applied filters + private long getDocCountFromStarTree(CompositeIndexReader starTreeDocValuesReader, Map filters, LeafReaderContext context) + throws IOException { + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + CompositeIndexFieldInfo starTree = compositeIndexFields.get(0); + StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(context, starTree); + FixedBitSet filteredValues = StarTreeFilter.getStarTreeResult(starTreeValues, filters); + + SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues.getMetricValuesIterator( + StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues( + starTree.getField(), + SNDV, + MetricStat.VALUE_COUNT.getTypeName() + ) + ); + + long docCount = 0; + int numBits = filteredValues.length(); + if (numBits > 0) { + for (int bit = filteredValues.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits) + ? filteredValues.nextSetBit(bit + 1) + : DocIdSetIterator.NO_MORE_DOCS) { + + // Assert that we can advance to the document ID in the values iterator + boolean canAdvance = valuesIterator.advanceExact(bit); + assert canAdvance : "Cannot advance to document ID " + bit + " in values iterator."; + + // Iterate over values for the current document ID + for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) { + long value = valuesIterator.nextValue(); + // Assert that the value is as expected using the provided consumer + docCount += value; + } + } + } + return docCount; + } + + public static XContentBuilder getExpandedMapping(int maxLeafDocs, boolean skipStarNodeCreationForSDVDimension) throws IOException { + return topMapping(b -> { + b.startObject("composite"); + b.startObject("startree"); + b.field("type", "star_tree"); + b.startObject("config"); + b.field("max_leaf_docs", maxLeafDocs); + if (skipStarNodeCreationForSDVDimension) { + b.startArray("skip_star_node_creation_for_dimensions"); + b.value("sdv"); + b.endArray(); + } + b.startArray("ordered_dimensions"); + b.startObject(); + b.field("name", "sndv"); + b.endObject(); + b.startObject(); + b.field("name", "sdv"); + b.endObject(); + b.startObject(); + b.field("name", "dv"); + b.endObject(); + b.endArray(); + b.startArray("metrics"); + b.startObject(); + b.field("name", "field"); + b.startArray("stats"); + b.value("sum"); + b.value("value_count"); + b.value("avg"); + b.value("min"); + b.value("max"); + b.endArray(); + b.endObject(); + b.startObject(); + b.field("name", "sndv"); + b.startArray("stats"); + b.value("sum"); + b.value("value_count"); + b.value("avg"); + b.value("min"); + b.value("max"); + b.endArray(); + b.endObject(); + b.endArray(); + b.endObject(); + b.endObject(); + b.endObject(); + b.startObject("properties"); + b.startObject("sndv"); + b.field("type", "integer"); + b.endObject(); + b.startObject("sdv"); + b.field("type", "integer"); + b.endObject(); + b.startObject("dv"); + b.field("type", "integer"); + b.endObject(); + b.startObject("field"); + b.field("type", "integer"); + b.endObject(); + b.endObject(); + }); + } +} diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 4abd7fbea9cff..e1728c4476699 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -91,11 +91,16 @@ import org.opensearch.index.cache.bitset.BitsetFilterCache; import org.opensearch.index.cache.bitset.BitsetFilterCache.Listener; import org.opensearch.index.cache.query.DisabledQueryCache; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.fielddata.IndexFieldData; import org.opensearch.index.fielddata.IndexFieldDataCache; import org.opensearch.index.fielddata.IndexFieldDataService; import org.opensearch.index.mapper.BinaryFieldMapper; import org.opensearch.index.mapper.CompletionFieldMapper; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; +import org.opensearch.index.mapper.CompositeMappedFieldType; import org.opensearch.index.mapper.ConstantKeywordFieldMapper; import org.opensearch.index.mapper.ContentPath; import org.opensearch.index.mapper.DateFieldMapper; @@ -117,6 +122,7 @@ import org.opensearch.index.mapper.RangeType; import org.opensearch.index.mapper.StarTreeMapper; import org.opensearch.index.mapper.TextFieldMapper; +import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.SearchOperationListener; @@ -135,12 +141,14 @@ import org.opensearch.search.aggregations.support.CoreValuesSourceType; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.aggregations.support.ValuesSourceType; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.fetch.subphase.FetchDocValuesPhase; import org.opensearch.search.fetch.subphase.FetchSourcePhase; import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.lookup.SearchLookup; +import org.opensearch.search.startree.StarTreeQueryContext; import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; import org.junit.After; @@ -155,6 +163,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; @@ -331,6 +340,35 @@ protected A createAggregator(AggregationBuilder aggregati return aggregator; } + protected CountingAggregator createCountingAggregator( + Query query, + QueryBuilder queryBuilder, + AggregationBuilder aggregationBuilder, + IndexSearcher indexSearcher, + IndexSettings indexSettings, + CompositeIndexFieldInfo starTree, + List supportedDimensions, + MultiBucketConsumer bucketConsumer, + MappedFieldType... fieldTypes + ) throws IOException { + SearchContext searchContext; + if (starTree != null) { + searchContext = createSearchContextWithStarTreeContext( + indexSearcher, + indexSettings, + query, + queryBuilder, + starTree, + supportedDimensions, + bucketConsumer, + fieldTypes + ); + } else { + searchContext = createSearchContext(indexSearcher, indexSettings, query, bucketConsumer, fieldTypes); + } + return new CountingAggregator(new AtomicInteger(), createAggregator(aggregationBuilder, searchContext)); + } + /** * Create a {@linkplain SearchContext} for testing an {@link Aggregator}. */ @@ -344,6 +382,49 @@ protected SearchContext createSearchContext( return createSearchContext(indexSearcher, indexSettings, query, bucketConsumer, new NoneCircuitBreakerService(), fieldTypes); } + protected SearchContext createSearchContextWithStarTreeContext( + IndexSearcher indexSearcher, + IndexSettings indexSettings, + Query query, + QueryBuilder queryBuilder, + CompositeIndexFieldInfo starTree, + List supportedDimensions, + MultiBucketConsumer bucketConsumer, + MappedFieldType... fieldTypes + ) throws IOException { + SearchContext searchContext = createSearchContext( + indexSearcher, + indexSettings, + query, + bucketConsumer, + new NoneCircuitBreakerService(), + fieldTypes + ); + + // Mock SearchContextAggregations + SearchContextAggregations searchContextAggregations = mock(SearchContextAggregations.class); + AggregatorFactories aggregatorFactories = mock(AggregatorFactories.class); + when(searchContext.aggregations()).thenReturn(searchContextAggregations); + when(searchContextAggregations.factories()).thenReturn(aggregatorFactories); + when(aggregatorFactories.getFactories()).thenReturn(new AggregatorFactory[] {}); + + CompositeDataCubeFieldType compositeMappedFieldType = mock(CompositeDataCubeFieldType.class); + when(compositeMappedFieldType.name()).thenReturn(starTree.getField()); + when(compositeMappedFieldType.getCompositeIndexType()).thenReturn(starTree.getType()); + Set compositeFieldTypes = Set.of(compositeMappedFieldType); + + when((compositeMappedFieldType).getDimensions()).thenReturn(supportedDimensions); + MapperService mapperService = mock(MapperService.class); + when(mapperService.getCompositeFieldTypes()).thenReturn(compositeFieldTypes); + when(searchContext.mapperService()).thenReturn(mapperService); + + SearchSourceBuilder sb = new SearchSourceBuilder().query(queryBuilder); + StarTreeQueryContext starTreeQueryContext = StarTreeQueryHelper.getStarTreeQueryContext(searchContext, sb); + + when(searchContext.getStarTreeQueryContext()).thenReturn(starTreeQueryContext); + return searchContext; + } + protected SearchContext createSearchContext( IndexSearcher indexSearcher, IndexSettings indexSettings, @@ -651,6 +732,67 @@ protected A searchAndReduc return internalAgg; } + protected A searchAndReduceStarTree( + IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + QueryBuilder queryBuilder, + AggregationBuilder builder, + CompositeIndexFieldInfo compositeIndexFieldInfo, + List supportedDimensions, + int maxBucket, + boolean hasNested, + MappedFieldType... fieldTypes + ) throws IOException { + query = query.rewrite(searcher); + final IndexReaderContext ctx = searcher.getTopReaderContext(); + final PipelineTree pipelines = builder.buildPipelineTree(); + List aggs = new ArrayList<>(); + if (hasNested) { + query = Queries.filtered(query, Queries.newNonNestedFilter()); + } + + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer( + maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + CountingAggregator countingAggregator = createCountingAggregator( + query, + queryBuilder, + builder, + searcher, + indexSettings, + compositeIndexFieldInfo, + supportedDimensions, + bucketConsumer, + fieldTypes + ); + + countingAggregator.preCollection(); + searcher.search(query, countingAggregator); + countingAggregator.postCollection(); + aggs.add(countingAggregator.buildTopLevel()); + if (compositeIndexFieldInfo != null) { + assertEquals(0, countingAggregator.collectCounter.get()); + } + + MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer( + maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + countingAggregator.context().bigArrays(), + getMockScriptService(), + reduceBucketConsumer, + pipelines + ); + + @SuppressWarnings("unchecked") + A internalAgg = (A) aggs.get(0).reduce(aggs, context); + doAssertReducedMultiBucketConsumer(internalAgg, reduceBucketConsumer); + return internalAgg; + } + protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { InternalAggregationTestCase.assertMultiBucketConsumer(agg, bucketConsumer); }