Skip to content

Commit

Permalink
data histogram temp changes
Browse files Browse the repository at this point in the history
Signed-off-by: Sandesh Kumar <[email protected]>
  • Loading branch information
Sandesh Kumar committed Nov 13, 2024
1 parent a2a01f8 commit d2cd24d
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -74,9 +75,14 @@ public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context
);

for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) {
// first, check whether the aggregation is a supported metric aggregation
MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory);

// if not a metric aggregation, check for applicable date histogram shape
if (metricStat == null) {
return null;
if (validateDateHistogramSupport(compositeMappedFieldType, aggregatorFactory) == false) {
return null;
}
}
}

Expand Down Expand Up @@ -159,6 +165,18 @@ private static MetricStat validateStarTreeMetricSupport(
return null;
}

/**
 * Checks whether an aggregation has the date-histogram shape accepted here: a
 * {@link DateHistogramAggregatorFactory} with exactly one sub-aggregation, where that
 * sub-aggregation is accepted by {@code validateStarTreeMetricSupport}.
 *
 * @param compositeIndexFieldInfo the composite (star-tree) field type passed through to the metric validation
 * @param aggregatorFactory       the aggregation factory to inspect
 * @return {@code true} if the factory is a date histogram whose single sub-aggregation yields a
 *         non-null {@link MetricStat}; {@code false} otherwise
 */
private static boolean validateDateHistogramSupport(
    CompositeDataCubeFieldType compositeIndexFieldInfo,
    AggregatorFactory aggregatorFactory
) {
    if ((aggregatorFactory instanceof DateHistogramAggregatorFactory) == false) {
        return false;
    }

    // Only a date histogram wrapping exactly one sub-aggregation is considered.
    AggregatorFactory[] children = aggregatorFactory.getSubFactories().getFactories();
    if (children.length != 1) {
        return false;
    }

    return validateStarTreeMetricSupport(compositeIndexFieldInfo, children[0]) != null;
}

public static CompositeIndexFieldInfo getSupportedStarTree(SearchContext context) {
StarTreeQueryContext starTreeQueryContext = context.getStarTreeQueryContext();
return (starTreeQueryContext != null) ? starTreeQueryContext.getStarTree() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,23 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand All @@ -51,16 +63,29 @@
import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.aggregations.metrics.CompensatedSum;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeFilter;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getStarTreeFilteredValues;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getStarTreeValues;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree;
import static org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;

/**
Expand Down Expand Up @@ -171,6 +196,26 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
if (optimized) throw new CollectionTerminatedException();

SortedNumericDocValues values = valuesSource.longValues(ctx);
CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context);
if (supportedStarTree != null) {
StarTreeValues starTreeValues = getStarTreeValues(ctx, supportedStarTree);
assert starTreeValues != null;

Map<Long, FixedBitSet> matchingDocsBitSet = StarTreeFilter.getPredicateValueToFixedBitSetMap(starTreeValues, "@timestamp_month");

assert (matchingDocsBitSet.size() == 3);
// return getStarTreeLeafCollector(ctx, sub, supportedStarTree);

// TODO: also get doc_count here, in addition to the sums (field names are hard-coded right now)
Map<Long, Double> sumMap = getPredicateValueToSumMap(starTreeValues, matchingDocsBitSet, "startree1_status_sum_metric");

assert (sumMap.size() == 3);

// At this point sumMap holds the sum for every bucket.
// The current blocker is propagating these values into the sub-collector / sub-aggregator buckets.

}

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
Expand Down Expand Up @@ -201,6 +246,105 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
};
}

/**
 * Work-in-progress star-tree collection path for this date histogram. The original author marked
 * this method as one to ignore; the per-bucket sums it computes are not yet handed to any
 * sub-aggregator.
 *
 * <p>For every star-tree document selected by {@code getStarTreeFilteredValues}, the first value of
 * the date field is used as the bucket key and the document's sum-metric values are accumulated
 * into a {@link CompensatedSum} for that key. Keys rejected by {@code hardBounds} (when set) are
 * skipped.
 *
 * @param ctx      the leaf reader context of the segment being collected
 * @param sub      the sub-aggregation collector, passed to the returned collector
 * @param starTree the star-tree field used to resolve star-tree values and metric names
 * @return a collector whose {@code collect} always throws {@link CollectionTerminatedException}
 * @throws IOException if reading star-tree values fails
 */
public LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree)
    throws IOException {
    StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree);
    assert starTreeValues != null;

    // Field for the date to group by calendar interval (e.g., month)
    String dateField = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName();
    // NOTE(review): the metric field name "status" is hard-coded; it should come from the sub-aggregation.
    String sumMetricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
        starTree.getField(), "status", MetricStat.SUM.getTypeName()
    );

    // NOTE(review): the date field is looked up through getMetricValuesIterator; if it is a
    // star-tree dimension this probably needs getDimensionValuesIterator instead — confirm.
    SortedNumericStarTreeValuesIterator dateValuesIterator =
        (SortedNumericStarTreeValuesIterator) starTreeValues.getMetricValuesIterator(dateField);
    SortedNumericStarTreeValuesIterator sumValuesIterator =
        (SortedNumericStarTreeValuesIterator) starTreeValues.getMetricValuesIterator(sumMetricName);

    FixedBitSet matchedDocIds = getStarTreeFilteredValues(context, ctx, starTreeValues);
    assert matchedDocIds != null;

    Map<Long, CompensatedSum> bucketSums = new HashMap<>();
    int numBits = matchedDocIds.length();

    if (numBits > 0) {
        // nextSetBit must not be called with an index >= length, hence the explicit bound check.
        for (int bit = matchedDocIds.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS;
             bit = (bit + 1 < numBits) ? matchedDocIds.nextSetBit(bit + 1) : DocIdSetIterator.NO_MORE_DOCS) {

            if (!dateValuesIterator.advanceExact(bit) || !sumValuesIterator.advanceExact(bit)) continue;

            // Get the unique calendar interval value for this document
            long bucketId = dateValuesIterator.nextValue();

            // The bounds check depends only on the bucket key, so do it once per document
            // rather than once per metric value.
            if (hardBounds != null && hardBounds.contain(bucketId) == false) continue;

            for (int i = 0, count = sumValuesIterator.entryValueCount(); i < count; i++) {
                long metricValue = sumValuesIterator.nextValue();
                bucketSums.computeIfAbsent(bucketId, k -> new CompensatedSum(0.0, 0))
                    .add(NumericUtils.sortableLongToDouble(metricValue));
            }
        }
    }

    // TODO: transfer the results from bucketSums into the sub-aggregator's state
    // (e.g. a pre-allocated `sums` array). Until then the computed sums are discarded.

    return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) {
        @Override
        public void collect(int doc, long bucket) {
            // Everything was pre-computed above; per-document collection is not expected.
            throw new CollectionTerminatedException();
        }
    };
}

/**
 * Computes, for each predicate value, the sum of a star-tree metric over the documents flagged in
 * that value's bit set.
 *
 * @param starTreeValues         star-tree values from which the metric iterator is obtained
 * @param predicateValueToBitSet map from predicate value to the bit set of matching star-tree documents
 * @param sumField               name of the metric field whose values are summed
 * @return a map from each key of {@code predicateValueToBitSet} to the sum of {@code sumField}
 *         over its matching documents ({@code 0} when nothing matches)
 * @throws IOException if reading the metric values fails
 */
public static Map<Long, Double> getPredicateValueToSumMap(
    StarTreeValues starTreeValues,
    Map<Long, FixedBitSet> predicateValueToBitSet,
    String sumField
) throws IOException {
    final Map<Long, Double> sumsByPredicateValue = new HashMap<>();

    for (Map.Entry<Long, FixedBitSet> bucket : predicateValueToBitSet.entrySet()) {
        final FixedBitSet matchingDocs = bucket.getValue();

        // A new iterator is requested for every bucket; presumably because the iterator can
        // only move forward and each bit set is scanned from the start — confirm.
        final SortedNumericStarTreeValuesIterator metricValues =
            (SortedNumericStarTreeValuesIterator) starTreeValues.getMetricValuesIterator(sumField);

        double total = 0;
        final int bitCount = matchingDocs.length();

        // Walk the set bits, never asking for a bit index at or beyond the bit set's length.
        int docId = (bitCount > 0) ? matchingDocs.nextSetBit(0) : DocIdSetIterator.NO_MORE_DOCS;
        while (docId != DocIdSetIterator.NO_MORE_DOCS) {
            if (metricValues.advanceExact(docId)) {
                int remaining = metricValues.entryValueCount();
                while (remaining-- > 0) {
                    // Metric values are stored as sortable longs; decode before adding.
                    total += NumericUtils.sortableLongToDouble(metricValues.nextValue());
                }
            }
            docId = (docId + 1 < bitCount) ? matchingDocs.nextSetBit(docId + 1) : DocIdSetIterator.NO_MORE_DOCS;
        }

        sumsByPredicateValue.put(bucket.getKey(), total);
    }

    return sumsByPredicateValue;
}


@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -225,4 +226,36 @@ public StarTreeResult(
this.maxMatchedDoc = maxMatchedDoc;
}
}


/**
 * Builds a map from every distinct value of a star-tree dimension to the bit set returned by
 * {@code getStarTreeResult} when filtering on that single value.
 *
 * @param starTreeValues star-tree values to read the dimension from and to filter
 * @param predicateField name of the dimension field whose distinct values become the map keys
 * @return a map from each distinct dimension value to its {@link FixedBitSet} of matching documents
 * @throws IOException if reading the dimension values or evaluating the filter fails
 */
public static Map<Long, FixedBitSet> getPredicateValueToFixedBitSetMap(
    StarTreeValues starTreeValues,
    String predicateField
) throws IOException {
    final SortedNumericStarTreeValuesIterator dimensionValues =
        (SortedNumericStarTreeValuesIterator) starTreeValues.getDimensionValuesIterator(predicateField);

    // Pass 1: scan every entry of the dimension and collect its distinct values.
    final Set<Long> uniqueValues = new HashSet<>();
    while (dimensionValues.nextEntry() != NO_MORE_DOCS) {
        int index = 0;
        while (index < dimensionValues.entryValueCount()) {
            uniqueValues.add(dimensionValues.nextValue());
            index++;
        }
    }

    // Pass 2: run the star-tree filter once per distinct value, using a single-field predicate.
    final Map<Long, FixedBitSet> bitSetsByValue = new HashMap<>();
    for (Long uniqueValue : uniqueValues) {
        final Map<String, Long> singleValuePredicate = new HashMap<>();
        singleValuePredicate.put(predicateField, uniqueValue);

        bitSetsByValue.put(uniqueValue, getStarTreeResult(starTreeValues, singleValuePredicate));
    }

    return bitSetsByValue;
}

}

0 comments on commit d2cd24d

Please sign in to comment.