Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] [Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree #16674

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand All @@ -40,6 +41,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -74,9 +76,14 @@ public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context
);

for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) {
// first check for aggregation is a metric aggregation
MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory);

// if not a metric aggregation, check for applicable date histogram shape
if (metricStat == null) {
return null;
if (validateDateHistogramSupport(compositeMappedFieldType, aggregatorFactory) == false) {
return null;
}
}
}

Expand Down Expand Up @@ -159,6 +166,20 @@ private static MetricStat validateStarTreeMetricSupport(
return null;
}

private static boolean validateDateHistogramSupport(
CompositeDataCubeFieldType compositeIndexFieldInfo,
AggregatorFactory aggregatorFactory
) {
if (aggregatorFactory instanceof DateHistogramAggregatorFactory && aggregatorFactory.getSubFactories().getFactories().length == 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably have a recursive method which checks if parent aggregator is star tree supported and if children aggregators are supported and so on.

And we can make the supported aggregators configurable.

AggregatorFactory subFactory = aggregatorFactory.getSubFactories().getFactories()[0];
MetricStat metricStat = validateStarTreeMetricSupport(compositeIndexFieldInfo, subFactory);
if (metricStat != null) {
return true;
}
}
return false;
}

public static CompositeIndexFieldInfo getSupportedStarTree(SearchContext context) {
StarTreeQueryContext starTreeQueryContext = context.getStarTreeQueryContext();
return (starTreeQueryContext != null) ? starTreeQueryContext.getStarTree() : null;
Expand Down Expand Up @@ -240,7 +261,7 @@ public static FixedBitSet getStarTreeFilteredValues(SearchContext context, LeafR
throws IOException {
FixedBitSet result = context.getStarTreeQueryContext().getStarTreeValues(ctx);
if (result == null) {
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap());
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap(), Set.of());
context.getStarTreeQueryContext().setStarTreeValues(ctx, result);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/// *
// * SPDX-License-Identifier: Apache-2.0
// *
// * The OpenSearch Contributors require contributions made to
// * this file be licensed under the Apache-2.0 license or a
// * compatible open source license.
// */
//
// package org.opensearch.search.aggregations;
//
// import java.io.IOException;
//
// public abstract class StarTreeBucketCollector extends LeafBucketCollector {
//
// public abstract void collectStarEntry(int starTreeEntry, long bucket) throws IOException;
// }
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/// *
// * SPDX-License-Identifier: Apache-2.0
// *
// * The OpenSearch Contributors require contributions made to
// * this file be licensed under the Apache-2.0 license or a
// * compatible open source license.
// */
//
/// *
// * Modifications Copyright OpenSearch Contributors. See
// * GitHub history for details.
// */
// package org.opensearch.search.aggregations;
//
// import org.apache.lucene.search.Scorable;
// import org.opensearch.common.lucene.ScorerAware;
//
// import java.io.IOException;
//
/// **
// * A {@link LeafBucketCollector} that delegates all calls to the sub leaf
// * aggregator and sets the scorer on its source of values if it implements
// * {@link ScorerAware}.
// *
// * @opensearch.internal
// */
// public class StarTreeLeafBucketCollectorBase extends StarTreeBucketCollector {
// private final LeafBucketCollector sub;
// private final ScorerAware values;
//
// /**
// * @param sub The leaf collector for sub aggregations.
// * @param values The values. {@link ScorerAware#setScorer} will be called automatically on them if they implement {@link ScorerAware}.
// */
// public StarTreeLeafBucketCollectorBase(LeafBucketCollector sub, Object values) {
// this.sub = sub;
// if (values instanceof ScorerAware) {
// this.values = (ScorerAware) values;
// } else {
// this.values = null;
// }
// }
//
// @Override
// public void setScorer(Scorable s) throws IOException {
// sub.setScorer(s);
// if (values != null) {
// values.setScorer(s);
// }
// }
//
// @Override
// public void collect(int doc, long bucket) throws IOException {
// sub.collect(doc, bucket);
// }
//
// @Override
// public void collectStarEntry(int starTreeEntry, long bucket) throws IOException {}
// }
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
subCollector.collect(doc, bucketOrd);
}

public final void collectStarTreeBucket(long docCount, long bucketOrd)
throws IOException {
if (docCounts.increment(bucketOrd, docCount) == docCount) {
multiBucketConsumer.accept(0);
}
// Only collect own bucket & not sub-aggregator buckets
// subCollector.collectStarEntry(entryBit, bucketOrd);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line won't exist, right?

Could we also do this by calling collectExistingBucket with LeafBucketCollector.NO_OP_COLLECTOR?

}

/**
* This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(long[])} to merge the actual
* ordinals and doc ID deltas.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand All @@ -51,16 +56,20 @@
import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.aggregations.metrics.StarTreeCollector;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeFilter;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getStarTreeValues;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree;
import static org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;

/**
Expand Down Expand Up @@ -171,6 +180,59 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
if (optimized) throw new CollectionTerminatedException();

SortedNumericDocValues values = valuesSource.longValues(ctx);

// Will migrate this to a separate precompute utility
CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context);
if (supportedStarTree != null) {
StarTreeValues starTreeValues = getStarTreeValues(ctx, supportedStarTree);
assert starTreeValues != null;

FixedBitSet matchingDocsBitSet = StarTreeFilter.getPredicateValueToFixedBitSetMap(starTreeValues, "@timestamp_month");

SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getDimensionValuesIterator("@timestamp_month");

SortedNumericStarTreeValuesIterator metricValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getMetricValuesIterator("startree1__doc_count_doc_count_metric");

int numBits = matchingDocsBitSet.length();

if (numBits > 0) {
for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
? matchingDocsBitSet.nextSetBit(bit + 1)
: DocIdSetIterator.NO_MORE_DOCS) {

if (!valuesIterator.advanceExact(bit)) {
continue;
}

for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) {
long dimensionValue = valuesIterator.nextValue();

if (metricValuesIterator.advanceExact(bit)) {
long metricValue = metricValuesIterator.nextValue();

long bucketOrd = bucketOrds.add(0, dimensionValue);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectStarTreeBucket(metricValue, bucketOrd);
} else {
grow(bucketOrd + 1);
collectStarTreeBucket(metricValue, bucketOrd);
}
}
}
}
}

// Run preCompute for all sub-aggregators
for (Aggregator subAggregator : subAggregators) {
// assuming query-matching was already done and only supported query shapes are executed here
((StarTreeCollector) subAggregator).preCompute(ctx, supportedStarTree, bucketOrds);
}
throw new CollectionTerminatedException();
}

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

import java.io.IOException;

public interface StarTreeCollector {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call this StarTreeAggregator, since it's expected that Aggregator subclasses would implement this interface?

// public void collectStarEntry(int starTreeEntryBit, long bucket) throws IOException;

public void preCompute(LeafReaderContext ctx, CompositeIndexFieldInfo starTree, LongKeyedBucketOrds bucketOrds) throws IOException;
}
Loading
Loading