Skip to content

Commit

Permalink
original or star tree
Browse files Browse the repository at this point in the history
Signed-off-by: Sandesh Kumar <[email protected]>
  • Loading branch information
sandeshkr419 committed Aug 8, 2024
1 parent deb0e0a commit ca12366
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,10 @@ public boolean indexSortedOnField(String field) {
return indexSortConfig.hasPrimarySortOnField(field);
}

public ParsedQuery toStarTreeQuery(Map<String, List<Predicate<Long>>> compositePredicateMap,
public StarTreeQuery toStarTreeQuery(Map<String, List<Predicate<Long>>> compositePredicateMap,
Set<String> groupByColumns) {
StarTreeQuery starTreeQuery = new StarTreeQuery(compositePredicateMap, groupByColumns);
return new ParsedQuery(starTreeQuery);
return starTreeQuery;
}

public ParsedQuery toQuery(QueryBuilder queryBuilder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1188,10 +1188,10 @@ public static Engine.Index prepareIndex(
);
}

private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
private Engine.IndexResult index(Engine engine, final Engine.Index preindex) throws IOException {
active.set(true);
final Engine.IndexResult result;
index = indexingOperationListeners.preIndex(shardId, index);
final Engine.Index index = indexingOperationListeners.preIndex(shardId, preindex);
try {
if (logger.isTraceEnabled()) {
// don't use index.source().utf8ToString() here source might not be valid UTF-8
Expand Down
17 changes: 12 additions & 5 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionRunnable;
Expand Down Expand Up @@ -116,6 +117,7 @@
import org.opensearch.search.aggregations.metrics.MinAggregatorFactory;
import org.opensearch.search.aggregations.metrics.SumAggregatorFactory;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.opensearch.search.aggregations.startree.OriginalOrStarTreeAggregatorFactory;
import org.opensearch.search.aggregations.startree.StarTreeAggregator;
import org.opensearch.search.aggregations.startree.StarTreeAggregatorFactory;
import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory;
Expand All @@ -142,10 +144,12 @@
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.lookup.SearchLookup;
import org.opensearch.search.profile.Profilers;
import org.opensearch.search.query.OriginalOrStarTreeQuery;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.query.ScrollQuerySearchResult;
import org.opensearch.search.query.startree.StarTreeQuery;
import org.opensearch.search.rescore.RescorerBuilder;
import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -1564,7 +1568,7 @@ private boolean setStarTreeQuery(SearchContext context, QueryShardContext queryS
}

// TODO: Support for multiple startrees
CompositeDataCubeFieldType compositeMappedFieldType = (StarTreeMapper.StarTreeFieldType) context.mapperService().getCompositeFieldTypes().iterator().next();;
CompositeDataCubeFieldType compositeMappedFieldType = (StarTreeMapper.StarTreeFieldType) context.mapperService().getCompositeFieldTypes().iterator().next();
List<String> supportedDimensions = new ArrayList<>(compositeMappedFieldType.fields());
Map<String, List<MetricStat>> supportedMetrics = compositeMappedFieldType.getMetrics().stream()
.collect(Collectors.toMap(Metric::getField, Metric::getMetrics));
Expand Down Expand Up @@ -1612,13 +1616,16 @@ private boolean setStarTreeQuery(SearchContext context, QueryShardContext queryS
}
String metricKey = StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues(compositeMappedFieldType.name(), field, metric.toLowerCase());

ParsedQuery query = queryShardContext.toStarTreeQuery(null, Set.of(dimension));

StarTreeQuery query = queryShardContext.toStarTreeQuery(null, Set.of(dimension));
AggregatorFactories originalFactories = source.aggregations().build(queryShardContext, null);
StarTreeAggregatorFactory factory = new StarTreeAggregatorFactory(defaultFactory.name(), defaultSubFactory.name(), queryShardContext, null, AggregatorFactories.builder(), null, List.of(dimension), List.of(metricKey));
StarTreeAggregatorFactory[] factories = {factory};
OriginalOrStarTreeAggregatorFactory[] factories = { new OriginalOrStarTreeAggregatorFactory(defaultFactory.name(), queryShardContext, defaultFactory.getParent(), AggregatorFactories.builder(), null, List.of(factory), (TermsAggregatorFactory) originalFactories.getFactories()[0])};
AggregatorFactories aggregatorFactories = new AggregatorFactories(factories);

context.parsedQuery(query)



context.parsedQuery(new ParsedQuery(new OriginalOrStarTreeQuery(List.of(query), context.query())))
.aggregations(new SearchContextAggregations(aggregatorFactories, multiBucketConsumerService.create()));

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
// the execution if we are running out. To achieve that we are passing 0 as a bucket count.
multiBucketConsumer.accept(0);
}
subCollector.collect(doc, bucketOrd);

.collect(doc, bucketOrd);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static class Bucket extends InternalMultiTerms.InternalBucket {
public Bucket(String key, double sum, InternalAggregations aggregations) {
this.key = key;
this.sum = sum;
this.docCount = 0;
this.aggregations = aggregations;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.startree;

import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.CardinalityUpperBound;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.SingleBucketAggregator;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Map;

public class OriginalOrStarTreeAggregator extends BucketsAggregator implements SingleBucketAggregator {


public OriginalOrStarTreeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, CardinalityUpperBound bucketCardinality, Map<String, Object> metadata) throws IOException {
super(name, factories, context, parent, bucketCardinality, metadata);
}

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
return null;
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return new InternalAggregation[0];
}

@Override
public InternalAggregation buildEmptyAggregation() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.startree;

import org.opensearch.index.query.QueryShardContext;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.CardinalityUpperBound;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregatorFactory;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class OriginalOrStarTreeAggregatorFactory extends AggregatorFactory {

List<StarTreeAggregatorFactory> starTreeAggregatorFactories;
TermsAggregatorFactory originalAggregatorFactory;

public OriginalOrStarTreeAggregatorFactory(
String aggregationName,
QueryShardContext queryShardContext,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata,
List<StarTreeAggregatorFactory> starTreeAggregatorFactories,
TermsAggregatorFactory originalAggregatorFactory
) throws IOException {
super(aggregationName, queryShardContext, parent, subFactoriesBuilder, metadata);
this.starTreeAggregatorFactories = starTreeAggregatorFactories;
this.originalAggregatorFactory = originalAggregatorFactory;
}

@Override
protected Aggregator createInternal(SearchContext searchContext, Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata) throws IOException {
TermsAggregator originalAggregator = (TermsAggregator) this.originalAggregatorFactory.createInternal(searchContext, parent, cardinality, metadata);
this.starTreeAggregatorFactories.get(0).setOriginalAggregator(originalAggregator);
return this.starTreeAggregatorFactories.get(0).createInternal(searchContext, parent, cardinality, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.CardinalityUpperBound;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.SingleBucketAggregator;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.query.OriginalOrStarTreeQuery;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -48,6 +50,7 @@

public class StarTreeAggregator extends BucketsAggregator implements SingleBucketAggregator {

BucketsAggregator originalAggregator;
private Map<String, Double> sumMap = new HashMap<>();
private Map<String, Integer> indexMap = new HashMap<>();

Expand All @@ -56,9 +59,11 @@ public class StarTreeAggregator extends BucketsAggregator implements SingleBucke

String subAggregationName;


private static final Logger logger = LogManager.getLogger(StarTreeAggregator.class);

public StarTreeAggregator(
BucketsAggregator originalAggregator,
String aggregationName,
String subAggregationName,
AggregatorFactories factories,
Expand All @@ -72,6 +77,7 @@ public StarTreeAggregator(
this.fieldCols = fieldCols;
this.metrics = metrics;
this.subAggregationName = subAggregationName;
this.originalAggregator = originalAggregator;
}

// public static class StarTree implements Writeable, ToXContentObject {
Expand Down Expand Up @@ -138,12 +144,10 @@ public StarTreeAggregator(

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {

return buildAggregationsForFixedBucketCount(
owningBucketOrds,
indexMap.size(),
(offsetInOwningOrd, docCount, subAggregationResults) -> {
// TODO : make this better
String key = "";
for (Map.Entry<String, Integer> entry : indexMap.entrySet()) {
if (offsetInOwningOrd == entry.getValue()) {
Expand All @@ -165,14 +169,20 @@ public InternalStarTree create(String name, List<InternalStarTree.Bucket> ranges

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalStarTree(name, new ArrayList(), new HashMap<>());
return this.originalAggregator.buildEmptyAggregation();
// return new InternalStarTree(name, new ArrayList(), new HashMap<>());
}

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
//StarTreeAggregatedValues values = (StarTreeAggregatedValues) ctx.reader().getAggregatedDocValues();
SegmentReader reader = Lucene.segmentReader(ctx.reader());

// override sub-aggregations
if (context.query() instanceof OriginalOrStarTreeQuery) {
int startreeUsed = ((OriginalOrStarTreeQuery)context.query()).getStarTreeQueryUsed();
}

if(!(reader.getDocValuesReader() instanceof CompositeIndexReader)) return null;
CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader();
List<CompositeIndexFieldInfo> fiList = starTreeDocValuesReader.getCompositeIndexFields();
Expand Down Expand Up @@ -211,6 +221,7 @@ public void collect(int doc, long bucket) throws IOException {
sumMap.put(key, val);
}
collectBucket(sub, doc, subBucketOrdinal(bucket, indexMap.get(key)));
// incrementBucketDocCount(bucket, 1);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.CardinalityUpperBound;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
Expand All @@ -23,6 +24,7 @@ public class StarTreeAggregatorFactory extends AggregatorFactory {
private List<String> fieldCols;
private List<String> metrics;
String subAggregationName;
TermsAggregator originalAggregator;

public StarTreeAggregatorFactory(
String aggregationName,
Expand All @@ -47,11 +49,15 @@ public Aggregator createInternal(
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
return new StarTreeAggregator(name, subAggregationName, factories, searchContext, parent, metadata, fieldCols, metrics);
return new StarTreeAggregator(this.originalAggregator, name, subAggregationName, factories, searchContext, parent, metadata, fieldCols, metrics);
}

@Override
protected boolean supportsConcurrentSegmentSearch() {
return true;
}

public void setOriginalAggregator(TermsAggregator aggregator) {
this.originalAggregator = aggregator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,7 @@ static class RequestRewritable implements Rewriteable<Rewriteable> {
public Rewriteable rewrite(QueryRewriteContext ctx) throws IOException {
SearchSourceBuilder newSource = request.source() == null ? null : Rewriteable.rewrite(request.source(), ctx);
AliasFilter newAliasFilter = Rewriteable.rewrite(request.getAliasFilter(), ctx);

QueryShardContext shardContext = ctx.convertToShardContext();

FieldSortBuilder primarySort = FieldSortBuilder.getPrimaryFieldSortOrNull(newSource);
if (shardContext != null
&& primarySort != null
Expand Down
Loading

0 comments on commit ca12366

Please sign in to comment.