From bbcb8bf0087aee6e454f54d65649b199ab02b231 Mon Sep 17 00:00:00 2001 From: Sandesh Kumar Date: Fri, 9 Aug 2024 03:40:50 -0700 Subject: [PATCH] Star tree P0 changes Signed-off-by: Sandesh Kumar --- .../opensearch/common/util/FeatureFlags.java | 2 +- .../index/query/QueryShardContext.java | 69 +++++ .../org/opensearch/search/SearchService.java | 57 +++- .../aggregations/AggregatorFactories.java | 6 +- .../aggregations/AggregatorFactory.java | 4 + .../aggregations/metrics/SumAggregator.java | 10 +- .../metrics/SumAggregatorFactory.java | 2 +- .../startree/InternalStarTree.java | 266 ++++++++++++++++ .../startree/StarTreeAggregationBuilder.java | 124 ++++++++ .../startree/StarTreeAggregator.java | 204 ++++++++++++ .../startree/StarTreeAggregatorFactory.java | 54 ++++ .../aggregations/startree/package-info.java | 9 + .../ValuesSourceAggregatorFactory.java | 12 + .../search/query/startree/StarTreeFilter.java | 152 +++++++++ .../search/query/startree/StarTreeQuery.java | 96 ++++++ .../query/startree/StarTreeQueryBuilder.java | 164 ++++++++++ .../search/query/startree/package-info.java | 9 + .../startree/StarTreeAggregatorFactory.java | 64 ++++ .../search/startree/StarTreeFilter.java | 293 ++++++++++++++++++ .../search/startree/StarTreeQuery.java | 119 +++++++ .../startree/StarTreeSumAggregator.java | 76 +++++ .../search/startree/package-info.java | 9 + 22 files changed, 1790 insertions(+), 11 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/aggregations/startree/InternalStarTree.java create mode 100644 server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregationBuilder.java create mode 100644 server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregator.java create mode 100644 server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregatorFactory.java create mode 100644 server/src/main/java/org/opensearch/search/aggregations/startree/package-info.java create mode 100644 server/src/main/java/org/opensearch/search/query/startree/StarTreeFilter.java create mode 100644 server/src/main/java/org/opensearch/search/query/startree/StarTreeQuery.java create mode 100644 server/src/main/java/org/opensearch/search/query/startree/StarTreeQueryBuilder.java create mode 100644 server/src/main/java/org/opensearch/search/query/startree/package-info.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeAggregatorFactory.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeSumAggregator.java create mode 100644 server/src/main/java/org/opensearch/search/startree/package-info.java diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index ceb2559a0e16c..e4f5e46950b5f 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -105,7 +105,7 @@ public class FeatureFlags { * aggregations. */ public static final String STAR_TREE_INDEX = "opensearch.experimental.feature.composite_index.star_tree.enabled"; - public static final Setting STAR_TREE_INDEX_SETTING = Setting.boolSetting(STAR_TREE_INDEX, false, Property.NodeScope); + public static final Setting STAR_TREE_INDEX_SETTING = Setting.boolSetting(STAR_TREE_INDEX, true, Property.NodeScope); private static final List> ALL_FEATURE_FLAG_SETTINGS = List.of( REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, diff --git a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java index 91313092d8d28..28536b9ef564d 100644 --- a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java +++ b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java @@ -56,7 +56,12 @@ import org.opensearch.index.IndexSortConfig; import org.opensearch.index.analysis.IndexAnalyzers; import org.opensearch.index.cache.bitset.BitsetFilterCache; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; import org.opensearch.index.fielddata.IndexFieldData; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.index.mapper.ContentPath; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; @@ -73,12 +78,18 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptFactory; import org.opensearch.script.ScriptService; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.metrics.MetricsAggregator; +import org.opensearch.search.aggregations.metrics.SumAggregatorFactory; import org.opensearch.search.aggregations.support.AggregationUsageService; +import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.lookup.SearchLookup; +import org.opensearch.search.startree.StarTreeQuery; import org.opensearch.transport.RemoteClusterAware; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -89,6 +100,7 @@ import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -522,6 +534,63 @@ private ParsedQuery toQuery(QueryBuilder queryBuilder, CheckedFunction>> predicateMap = getFilterMap(queryBuilder); + StarTreeQuery starTreeQuery = new StarTreeQuery(starTree, predicateMap, null); + return new ParsedQuery(starTreeQuery); + } + + private Map>> getFilterMap(QueryBuilder queryBuilder) { + // Assuming the following variables have been initialized: + Map>> predicateMap = new HashMap<>(); + + + // Check if the query builder is an instance of TermQueryBuilder + if (queryBuilder instanceof TermQueryBuilder) { + TermQueryBuilder tq = (TermQueryBuilder) queryBuilder; + String field = tq.fieldName(); + long inputQueryVal = Long.parseLong(tq.value().toString()); + + // Get or create the list of predicates for the given field + List> predicates = predicateMap.getOrDefault(field, new ArrayList<>()); + + // Create a predicate to match the input query value + Predicate predicate = dimVal -> dimVal == inputQueryVal; + predicates.add(predicate); + + // Put the predicates list back into the map + predicateMap.put(field, predicates); + } else { + throw new IllegalArgumentException("The query is not a term query"); + } + return predicateMap; + + } + + public String getMetricKey(CompositeDataCubeFieldType compositeIndexFieldInfo, AggregatorFactory aggregatorFactory) { + String field = null; + Map> supportedMetrics = compositeIndexFieldInfo.getMetrics().stream() + .collect(Collectors.toMap(Metric::getField, Metric::getMetrics)); + + // Existing support only for MetricAggregators without sub-aggregations + if (aggregatorFactory.getSubFactories().getFactories().length != 0) { + return null; + } + + if (aggregatorFactory instanceof SumAggregatorFactory) { + field = ((SumAggregatorFactory)aggregatorFactory).getField(); + if (!(supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.SUM))) { + return null; + } + } + + return field != null + ? compositeIndexFieldInfo.name() + "_" + field + "_" + "sum_metric" + : null; + } + + + public Index index() { return indexSettings.getIndex(); } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index a53a7198c366f..1ae6bd36d9e71 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexOrDocValuesQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TopDocs; import org.opensearch.OpenSearchException; @@ -77,16 +78,21 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; import org.opensearch.index.engine.Engine; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; +import org.opensearch.index.mapper.StarTreeMapper; import org.opensearch.index.query.InnerHitContextBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.MatchNoneQueryBuilder; +import org.opensearch.index.query.ParsedQuery; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.Rewriteable; +import org.opensearch.index.query.TermsQueryBuilder; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.SearchOperationListener; @@ -97,11 +103,14 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.AggregationInitializationException; import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; +import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.dfs.DfsPhase; @@ -136,6 +145,7 @@ import org.opensearch.search.sort.SortAndFormats; import org.opensearch.search.sort.SortBuilder; import org.opensearch.search.sort.SortOrder; +import org.opensearch.search.startree.StarTreeAggregatorFactory; import org.opensearch.search.suggest.Suggest; import org.opensearch.search.suggest.completion.CompletionSuggestion; import org.opensearch.tasks.TaskResourceTrackingService; @@ -1314,6 +1324,11 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.evaluateRequestShouldUseConcurrentSearch(); return; } + // Can be marked false for majority cases for which star-tree cannot be used + // Will save checking the criteria later and we can have a limit on what search requests are supported + // As we increment the cases where star-tree can be used, this can be set back to true + boolean canUseStarTree = context.mapperService().isCompositeIndexPresent(); + SearchShardTarget shardTarget = context.shardTarget(); QueryShardContext queryShardContext = context.getQueryShardContext(); context.from(source.from()); @@ -1339,9 +1354,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.sorts() != null) { try { Optional optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext()); - if (optionalSort.isPresent()) { - context.sort(optionalSort.get()); - } + optionalSort.ifPresent(context::sort); } catch (IOException e) { throw new SearchException(shardTarget, "failed to create sort elements", e); } @@ -1496,6 +1509,44 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.profile()) { context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch())); } + + if (canUseStarTree) { + try { + setStarTreeQuery(context, queryShardContext, source); + logger.info("using star tree"); + } + catch (IOException e) { + logger.info("not using star tree"); + } + } + } + + private boolean setStarTreeQuery(SearchContext context, QueryShardContext queryShardContext, SearchSourceBuilder source) throws IOException { + + if (source.aggregations() == null) { + return false; + } + + // TODO: Support for multiple startrees + CompositeDataCubeFieldType compositeMappedFieldType = (StarTreeMapper.StarTreeFieldType) context.mapperService().getCompositeFieldTypes().iterator().next(); + CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo(compositeMappedFieldType.name(), compositeMappedFieldType.getCompositeIndexType()); + + ParsedQuery parsedQuery = queryShardContext.toStarTreeQuery(starTree, source.query(), context.query()); + AggregatorFactory aggregatorFactory = context.aggregations().factories().getFactories()[0]; + if (!(aggregatorFactory instanceof ValuesSourceAggregatorFactory)) { + return false; + } + ValuesSourceConfig config = ((ValuesSourceAggregatorFactory) aggregatorFactory).getConfig(); + String metricKey = queryShardContext.getMetricKey(compositeMappedFieldType, aggregatorFactory); + StarTreeAggregatorFactory factory = new StarTreeAggregatorFactory(aggregatorFactory.name(), queryShardContext, config, List.of(metricKey)); + + + AggregatorFactories aggregatorFactories = new AggregatorFactories(new StarTreeAggregatorFactory[]{factory}); + + context.parsedQuery(parsedQuery) + .aggregations(new SearchContextAggregations(aggregatorFactories, multiBucketConsumerService.create())); + + return false; } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java index eeb0c606694b0..dfcb245ef3656 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java @@ -255,7 +255,7 @@ public static Builder builder() { return new Builder(); } - private AggregatorFactories(AggregatorFactory[] factories) { + public AggregatorFactories(AggregatorFactory[] factories) { this.factories = factories; } @@ -661,4 +661,8 @@ public PipelineTree buildPipelineTree() { return new PipelineTree(subTrees, aggregators); } } + + public AggregatorFactory[] getFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java index 6cc3a78fb1e36..86fbb46a9ad3c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java @@ -127,4 +127,8 @@ protected boolean supportsConcurrentSegmentSearch() { public boolean evaluateChildFactories() { return factories.allFactoriesSupportConcurrentSearch(); } + + public AggregatorFactories getSubFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index 4b8e882cd69bc..8acaae366380b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -56,13 +56,13 @@ */ public class SumAggregator extends NumericMetricsAggregator.SingleValue { - private final ValuesSource.Numeric valuesSource; - private final DocValueFormat format; + protected final ValuesSource.Numeric valuesSource; + protected final DocValueFormat format; - private DoubleArray sums; - private DoubleArray compensations; + protected DoubleArray sums; + protected DoubleArray compensations; - SumAggregator( + public SumAggregator( String name, ValuesSourceConfig valuesSourceConfig, SearchContext context, diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java index ef9b93920ba18..e0cd44f2672a8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class SumAggregatorFactory extends ValuesSourceAggregatorFactory { +public class SumAggregatorFactory extends ValuesSourceAggregatorFactory { SumAggregatorFactory( String name, diff --git a/server/src/main/java/org/opensearch/search/aggregations/startree/InternalStarTree.java b/server/src/main/java/org/opensearch/search/aggregations/startree/InternalStarTree.java new file mode 100644 index 0000000000000..18613932db6ec --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/startree/InternalStarTree.java @@ -0,0 +1,266 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.startree; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.aggregations.Aggregation; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.aggregations.InternalMultiBucketAggregation; +import org.opensearch.search.aggregations.support.CoreValuesSourceType; +import org.opensearch.search.aggregations.support.ValueType; +import org.opensearch.search.aggregations.support.ValuesSourceType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class InternalStarTree> extends InternalMultiBucketAggregation< + R, + B> { + static final InternalStarTree.Factory FACTORY = new InternalStarTree.Factory(); + + public static class Bucket extends InternalMultiBucketAggregation.InternalBucket { + public long sum; + public InternalAggregations aggregations; + private final String key; + + public Bucket(String key, long sum, InternalAggregations aggregations) { + this.key = key; + this.sum = sum; + this.aggregations = aggregations; + } + + @Override + public String getKey() { + return getKeyAsString(); + } + + @Override + public String getKeyAsString() { + return key; + } + + @Override + public long getDocCount() { + return sum; + } + + @Override + public InternalAggregations getAggregations() { + return aggregations; + } + + protected InternalStarTree.Factory getFactory() { + return FACTORY; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Aggregation.CommonFields.KEY.getPreferredName(), key); + // TODO : this is hack ( we are mapping bucket.noofdocs to sum ) + builder.field("SUM", sum); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(key); + out.writeVLong(sum); + aggregations.writeTo(out); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + InternalStarTree.Bucket that = (InternalStarTree.Bucket) other; + return Objects.equals(sum, that.sum) && Objects.equals(aggregations, that.aggregations) && Objects.equals(key, that.key); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), sum, aggregations, key); + } + } + + public static class Factory> { + public ValuesSourceType getValueSourceType() { + return CoreValuesSourceType.NUMERIC; + } + + public ValueType getValueType() { + return ValueType.NUMERIC; + } + + @SuppressWarnings("unchecked") + public R create(String name, List ranges, Map metadata) { + return (R) new InternalStarTree(name, ranges, metadata); + } + + @SuppressWarnings("unchecked") + public B createBucket(String key, long docCount, InternalAggregations aggregations) { + return (B) new InternalStarTree.Bucket(key, docCount, aggregations); + } + + @SuppressWarnings("unchecked") + public R create(List ranges, R prototype) { + return (R) new InternalStarTree(prototype.name, ranges, prototype.metadata); + } + + @SuppressWarnings("unchecked") + public B createBucket(InternalAggregations aggregations, B prototype) { + // TODO : prototype.getDocCount() -- is mapped to sum - change this + return (B) new InternalStarTree.Bucket(prototype.getKey(), prototype.getDocCount(), aggregations); + } + } + + public InternalStarTree.Factory getFactory() { + return FACTORY; + } + + private final List ranges; + + public InternalStarTree(String name, List ranges, Map metadata) { + super(name, metadata); + this.ranges = ranges; + } + + /** + * Read from a stream. + */ + public InternalStarTree(StreamInput in) throws IOException { + super(in); + int size = in.readVInt(); + List ranges = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + String key = in.readString(); + ranges.add(getFactory().createBucket(key, in.readVLong(), InternalAggregations.readFrom(in))); + } + this.ranges = ranges; + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeVInt(ranges.size()); + for (B bucket : ranges) { + bucket.writeTo(out); + } + } + + @Override + public String getWriteableName() { + return "startree"; + } + + @Override + public List getBuckets() { + return ranges; + } + + public R create(List buckets) { + return getFactory().create(buckets, (R) this); + } + + @Override + public B createBucket(InternalAggregations aggregations, B prototype) { + return getFactory().createBucket(aggregations, prototype); + } + + @Override + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { + Map> bucketsMap = new HashMap<>(); + + for (InternalAggregation aggregation : aggregations) { + InternalStarTree filters = (InternalStarTree) aggregation; + int i = 0; + for (B bucket : filters.ranges) { + String key = bucket.getKey(); + List sameRangeList = bucketsMap.get(key); + if (sameRangeList == null) { + sameRangeList = new ArrayList<>(aggregations.size()); + bucketsMap.put(key, sameRangeList); + } + sameRangeList.add(bucket); + } + } + + ArrayList reducedBuckets = new ArrayList<>(bucketsMap.size()); + + for (List sameRangeList : bucketsMap.values()) { + B reducedBucket = reduceBucket(sameRangeList, reduceContext); + if (reducedBucket.getDocCount() >= 1) { + reducedBuckets.add(reducedBucket); + } + } + reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); + Collections.sort(reducedBuckets, Comparator.comparing(InternalStarTree.Bucket::getKey)); + + return getFactory().create(name, reducedBuckets, getMetadata()); + } + + @Override + protected B reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + + B reduced = null; + List aggregationsList = new ArrayList<>(buckets.size()); + for (B bucket : buckets) { + if (reduced == null) { + reduced = (B) new Bucket(bucket.getKey(), bucket.getDocCount(), bucket.getAggregations()); + } else { + reduced.sum += bucket.sum; + } + aggregationsList.add(bucket.getAggregations()); + } + reduced.aggregations = InternalAggregations.reduce(aggregationsList, context); + return reduced; + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.startArray(CommonFields.BUCKETS.getPreferredName()); + + for (B range : ranges) { + range.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), ranges); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if (super.equals(obj) == false) return false; + + InternalStarTree that = (InternalStarTree) obj; + return Objects.equals(ranges, that.ranges); + } + +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregationBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregationBuilder.java new file mode 100644 index 0000000000000..b489cd793f641 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregationBuilder.java @@ -0,0 +1,124 @@ +///* +// * SPDX-License-Identifier: Apache-2.0 +// * +// * The OpenSearch Contributors require contributions made to +// * this file be licensed under the Apache-2.0 license or a +// * compatible open source license. +// */ +// +//package org.opensearch.search.aggregations.startree; +// +//import org.opensearch.core.ParseField; +//import org.opensearch.core.common.io.stream.StreamInput; +//import org.opensearch.core.common.io.stream.StreamOutput; +//import org.opensearch.core.xcontent.ObjectParser; +//import org.opensearch.core.xcontent.XContentBuilder; +//import org.opensearch.index.query.QueryShardContext; +//import org.opensearch.search.aggregations.AbstractAggregationBuilder; +//import org.opensearch.search.aggregations.AggregationBuilder; +//import org.opensearch.search.aggregations.AggregatorFactories; +//import org.opensearch.search.aggregations.AggregatorFactory; +//import org.opensearch.search.aggregations.metrics.MetricAggregatorSupplier; +//import org.opensearch.search.aggregations.support.ValuesSourceRegistry; +// +//import java.io.IOException; +//import java.util.ArrayList; +//import java.util.Arrays; +//import java.util.List; +//import java.util.Map; +// +//public class StarTreeAggregationBuilder extends AbstractAggregationBuilder { +// public static final String NAME = "startree"; +// +// private List fieldCols; +// private List metrics; +// public static final ObjectParser PARSER = ObjectParser.fromBuilder( +// NAME, +// StarTreeAggregationBuilder::new +// ); +// public static final ValuesSourceRegistry.RegistryKey REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>( +// NAME, +// MetricAggregatorSupplier.class +// ); +// +// static { +// PARSER.declareStringArray(StarTreeAggregationBuilder::groupby, new ParseField("groupby")); +// PARSER.declareStringArray(StarTreeAggregationBuilder::metrics, new ParseField("metrics")); +// } +// +// private void groupby(List strings) { +// fieldCols = new ArrayList<>(); +// fieldCols.addAll(strings); +// } +// +// private void metrics(List strings) { +// metrics = new ArrayList<>(); +// metrics.addAll(strings); +// } +// +// public StarTreeAggregationBuilder(String name) { +// super(name); +// } +// +// protected StarTreeAggregationBuilder( +// StarTreeAggregationBuilder clone, +// AggregatorFactories.Builder factoriesBuilder, +// Map metadata +// ) { +// super(clone, factoriesBuilder, metadata); +// } +// +// @Override +// protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata) { +// return new StarTreeAggregationBuilder(this, factoriesBuilder, metadata); +// } +// +// /** +// * Read from a stream. +// */ +// public StarTreeAggregationBuilder(StreamInput in) throws IOException { +// super(in); +// String[] fieldArr = in.readOptionalStringArray(); +// String[] metrics = in.readOptionalStringArray(); +// if (fieldArr != null) { +// fieldCols = Arrays.asList(fieldArr); +// } +// if(metrics != null) { +// this.metrics = Arrays.asList(metrics); +// } +// } +// +// @Override +// protected void doWriteTo(StreamOutput out) throws IOException { +// // Nothing to write +// out.writeOptionalStringArray(fieldCols.toArray(new String[0])); +// out.writeOptionalStringArray(metrics.toArray(new String[0])); +// } +// +// @Override +// public BucketCardinality bucketCardinality() { +// return BucketCardinality.MANY; +// } +// +// @Override +// protected AggregatorFactory doBuild( +// QueryShardContext queryShardContext, +// AggregatorFactory parent, +// AggregatorFactories.Builder subFactoriesBuilder +// ) throws IOException { +// return new StarTreeAggregatorFactory(name, queryShardContext, parent, subFactoriesBuilder, metadata, fieldCols, metrics); +// } +// +// @Override +// protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { +// builder.startObject(); +// builder.endObject(); +// return builder; +// } +// +// @Override +// public String getType() { +// return NAME; +// } +// +//} diff --git a/server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregator.java new file mode 100644 index 0000000000000..90af9e6be5696 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregator.java @@ -0,0 +1,204 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.startree; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.CollectionTerminatedException; +import org.opensearch.core.ParseField; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.ObjectParser; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.aggregations.Aggregator; +import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.CardinalityUpperBound; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.LeafBucketCollector; +import org.opensearch.search.aggregations.LeafBucketCollectorBase; +import org.opensearch.search.aggregations.bucket.BucketsAggregator; +import org.opensearch.search.aggregations.bucket.SingleBucketAggregator; +import org.opensearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.StringJoiner; + +import static org.opensearch.core.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class StarTreeAggregator extends BucketsAggregator implements SingleBucketAggregator { + + private Map sumMap = new HashMap<>(); + private Map indexMap = new HashMap<>(); + + final StarTree[] _starTrees; + + private List fieldCols; + + private List metrics; + + final InternalStarTree.Factory starTreeFactory; + + private static final Logger logger = LogManager.getLogger(StarTreeAggregator.class); + + public StarTreeAggregator( + String name, + AggregatorFactories factories, + InternalStarTree.Factory starTreeFactory, + StarTree[] starTrees, + SearchContext context, + Aggregator parent, + Map metadata, + List fieldCols, + List metrics + ) throws IOException { + super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata); + this._starTrees = starTrees; + this.starTreeFactory = starTreeFactory; + this.fieldCols = fieldCols; + this.metrics = metrics; + } + + public static class StarTree implements Writeable, ToXContentObject { + public static final ParseField KEY_FIELD = new ParseField("key"); + + protected final String key; + + public StarTree(String key) { + this.key = key; + } + + /** + * Read from a stream. + */ + public StarTree(StreamInput in) throws IOException { + key = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(key); + } + + public String getKey() { + return this.key; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (key != null) { + builder.field(KEY_FIELD.getPreferredName(), key); + } + builder.endObject(); + return builder; + } + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("startree", arg -> { + String key = (String) arg[0]; + return new StarTree(key); + }); + + static { + PARSER.declareField(optionalConstructorArg(), (p, c) -> p.text(), KEY_FIELD, ObjectParser.ValueType.DOUBLE); + } + + @Override + public int hashCode() { + return Objects.hash(key); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + StarTree other = (StarTree) obj; + return Objects.equals(key, other.key); + } + } + + @Override + public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + + return buildAggregationsForFixedBucketCount( + owningBucketOrds, + indexMap.size(), + (offsetInOwningOrd, docCount, subAggregationResults) -> { + // TODO : make this better + String key = ""; + for (Map.Entry entry : indexMap.entrySet()) { + if (offsetInOwningOrd == entry.getValue()) { + key = entry.getKey(); + break; + } + } + + // return starTreeFactory.createBucket(key, docCount, subAggregationResults); + return new InternalStarTree.Bucket(key, sumMap.get(key), subAggregationResults); + }, + buckets -> create(name, buckets, metadata()) + ); + } + + public InternalStarTree create(String name, List ranges, Map metadata) { + return new InternalStarTree(name, ranges, metadata); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalStarTree(name, new ArrayList(), new HashMap<>()); + } + + @Override + protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + return new LeafBucketCollectorBase(sub, null) { + + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + throw new CollectionTerminatedException(); +// for (StarTree starTree : _starTrees) { +// String key = starTree.getKey(); +// long sum = sumMap.getOrDefault(key, 0L); +// sumMap.put(key, sum + 1); +// indexMap.putIfAbsent(key, indexMap.size()); +// } + } + }; + + } + + private String getKey(List dimensionsKeyList, int doc) throws IOException { + StringJoiner sj = new StringJoiner("-"); + for (SortedNumericDocValues dim : dimensionsKeyList) { + dim.advanceExact(doc); + long val = dim.nextValue(); + sj.add("" + val); + } + return sj.toString(); + } + + private long subBucketOrdinal(long owningBucketOrdinal, int keyOrd) { + return owningBucketOrdinal * indexMap.size() + keyOrd; + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregatorFactory.java new file mode 100644 index 0000000000000..7be38a226e8c1 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/startree/StarTreeAggregatorFactory.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.startree; + +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.search.aggregations.Aggregator; +import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.CardinalityUpperBound; +import org.opensearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class StarTreeAggregatorFactory extends AggregatorFactory { + private List fieldCols; + private List metrics; + + public StarTreeAggregatorFactory( + String name, + QueryShardContext queryShardContext, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metadata, + List fieldCols, + List metrics + ) throws IOException { + super(name, queryShardContext, parent, subFactoriesBuilder, metadata); + this.fieldCols = fieldCols; + this.metrics = metrics; + } + + @Override + public Aggregator createInternal( + SearchContext searchContext, + Aggregator parent, + CardinalityUpperBound cardinality, + Map metadata + ) throws IOException { + return new StarTreeAggregator(name, factories, null, null, searchContext, parent, metadata, fieldCols, metrics); + } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/startree/package-info.java b/server/src/main/java/org/opensearch/search/aggregations/startree/package-info.java new file mode 100644 index 0000000000000..ef76726106a25 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/startree/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.startree; diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java index 69a4a5d8b6703..b19e466b081f9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java @@ -102,4 +102,16 @@ protected abstract Aggregator doCreateInternal( public String getStatsSubtype() { return config.valueSourceType().typeName(); } + + public String getField() { + return config.fieldContext().field(); + } + + public String getAggregationName() { + return name; + } + + public ValuesSourceConfig getConfig() { + return config; + } } diff --git a/server/src/main/java/org/opensearch/search/query/startree/StarTreeFilter.java b/server/src/main/java/org/opensearch/search/query/startree/StarTreeFilter.java new file mode 100644 index 0000000000000..43a001d77efc8 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query/startree/StarTreeFilter.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opensearch.search.query.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.DocIdSetBuilder; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.search.aggregations.startree.StarTreeAggregator; +//import org.opensearch.search.aggregations.startree.StarTreeAggregator; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.function.Predicate; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +/** Filter operator for star tree data structure. */ +public class StarTreeFilter { + private static final Logger logger = LogManager.getLogger(StarTreeFilter.class); + + + /** Helper class to wrap the result from traversing the star tree. */ + static class StarTreeResult { + final DocIdSetBuilder _matchedDocIds; + final Set _remainingPredicateColumns; + final int numOfMatchedDocs; + final int maxMatchedDoc; + + StarTreeResult(DocIdSetBuilder matchedDocIds, Set remainingPredicateColumns, int numOfMatchedDocs, + int maxMatchedDoc) { + _matchedDocIds = matchedDocIds; + _remainingPredicateColumns = remainingPredicateColumns; + this.numOfMatchedDocs = numOfMatchedDocs; + this.maxMatchedDoc = maxMatchedDoc; + } + } + +// private final StarTreeField _starTree; + + Map>> _predicateEvaluators; + private final Set _groupByColumns; + + DocIdSetBuilder docsWithField; + + DocIdSetBuilder.BulkAdder adder; + Map dimValueMap; + public StarTreeFilter( + Map>> predicateEvaluators, + Set groupByColumns + ) throws IOException { + // This filter operator does not support AND/OR/NOT operations. +// _starTree = starTreeAggrStructure._starTree; +// dimValueMap = starTreeAggrStructure.dimensionValues; + _predicateEvaluators = predicateEvaluators != null ? predicateEvaluators : Collections.emptyMap(); + _groupByColumns = groupByColumns != null ? groupByColumns : Collections.emptySet(); + + // TODO : this should be the maximum number of doc values + docsWithField = new DocIdSetBuilder(Integer.MAX_VALUE); + } + + /** + *
    + *
  • First go over the star tree and try to match as many dimensions as possible + *
  • For the remaining columns, use doc values indexes to match them + *
+ */ + public DocIdSetIterator getStarTreeResult() throws IOException { + StarTreeResult starTreeResult = traverseStarTree(); + //logger.info("Matched docs in star tree : {}" , starTreeResult.numOfMatchedDocs); + List andIterators = new ArrayList<>(); + andIterators.add(starTreeResult._matchedDocIds.build().iterator()); + DocIdSetIterator docIdSetIterator = andIterators.get(0); + // No matches, return + if(starTreeResult.maxMatchedDoc == -1) { + return docIdSetIterator; + } + int docCount = 0; + for (String remainingPredicateColumn : starTreeResult._remainingPredicateColumns) { + // TODO : set to max value of doc values + logger.info("remainingPredicateColumn : {}, maxMatchedDoc : {} ", remainingPredicateColumn, starTreeResult.maxMatchedDoc); + DocIdSetBuilder builder = new DocIdSetBuilder(starTreeResult.maxMatchedDoc + 1); + List> compositePredicateEvaluators = _predicateEvaluators.get(remainingPredicateColumn); + SortedNumericDocValues ndv = this.dimValueMap.get(remainingPredicateColumn); + List docIds = new ArrayList<>(); + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docCount++; + int docID = docIdSetIterator.docID(); + if(ndv.advanceExact(docID)) { + final int valuesCount = ndv.docValueCount(); + long value = ndv.nextValue(); + for (Predicate compositePredicateEvaluator : compositePredicateEvaluators) { + // TODO : this might be expensive as its done against all doc values docs + if (compositePredicateEvaluator.test(value)) { + docIds.add(docID); + for (int i = 0; i < valuesCount - 1; i++) { + while(docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docIds.add(docIdSetIterator.docID()); + } + } + break; + } + } + } + } + DocIdSetBuilder.BulkAdder adder = builder.grow(docIds.size()); + for(int docID : docIds) { + adder.add(docID); + } + docIdSetIterator = builder.build().iterator(); + } + return docIdSetIterator; + } + + /** + * Helper method to traverse the star tree, get matching documents and keep track of all the + * predicate dimensions that are not matched. + */ + private StarTreeResult traverseStarTree() throws IOException { + return new StarTreeResult( + docsWithField, + Collections.emptySet(), + 0, + 0 + ); + } +} diff --git a/server/src/main/java/org/opensearch/search/query/startree/StarTreeQuery.java b/server/src/main/java/org/opensearch/search/query/startree/StarTreeQuery.java new file mode 100644 index 0000000000000..ba4e5dbc95cfe --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query/startree/StarTreeQuery.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opensearch.search.query.startree; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.search.ConstantScoreWeight; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Accountable; +//import org.opensearch.index.codec.startree.codec.StarTreeAggregatedValues; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +/** Query class for querying star tree data structure */ +public class StarTreeQuery extends Query implements Accountable { + + Map>> compositePredicateMap; + Set groupByColumns; + + public StarTreeQuery(Map>> compositePredicateMap, Set groupByColumns) { + this.compositePredicateMap = compositePredicateMap; + this.groupByColumns = groupByColumns; + } + + @Override + public String toString(String field) { + return null; + } + + @Override + public void visit(QueryVisitor visitor) { + visitor.visitLeaf(this); + } + + @Override + public boolean equals(Object obj) { + return sameClassAs(obj); + } + + @Override + public int hashCode() { + return classHash(); + } + + @Override + public long ramBytesUsed() { + return 0; + } + + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + return new ConstantScoreWeight(this, boost) { + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { +// Object obj = context.reader().getAggregatedDocValues(); + DocIdSetIterator result = null; +// if (obj != null) { +//// StarTreeAggregatedValues val = (StarTreeAggregatedValues) obj; +// StarTreeFilter filter = new StarTreeFilter(compositePredicateMap, groupByColumns); +// result = filter.getStarTreeResult(); +// } + return new ConstantScoreScorer(this, score(), scoreMode, DocIdSetIterator.empty()); + } + + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + }; + } +} diff --git a/server/src/main/java/org/opensearch/search/query/startree/StarTreeQueryBuilder.java b/server/src/main/java/org/opensearch/search/query/startree/StarTreeQueryBuilder.java new file mode 100644 index 0000000000000..d96546ca4d170 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query/startree/StarTreeQueryBuilder.java @@ -0,0 +1,164 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.query.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.search.Query; +import org.opensearch.core.ParseField; +import org.opensearch.core.common.ParsingException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ObjectParser; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.query.AbstractQueryBuilder; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.query.TermQueryBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +public class StarTreeQueryBuilder extends AbstractQueryBuilder { + public static final String NAME = "startree"; + private static final ParseField FILTER = new ParseField("filter"); + private final List filterClauses = new ArrayList<>(); + + private final Set groupBy = new HashSet<>(); + Map>> predicateMap = new HashMap<>(); + private static final Logger logger = LogManager.getLogger(StarTreeQueryBuilder.class); + + public StarTreeQueryBuilder() {} + + /** + * Read from a stream. + */ + public StarTreeQueryBuilder(StreamInput in) throws IOException { + super(in); + filterClauses.addAll(readQueries(in)); + in.readOptionalStringArray(); + } + + static List readQueries(StreamInput in) throws IOException { + int size = in.readVInt(); + List queries = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + queries.add(in.readNamedWriteable(QueryBuilder.class)); + } + return queries; + } + + @Override + protected void doWriteTo(StreamOutput out) { + // only superclass has state + } + + @Override + protected void doXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(NAME); + doXArrayContent(FILTER, filterClauses, builder, params); + builder.endObject(); + } + + private static void doXArrayContent(ParseField field, List clauses, XContentBuilder builder, Params params) + throws IOException { + if (clauses.isEmpty()) { + return; + } + builder.startArray(field.getPreferredName()); + for (QueryBuilder clause : clauses) { + clause.toXContent(builder, params); + } + builder.endArray(); + } + + private static final ObjectParser PARSER = new ObjectParser<>(NAME, StarTreeQueryBuilder::new); + + static { + PARSER.declareObjectArrayOrNull( + (builder, clauses) -> clauses.forEach(builder::filter), + (p, c) -> parseInnerQueryBuilder(p), + FILTER + ); + PARSER.declareStringArray(StarTreeQueryBuilder::groupby, new ParseField("groupby")); + + } + + private void groupby(List strings) { + groupBy.addAll(strings); + } + + public StarTreeQueryBuilder filter(QueryBuilder queryBuilder) { + if (queryBuilder == null) { + throw new IllegalArgumentException("inner bool query clause cannot be null"); + } + filterClauses.add(queryBuilder); + + for (QueryBuilder filterClause : filterClauses) { + if (filterClause instanceof BoolQueryBuilder) { + BoolQueryBuilder bq = (BoolQueryBuilder) filterClause; + List shouldQbs = bq.should(); + for (QueryBuilder sqb : shouldQbs) { + if (sqb instanceof TermQueryBuilder) { + TermQueryBuilder tq = (TermQueryBuilder) sqb; + String field = tq.fieldName(); + long inputQueryVal = Long.valueOf((String) tq.value()); + List> predicates = predicateMap.getOrDefault(field, new ArrayList<>()); + Predicate predicate = dimVal -> dimVal == inputQueryVal; + predicates.add(predicate); + predicateMap.put(field, predicates); + } + } + } + } + + return this; + } + + public static StarTreeQueryBuilder fromXContent(XContentParser parser) { + try { + return PARSER.apply(parser, null); + } catch (IllegalArgumentException e) { + throw new ParsingException(parser.getTokenLocation(), e.getMessage(), e); + } + } + + @Override + protected Query doToQuery(QueryShardContext context) { + // TODO : star tree supports either group by or filter + if (predicateMap.size() > 0) { + return new StarTreeQuery(predicateMap, new HashSet<>()); + } + logger.info("Group by : {} ", this.groupBy.toString() ); + return new StarTreeQuery(new HashMap<>(), this.groupBy); + } + + @Override + protected boolean doEquals(StarTreeQueryBuilder other) { + return true; + } + + @Override + protected int doHashCode() { + return 0; + } + + @Override + public String getWriteableName() { + return NAME; + } +} diff --git a/server/src/main/java/org/opensearch/search/query/startree/package-info.java b/server/src/main/java/org/opensearch/search/query/startree/package-info.java new file mode 100644 index 0000000000000..93d166e7c0af6 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query/startree/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.query.startree; diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeAggregatorFactory.java b/server/src/main/java/org/opensearch/search/startree/StarTreeAggregatorFactory.java new file mode 100644 index 0000000000000..c04774dfd9350 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeAggregatorFactory.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + + +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.search.aggregations.Aggregator; +import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.CardinalityUpperBound; +import org.opensearch.search.aggregations.support.CoreValuesSourceType; +import org.opensearch.search.aggregations.support.ValuesSource; +import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.opensearch.search.aggregations.support.ValuesSourceConfig; +import org.opensearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class StarTreeAggregatorFactory extends ValuesSourceAggregatorFactory { + private List metrics; + + public StarTreeAggregatorFactory( + String aggregationName, + QueryShardContext queryShardContext, + ValuesSourceConfig config, + List metrics + ) throws IOException { + super(aggregationName, config, queryShardContext, null, AggregatorFactories.builder(), null); + this.metrics = metrics; + } + + @Override + public Aggregator createInternal( + SearchContext searchContext, + Aggregator parent, + CardinalityUpperBound cardinality, + Map metadata + ) throws IOException { + return new StarTreeSumAggregator(name, this.config , searchContext, null, metadata, metrics); + } + + @Override + protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { + return null; + } + + @Override + protected Aggregator doCreateInternal(SearchContext searchContext, Aggregator parent, CardinalityUpperBound cardinality, Map metadata) throws IOException { + return null; + } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java new file mode 100644 index 0000000000000..f7a5f4378422f --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java @@ -0,0 +1,293 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.DocIdSetBuilder; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.function.Predicate; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +/** Filter operator for star tree data structure. */ +public class StarTreeFilter { + private static final Logger logger = LogManager.getLogger(StarTreeFilter.class); + + /** Helper class to wrap the result from traversing the star tree. */ + static class StarTreeResult { + final DocIdSetBuilder _matchedDocIds; + final Set _remainingPredicateColumns; + final int numOfMatchedDocs; + final int maxMatchedDoc; + + StarTreeResult(DocIdSetBuilder matchedDocIds, Set remainingPredicateColumns, int numOfMatchedDocs, int maxMatchedDoc) { + _matchedDocIds = matchedDocIds; + _remainingPredicateColumns = remainingPredicateColumns; + this.numOfMatchedDocs = numOfMatchedDocs; + this.maxMatchedDoc = maxMatchedDoc; + } + } + + private final StarTreeNode starTreeRoot; + + Map>> _predicateEvaluators; + // private final List _groupByColumns; + + DocIdSetBuilder docsWithField; + + DocIdSetBuilder.BulkAdder adder; + Map dimValueMap; + + public StarTreeFilter( + StarTreeValues starTreeAggrStructure, + Map>> predicateEvaluators, + List groupByColumns + ) { + // This filter operator does not support AND/OR/NOT operations. + starTreeRoot = starTreeAggrStructure.getRoot(); + dimValueMap = starTreeAggrStructure.getDimensionDocValuesIteratorMap(); + _predicateEvaluators = predicateEvaluators != null ? predicateEvaluators : Collections.emptyMap(); + // _groupByColumns = groupByColumns != null ? groupByColumns : Collections.emptyList(); + + // TODO : this should be the maximum number of doc values + docsWithField = new DocIdSetBuilder(Integer.MAX_VALUE); + } + + /** + *
    + *
  • First go over the star tree and try to match as many dimensions as possible + *
  • For the remaining columns, use doc values indexes to match them + *
+ */ + public DocIdSetIterator getStarTreeResult() throws IOException { + StarTreeResult starTreeResult = traverseStarTree(); + List andIterators = new ArrayList<>(); + andIterators.add(starTreeResult._matchedDocIds.build().iterator()); + DocIdSetIterator docIdSetIterator = andIterators.get(0); + // No matches, return + if (starTreeResult.maxMatchedDoc == -1) { + return docIdSetIterator; + } + int docCount = 0; + for (String remainingPredicateColumn : starTreeResult._remainingPredicateColumns) { + // TODO : set to max value of doc values + logger.info("remainingPredicateColumn : {}, maxMatchedDoc : {} ", remainingPredicateColumn, starTreeResult.maxMatchedDoc); + DocIdSetBuilder builder = new DocIdSetBuilder(starTreeResult.maxMatchedDoc + 1); + List> compositePredicateEvaluators = _predicateEvaluators.get(remainingPredicateColumn); + SortedNumericDocValues ndv = (SortedNumericDocValues) this.dimValueMap.get(remainingPredicateColumn); + List docIds = new ArrayList<>(); + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docCount++; + int docID = docIdSetIterator.docID(); + if (ndv.advanceExact(docID)) { + final int valuesCount = ndv.docValueCount(); + long value = ndv.nextValue(); + for (Predicate compositePredicateEvaluator : compositePredicateEvaluators) { + // TODO : this might be expensive as its done against all doc values docs + if (compositePredicateEvaluator.test(value)) { + docIds.add(docID); + for (int i = 0; i < valuesCount - 1; i++) { + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docIds.add(docIdSetIterator.docID()); + } + } + break; + } + } + } + } + DocIdSetBuilder.BulkAdder adder = builder.grow(docIds.size()); + for (int docID : docIds) { + adder.add(docID); + } + docIdSetIterator = builder.build().iterator(); + } + return docIdSetIterator; + } + + /** + * Helper method to traverse the star tree, get matching documents and keep track of all the + * predicate dimensions that are not matched. + */ + private StarTreeResult traverseStarTree() throws IOException { + Set globalRemainingPredicateColumns = null; + + StarTreeNode starTree = starTreeRoot; + + List dimensionNames = new ArrayList<>(dimValueMap.keySet()); + + // Track whether we have found a leaf node added to the queue. If we have found a leaf node, and + // traversed to the + // level of the leave node, we can set globalRemainingPredicateColumns if not already set + // because we know the leaf + // node won't split further on other predicate columns. + boolean foundLeafNode = starTree.isLeaf(); + + // Use BFS to traverse the star tree + Queue queue = new ArrayDeque<>(); + queue.add(starTree); + int currentDimensionId = -1; + Set remainingPredicateColumns = new HashSet<>(_predicateEvaluators.keySet()); + // Set remainingGroupByColumns = new HashSet<>(_groupByColumns); + if (foundLeafNode) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + + int matchedDocsCountInStarTree = 0; + int maxDocNum = -1; + + StarTreeNode starTreeNode; + List docIds = new ArrayList<>(); + while ((starTreeNode = queue.poll()) != null) { + int dimensionId = starTreeNode.getDimensionId(); + if (dimensionId > currentDimensionId) { + // Previous level finished + String dimension = dimensionNames.get(dimensionId); + remainingPredicateColumns.remove(dimension); + // remainingGroupByColumns.remove(dimension); + if (foundLeafNode && globalRemainingPredicateColumns == null) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + currentDimensionId = dimensionId; + } + + // If all predicate columns columns are matched, we can use aggregated document + if (remainingPredicateColumns.isEmpty()) { + int docId = starTreeNode.getAggregatedDocId(); + docIds.add(docId); + matchedDocsCountInStarTree++; + maxDocNum = Math.max(docId, maxDocNum); + continue; + } + + // For leaf node, because we haven't exhausted all predicate columns and group-by columns, we + // cannot use + // the aggregated document. Add the range of documents for this node to the bitmap, and keep + // track of the + // remaining predicate columns for this node + if (starTreeNode.isLeaf()) { + for (long i = starTreeNode.getStartDocId(); i < starTreeNode.getEndDocId(); i++) { + docIds.add((int) i); + matchedDocsCountInStarTree++; + maxDocNum = Math.max((int) i, maxDocNum); + } + continue; + } + + // For non-leaf node, proceed to next level + String childDimension = dimensionNames.get(dimensionId + 1); + + // Only read star-node when the dimension is not in the global remaining predicate columns or + // group-by columns + // because we cannot use star-node in such cases + StarTreeNode starNode = null; + if ((globalRemainingPredicateColumns == null || !globalRemainingPredicateColumns.contains(childDimension))) { + starNode = starTreeNode.getChildForDimensionValue(StarTreeNode.ALL); + } + + if (remainingPredicateColumns.contains(childDimension)) { + // Have predicates on the next level, add matching nodes to the queue + + // Calculate the matching dictionary ids for the child dimension + int numChildren = starTreeNode.getNumChildren(); + + // If number of matching dictionary ids is large, use scan instead of binary search + + Iterator childrenIterator = starTreeNode.getChildrenIterator(); + + // When the star-node exists, and the number of matching doc ids is more than or equal to + // the + // number of non-star child nodes, check if all the child nodes match the predicate, and use + // the star-node if so + if (starNode != null) { + List matchingChildNodes = new ArrayList<>(); + boolean findLeafChildNode = false; + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + List> predicates = _predicateEvaluators.get(childDimension); + for (Predicate predicate : predicates) { + long val = childNode.getDimensionValue(); + if (predicate.test(val)) { + matchingChildNodes.add(childNode); + findLeafChildNode |= childNode.isLeaf(); + break; + } + } + } + if (matchingChildNodes.size() == numChildren - 1) { + // All the child nodes (except for the star-node) match the predicate, use the star-node + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + // Some child nodes do not match the predicate, use the matching child nodes + queue.addAll(matchingChildNodes); + foundLeafNode |= findLeafChildNode; + } + } else { + // Cannot use the star-node, use the matching child nodes + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + List> predicates = _predicateEvaluators.get(childDimension); + for (Predicate predicate : predicates) { + if (predicate.test(childNode.getDimensionValue())) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + break; + } + } + } + } + } else { + // No predicate on the next level + + if (starNode != null) { + // Star-node exists, use it + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + // Star-node does not exist or cannot be used, add all non-star nodes to the queue + Iterator childrenIterator = starTreeNode.getChildrenIterator(); + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + if (childNode.getDimensionValue() != StarTreeNode.ALL) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + } + } + } + } + } + + adder = docsWithField.grow(docIds.size()); + for (int id : docIds) { + adder.add(id); + } + return new StarTreeResult( + docsWithField, + globalRemainingPredicateColumns != null ? globalRemainingPredicateColumns : Collections.emptySet(), + matchedDocsCountInStarTree, + maxDocNum + ); + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java b/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java new file mode 100644 index 0000000000000..32bcc8b113c06 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.search.ConstantScoreWeight; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Accountable; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** Query class for querying star tree data structure */ +public class StarTreeQuery extends Query implements Accountable { + + /** + * Star tree field info + * This is used to get the star tree data structure + */ + CompositeIndexFieldInfo starTree; + + /** + * Map of field name to a list of predicates to be applied on that field + * This is used to filter the data based on the predicates + */ + Map>> compositePredicateMap; + + /** + * Set of field names to be used for grouping the results + * This is used to group the data based on the fields + */ + List groupByColumns; + + public StarTreeQuery( + CompositeIndexFieldInfo starTree, + Map>> compositePredicateMap, + List groupByColumns + ) { + this.starTree = starTree; + this.compositePredicateMap = compositePredicateMap; + this.groupByColumns = groupByColumns; + } + + @Override + public String toString(String field) { + return null; + } + + @Override + public void visit(QueryVisitor visitor) { + visitor.visitLeaf(this); + } + + @Override + public boolean equals(Object obj) { + return sameClassAs(obj); + } + + @Override + public int hashCode() { + return classHash(); + } + + @Override + public long ramBytesUsed() { + return 0; + } + + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + return new ConstantScoreWeight(this, boost) { + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + SegmentReader reader = Lucene.segmentReader(context.reader()); + + // We get the 'CompositeIndexReader' instance so that we can get StarTreeValues + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) return null; + + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + StarTreeValues starTreeValues = null; + if (compositeIndexFields != null && !compositeIndexFields.isEmpty()) { + starTreeValues = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + } else { + return null; + } + + StarTreeFilter filter = new StarTreeFilter(starTreeValues, compositePredicateMap, groupByColumns); + DocIdSetIterator result = filter.getStarTreeResult(); + return new ConstantScoreScorer(this, score(), scoreMode, result); + } + + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + }; + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeSumAggregator.java b/server/src/main/java/org/opensearch/search/startree/StarTreeSumAggregator.java new file mode 100644 index 0000000000000..9180aac80100e --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeSumAggregator.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.util.BigArrays; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; +import org.opensearch.index.fielddata.SortedNumericDoubleValues; +import org.opensearch.search.aggregations.Aggregator; +import org.opensearch.search.aggregations.LeafBucketCollector; +import org.opensearch.search.aggregations.LeafBucketCollectorBase; +import org.opensearch.search.aggregations.metrics.CompensatedSum; +import org.opensearch.search.aggregations.metrics.SumAggregator; +import org.opensearch.search.aggregations.support.ValuesSource; +import org.opensearch.search.aggregations.support.ValuesSourceConfig; +import org.opensearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class StarTreeSumAggregator extends SumAggregator { + + + public StarTreeSumAggregator( + String name, + ValuesSourceConfig config, + SearchContext context, + Aggregator parent, + Map metadata, + List metrics + ) throws IOException { + super(name, config, context, parent, metadata); + } + + private List metrics; + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + //StarTreeAggregatedValues values = (StarTreeAggregatedValues) ctx.reader().getAggregatedDocValues(); + SegmentReader reader = Lucene.segmentReader(ctx.reader()); + + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) return null; + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + List fiList = starTreeDocValuesReader.getCompositeIndexFields(); + StarTreeValues values = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(fiList.get(0)); + final AtomicReference aggrVal = new AtomicReference<>(null); + + final BigArrays bigArrays = context.bigArrays(); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); + } + + + }; + } + } diff --git a/server/src/main/java/org/opensearch/search/startree/package-info.java b/server/src/main/java/org/opensearch/search/startree/package-info.java new file mode 100644 index 0000000000000..cb0802988f1f9 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree;