[Backport 2.x] Improving the performance of date histogram aggregation (without any … (#11390)

* Improving the performance of date histogram aggregation (without any sub-aggregation) (#11083)

* Adding filter based optimization logic to date histogram aggregation

Signed-off-by: Ankit Jain <[email protected]>

* Reading the field name for aggregation correctly

Signed-off-by: Ankit Jain <[email protected]>

* Adding the limit on number of buckets for filter aggregation

Signed-off-by: Ankit Jain <[email protected]>

* Applying the optimizations for match all query as well

Signed-off-by: Ankit Jain <[email protected]>

* Handling the unwrapped match all query

Signed-off-by: Ankit Jain <[email protected]>

* Adding logic for recursively unwrapping the query

Signed-off-by: Ankit Jain <[email protected]>

* Restructuring the code for making it more reusable and unit testable

Signed-off-by: Ankit Jain <[email protected]>

* Adding javadocs for fixing build failure

Signed-off-by: Ankit Jain <[email protected]>

* Fixing minor bugs in refactoring

Signed-off-by: Ankit Jain <[email protected]>

* Adding logic for optimizing auto date histogram

Signed-off-by: Ankit Jain <[email protected]>

* Fixing bugs and passing unit tests for date histogram

Signed-off-by: Ankit Jain <[email protected]>

* Temporarily reverting auto date histogram changes

Signed-off-by: Ankit Jain <[email protected]>

* Fixing spotless check bugs

Signed-off-by: Ankit Jain <[email protected]>

* Adding back auto date histogram and passing all unit tests

Signed-off-by: Ankit Jain <[email protected]>

* Fixing the integration tests for reduced collector work

Signed-off-by: Ankit Jain <[email protected]>

* Fixing the integration test regression

Signed-off-by: Ankit Jain <[email protected]>

* Addressing code review comments

Signed-off-by: Ankit Jain <[email protected]>

* Fixing hardbound, missing and script test cases

Signed-off-by: Ankit Jain <[email protected]>

* Removing collect_count validation to prevent backward compatibility tests from failing

Signed-off-by: Ankit Jain <[email protected]>

* Finally fixing hardbounds test case

Signed-off-by: Ankit Jain <[email protected]>

* Refactoring code for reusability

Signed-off-by: Ankit Jain <[email protected]>

---------

Signed-off-by: Ankit Jain <[email protected]>
(cherry picked from commit 0ddbd96)

* Revert Rounding API visibility changes

Signed-off-by: Ankit Jain <[email protected]>

* Reverting missed rounding API visibility change

Co-authored-by: Andriy Redko <[email protected]>
Signed-off-by: Ankit Jain <[email protected]>

---------

Signed-off-by: Ankit Jain <[email protected]>
Co-authored-by: Andriy Redko <[email protected]>
jainankitk and reta authored Dec 1, 2023
1 parent 2d0ab2b commit ffaabc2
Showing 8 changed files with 445 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -66,6 +66,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Disallow removing some metadata fields by remove ingest processor ([#10895](https://github.com/opensearch-project/OpenSearch/pull/10895))
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Change error message when per shard document limit is breached ([#11312](https://github.com/opensearch-project/OpenSearch/pull/11312))
@@ -598,7 +598,6 @@ setup:
- match: { aggregations.histo.buckets.0.doc_count: 2 }
- match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
- match: { profile.shards.0.aggregations.0.description: histo }
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
- match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }

---
38 changes: 38 additions & 0 deletions server/src/main/java/org/opensearch/common/Rounding.java
@@ -55,6 +55,7 @@
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.TextStyle;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.time.temporal.IsoFields;
@@ -65,6 +66,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

/**
@@ -1383,4 +1385,40 @@ public static Rounding read(StreamInput in) throws IOException {
throw new OpenSearchException("unknown rounding id [" + id + "]");
}
}

/**
* Extracts the interval value from the {@link Rounding} instance
* @param rounding {@link Rounding} instance
* @return the interval value from the {@link Rounding} instance, or {@code OptionalLong.empty()}
* if the interval is not available or the rounding uses a non-UTC time zone
*/
public static OptionalLong getInterval(Rounding rounding) {
long interval = 0;

if (rounding instanceof TimeUnitRounding) {
interval = (((TimeUnitRounding) rounding).unit).extraLocalOffsetLookup();
if (!isUTCTimeZone(((TimeUnitRounding) rounding).timeZone)) {
// Fast filter aggregation cannot be used if it needs time zone rounding
return OptionalLong.empty();
}
} else if (rounding instanceof TimeIntervalRounding) {
interval = ((TimeIntervalRounding) rounding).interval;
if (!isUTCTimeZone(((TimeIntervalRounding) rounding).timeZone)) {
// Fast filter aggregation cannot be used if it needs time zone rounding
return OptionalLong.empty();
}
} else {
return OptionalLong.empty();
}

return OptionalLong.of(interval);
}

/**
* Helper function for checking whether the time zone requested for the date histogram
* aggregation is UTC
*/
private static boolean isUTCTimeZone(final ZoneId zoneId) {
return "Z".equals(zoneId.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
}
}
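The `isUTCTimeZone` helper above compares the zone's English display name against the literal `"Z"`. The following is a minimal standalone sketch of that same comparison, useful for checking which `ZoneId` values pass it. The class and method names are hypothetical and not part of the commit. On a standard JDK, a `ZoneOffset` prints its own ID (which is `Z` for UTC), while a region-based zone such as `ZoneId.of("UTC")` prints a localized name, so it does not match.

```java
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.TextStyle;
import java.util.Locale;

// Hypothetical sketch class; only the comparison itself is taken from the diff.
final class UtcCheckSketch {

    // Same expression as isUTCTimeZone in Rounding.java.
    static boolean isUtcByDisplayName(ZoneId zoneId) {
        return "Z".equals(zoneId.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
    }

    public static void main(String[] args) {
        // A zero offset, a region-based UTC zone, and a zone with a real offset.
        ZoneId[] candidates = { ZoneOffset.UTC, ZoneId.of("UTC"), ZoneId.of("America/New_York") };
        for (ZoneId zone : candidates) {
            System.out.println(zone.getId() + " -> " + isUtcByDisplayName(zone));
        }
    }
}
```

In this sketch only offset-style UTC zones pass the check, so a request that names its zone as a region would make `getInterval` return `OptionalLong.empty()`.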
@@ -414,6 +414,16 @@ public long parse(String value) {
return resolution.convert(DateFormatters.from(dateTimeFormatter().parse(value), dateTimeFormatter().locale()).toInstant());
}

public long convertNanosToMillis(long nanoSecondsSinceEpoch) {
if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toMilliSeconds(nanoSecondsSinceEpoch);
return nanoSecondsSinceEpoch;
}

public long convertRoundedMillisToNanos(long milliSecondsSinceEpoch) {
if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toNanoSeconds(milliSecondsSinceEpoch);
return milliSecondsSinceEpoch;
}

@Override
public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) {
DateFormatter defaultFormatter = dateTimeFormatter();
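The two conversion methods added in this hunk pass the value through unchanged for millisecond-resolution fields and only convert for `DATE_NANOSECONDS` fields. A rough standalone sketch of that behavior follows. The `Resolution` enum and the method names are hypothetical stand-ins, and `TimeUnit` is used in place of OpenSearch's `DateUtils`, whose range checks are not reproduced here.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; Resolution here is a stand-in for the field type's resolution.
final class ResolutionSketch {

    enum Resolution { MILLISECONDS, NANOSECONDS }

    // Stored value -> epoch millis. Only nanosecond fields need converting.
    static long toMillis(Resolution resolution, long storedValue) {
        return resolution == Resolution.NANOSECONDS
            ? TimeUnit.NANOSECONDS.toMillis(storedValue)
            : storedValue;
    }

    // Rounded epoch millis -> the unit the field stores.
    static long fromRoundedMillis(Resolution resolution, long epochMillis) {
        return resolution == Resolution.NANOSECONDS
            ? TimeUnit.MILLISECONDS.toNanos(epochMillis)
            : epochMillis;
    }

    public static void main(String[] args) {
        long storedNanos = 1_500_000_000L; // 1.5 seconds after the epoch, in nanoseconds
        long millis = toMillis(Resolution.NANOSECONDS, storedNanos);
        System.out.println(millis + " ms <-> " + fromRoundedMillis(Resolution.NANOSECONDS, millis) + " ns");
    }
}
```

Rounding operates on epoch milliseconds, which is presumably why the stored value is normalized before rounding and converted back afterwards.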
@@ -34,13 +34,15 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.common.Rounding;
import org.opensearch.common.Rounding.Prepared;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.IntArray;
import org.opensearch.common.util.LongArray;
import org.opensearch.core.common.util.ByteArray;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -125,9 +127,13 @@ static AutoDateHistogramAggregator build(
* {@link MergingBucketsDeferringCollector#mergeBuckets(long[])}.
*/
private MergingBucketsDeferringCollector deferringCollector;
private final Weight[] filters;
private final DateFieldMapper.DateFieldType fieldType;

protected final RoundingInfo[] roundingInfos;
protected final int targetBuckets;
protected int roundingIdx;
protected Rounding.Prepared preparedRounding;

private AutoDateHistogramAggregator(
String name,
@@ -148,8 +154,51 @@ private AutoDateHistogramAggregator(
this.formatter = valuesSourceConfig.format();
this.roundingInfos = roundingInfos;
this.roundingPreparer = roundingPreparer;
this.preparedRounding = prepareRounding(0);

FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent(),
subAggregators.length,
context,
b -> getMinimumRounding(b[0], b[1]),
// Pass the prepared rounding as a supplier so that the value assigned
// inside getMinimumRounding is the one that gets used
() -> preparedRounding,
valuesSourceConfig,
fc -> FilterRewriteHelper.getAggregationBounds(context, fc.field())
);
if (filterContext != null) {
fieldType = filterContext.fieldType;
filters = filterContext.filters;
} else {
fieldType = null;
filters = null;
}
}

private Rounding getMinimumRounding(final long low, final long high) {
// (max - min) / targetBuckets = bestDuration
// find the right innerInterval this bestDuration belongs to
// since we cannot exceed targetBuckets, bestDuration should go up,
// so the right innerInterval should be an upper bound
long bestDuration = (high - low) / targetBuckets;
while (roundingIdx < roundingInfos.length - 1) {
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
// If the interval duration is covered by the maximum inner interval,
// we can start with this outer interval for creating the buckets
if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
break;
}
roundingIdx++;
}

preparedRounding = prepareRounding(roundingIdx);
return roundingInfos[roundingIdx].rounding;
}

protected abstract LongKeyedBucketOrds getBucketOrds();

@Override
public final ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {
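The selection loop in `getMinimumRounding` above can be tried in isolation. The sketch below assumes a simplified `Info` holder in place of `RoundingInfo` and uses illustrative second, minute, and hour entries. Unlike the aggregator, it starts from index 0 on every call rather than advancing a shared `roundingIdx` field, and it does not prepare the rounding.

```java
// Hypothetical sketch; Info and SAMPLE are illustrative, not the aggregator's real data.
final class MinimumRoundingSketch {

    static final class Info {
        final long roughEstimateDurationMillis;
        final int[] innerIntervals;

        Info(long roughEstimateDurationMillis, int... innerIntervals) {
            this.roughEstimateDurationMillis = roughEstimateDurationMillis;
            this.innerIntervals = innerIntervals;
        }
    }

    static final Info[] SAMPLE = {
        new Info(1_000L, 1, 5, 10, 30),     // seconds
        new Info(60_000L, 1, 5, 10, 30),    // minutes
        new Info(3_600_000L, 1, 3, 12)      // hours
    };

    // Returns the index of the first rounding whose largest inner interval
    // covers (high - low) / targetBuckets, or the last index if none does.
    static int pickRoundingIdx(Info[] infos, long low, long high, int targetBuckets) {
        long bestDuration = (high - low) / targetBuckets;
        int idx = 0;
        while (idx < infos.length - 1) {
            Info cur = infos[idx];
            int maxInner = cur.innerIntervals[cur.innerIntervals.length - 1];
            if (bestDuration <= maxInner * cur.roughEstimateDurationMillis) {
                break;
            }
            idx++;
        }
        return idx;
    }

    public static void main(String[] args) {
        // One hour of data in 10 buckets needs 6-minute buckets, so seconds are too fine.
        System.out.println(pickRoundingIdx(SAMPLE, 0L, 3_600_000L, 10));
    }
}
```

With these sample values, a one-hour range and 10 target buckets gives a best duration of 360,000 ms. That exceeds the 30-second maximum of the first entry but fits within the 30-minute maximum of the second, so index 1 is chosen.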
@@ -176,7 +225,32 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
- return getLeafCollector(valuesSource.longValues(ctx), sub);

final SortedNumericDocValues values = valuesSource.longValues(ctx);
final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub);

// Declared as a final single-element array so that the anonymous
// LeafBucketCollectorBase subclass below can update the flag
final boolean[] useOpt = new boolean[1];
useOpt[0] = filters != null;

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
count
);
});
}

iteratingCollector.collect(doc, owningBucketOrd);
}
};
}

protected final InternalAggregation[] buildAggregations(
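The `getLeafCollector` change above wraps the per-document collector so that the fast path is attempted on the first `collect` call and never retried once it reports `false`. The pattern can be sketched without any Lucene or OpenSearch types. Everything below is hypothetical naming, and it only models the fallback case: how `FilterRewriteHelper` signals a successful fast path is not visible in this part of the diff.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the "try the fast path once, then fall back" wrapper.
final class OneShotFastPathSketch {

    interface DocCollector {
        void collect(int doc);
    }

    static DocCollector wrap(BooleanSupplier fastPath, DocCollector slowPath) {
        // A single-element array lets the lambda mutate the flag,
        // mirroring the boolean[] useOpt trick in the diff.
        final boolean[] useOpt = { true };
        return doc -> {
            if (useOpt[0]) {
                // A false result disables any further attempts.
                useOpt[0] = fastPath.getAsBoolean();
            }
            slowPath.collect(doc);
        };
    }

    public static void main(String[] args) {
        int[] attempts = { 0 };
        int[] collected = { 0 };
        DocCollector collector = wrap(() -> { attempts[0]++; return false; }, doc -> collected[0]++);
        for (int doc = 0; doc < 3; doc++) {
            collector.collect(doc);
        }
        System.out.println("fast path attempts=" + attempts[0] + ", docs collected=" + collected[0]);
    }
}
```

The array is needed because Java lambdas and anonymous classes can only capture effectively final locals, which is the constraint the comment in the diff refers to.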
@@ -247,8 +321,6 @@ protected final void merge(long[] mergeMap, long newNumBuckets) {
* @opensearch.internal
*/
private static class FromSingle extends AutoDateHistogramAggregator {
- private int roundingIdx;
- private Rounding.Prepared preparedRounding;
/**
* Map from value to bucket ordinals.
* <p>
@@ -286,10 +358,14 @@ private static class FromSingle extends AutoDateHistogramAggregator {
metadata
);

- preparedRounding = prepareRounding(0);
bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays());
}

@Override
protected LongKeyedBucketOrds getBucketOrds() {
return bucketOrds;
}

@Override
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
return new LeafBucketCollectorBase(sub, values) {
@@ -507,6 +583,11 @@ private static class FromMany extends AutoDateHistogramAggregator {
liveBucketCountUnderestimate = context.bigArrays().newIntArray(1, true);
}

@Override
protected LongKeyedBucketOrds getBucketOrds() {
return bucketOrds;
}

@Override
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
return new LeafBucketCollectorBase(sub, values) {
@@ -34,10 +34,12 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -48,6 +50,7 @@
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.aggregations.support.FieldContext;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
@@ -66,7 +69,6 @@
* @opensearch.internal
*/
class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {

private final ValuesSource.Numeric valuesSource;
private final DocValueFormat formatter;
private final Rounding rounding;
@@ -76,12 +78,12 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
private final Rounding.Prepared preparedRounding;
private final BucketOrder order;
private final boolean keyed;

private final long minDocCount;
private final LongBounds extendedBounds;
private final LongBounds hardBounds;

private final Weight[] filters;
private final LongKeyedBucketOrds bucketOrds;
private final DateFieldMapper.DateFieldType fieldType;

DateHistogramAggregator(
String name,
@@ -99,7 +101,6 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {

super(name, factories, aggregationContext, parent, CardinalityUpperBound.MANY, metadata);
this.rounding = rounding;
this.preparedRounding = preparedRounding;
@@ -114,6 +115,35 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
this.formatter = valuesSourceConfig.format();

bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);

FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent,
subAggregators.length,
context,
x -> rounding,
() -> preparedRounding,
valuesSourceConfig,
this::computeBounds
);
if (filterContext != null) {
fieldType = filterContext.fieldType;
filters = filterContext.filters;
} else {
filters = null;
fieldType = null;
}
}

private long[] computeBounds(final FieldContext fieldContext) throws IOException {
final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.field());
if (bounds != null) {
// Update min/max limit if user specified any hard bounds
if (hardBounds != null) {
bounds[0] = Math.max(bounds[0], hardBounds.getMin());
bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
}
}
return bounds;
}

@Override
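`computeBounds` above narrows the segment-derived `[min, max]` range to the user's hard bounds, subtracting 1 from the hard maximum because that bound is exclusive. A small standalone sketch of the clamp follows. It uses primitive arguments and returns a new array, both of which are simplifications: the real method updates the array in place and reads the limits from a `LongBounds` object.

```java
// Hypothetical sketch of clamping inclusive data bounds to hard bounds
// whose upper limit is exclusive.
final class HardBoundsSketch {

    // dataBounds is {min, max}, both inclusive; may be null when bounds are unknown.
    static long[] clamp(long[] dataBounds, long hardMin, long hardMaxExclusive) {
        if (dataBounds == null) {
            return null;
        }
        long low = Math.max(dataBounds[0], hardMin);
        long high = Math.min(dataBounds[1], hardMaxExclusive - 1); // convert exclusive max to inclusive
        return new long[] { low, high };
    }

    public static void main(String[] args) {
        long[] clamped = clamp(new long[] { 0L, 100L }, 10L, 50L);
        System.out.println(clamped[0] + ".." + clamped[1]);
    }
}
```

Data bounds that already lie inside the hard bounds come back unchanged, so the clamp only ever shrinks the range that the filters need to cover.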
@@ -129,10 +159,27 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}

// Declared as a final single-element array so that the anonymous
// LeafBucketCollectorBase subclass below can update the flag
final boolean[] useOpt = new boolean[1];
useOpt[0] = filters != null;

SortedNumericDocValues values = valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(bucketOrds.add(owningBucketOrd, preparedRounding.round(key))),
count
);
});
}

if (values.advanceExact(doc)) {
int valuesCount = values.docValueCount();
