Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Improving the performance of date histogram aggregation (without any … #11390

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Disallow removing some metadata fields by remove ingest processor ([#10895](https://github.com/opensearch-project/OpenSearch/pull/10895))
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Change error message when per shard document limit is breached ([#11312](https://github.com/opensearch-project/OpenSearch/pull/11312))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,6 @@ setup:
- match: { aggregations.histo.buckets.0.doc_count: 2 }
- match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
- match: { profile.shards.0.aggregations.0.description: histo }
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
- match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }

---
Expand Down
38 changes: 38 additions & 0 deletions server/src/main/java/org/opensearch/common/Rounding.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.TextStyle;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.time.temporal.IsoFields;
Expand All @@ -65,6 +66,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -1383,4 +1385,40 @@
throw new OpenSearchException("unknown rounding id [" + id + "]");
}
}

/**
* Extracts the interval value from the {@link Rounding} instance
* @param rounding {@link Rounding} instance
* @return the interval value from the {@link Rounding} instance or {@code OptionalLong.empty()}
* if the interval is not available
*/
public static OptionalLong getInterval(Rounding rounding) {
long interval = 0;

if (rounding instanceof TimeUnitRounding) {
interval = (((TimeUnitRounding) rounding).unit).extraLocalOffsetLookup();
if (!isUTCTimeZone(((TimeUnitRounding) rounding).timeZone)) {
// Fast filter aggregation cannot be used if it needs time zone rounding
return OptionalLong.empty();
}
} else if (rounding instanceof TimeIntervalRounding) {
interval = ((TimeIntervalRounding) rounding).interval;
if (!isUTCTimeZone(((TimeIntervalRounding) rounding).timeZone)) {
// Fast filter aggregation cannot be used if it needs time zone rounding
return OptionalLong.empty();

Check warning on line 1408 in server/src/main/java/org/opensearch/common/Rounding.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/Rounding.java#L1408

Added line #L1408 was not covered by tests
}
} else {
return OptionalLong.empty();

Check warning on line 1411 in server/src/main/java/org/opensearch/common/Rounding.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/Rounding.java#L1411

Added line #L1411 was not covered by tests
}

return OptionalLong.of(interval);
}

/**
* Helper function for checking if the time zone requested for date histogram
* aggregation is utc or not
*/
private static boolean isUTCTimeZone(final ZoneId zoneId) {
return "Z".equals(zoneId.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,16 @@ public long parse(String value) {
return resolution.convert(DateFormatters.from(dateTimeFormatter().parse(value), dateTimeFormatter().locale()).toInstant());
}

public long convertNanosToMillis(long nanoSecondsSinceEpoch) {
if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toMilliSeconds(nanoSecondsSinceEpoch);
return nanoSecondsSinceEpoch;
}

public long convertRoundedMillisToNanos(long milliSecondsSinceEpoch) {
if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toNanoSeconds(milliSecondsSinceEpoch);
return milliSecondsSinceEpoch;
}

@Override
public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) {
DateFormatter defaultFormatter = dateTimeFormatter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.common.Rounding;
import org.opensearch.common.Rounding.Prepared;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.IntArray;
import org.opensearch.common.util.LongArray;
import org.opensearch.core.common.util.ByteArray;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -125,9 +127,13 @@
* {@link MergingBucketsDeferringCollector#mergeBuckets(long[])}.
*/
private MergingBucketsDeferringCollector deferringCollector;
private final Weight[] filters;
private final DateFieldMapper.DateFieldType fieldType;

protected final RoundingInfo[] roundingInfos;
protected final int targetBuckets;
protected int roundingIdx;
protected Rounding.Prepared preparedRounding;

private AutoDateHistogramAggregator(
String name,
Expand All @@ -148,8 +154,51 @@
this.formatter = valuesSourceConfig.format();
this.roundingInfos = roundingInfos;
this.roundingPreparer = roundingPreparer;
this.preparedRounding = prepareRounding(0);

FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent(),
subAggregators.length,
context,
b -> getMinimumRounding(b[0], b[1]),
// Passing prepared rounding as supplier to ensure the correct prepared
// rounding is set as it is done during getMinimumRounding
() -> preparedRounding,
valuesSourceConfig,
fc -> FilterRewriteHelper.getAggregationBounds(context, fc.field())
);
if (filterContext != null) {
fieldType = filterContext.fieldType;
filters = filterContext.filters;
} else {
fieldType = null;
filters = null;
}
}

private Rounding getMinimumRounding(final long low, final long high) {
// max - min / targetBuckets = bestDuration
// find the right innerInterval this bestDuration belongs to
// since we cannot exceed targetBuckets, bestDuration should go up,
// so the right innerInterval should be an upper bound
long bestDuration = (high - low) / targetBuckets;
while (roundingIdx < roundingInfos.length - 1) {
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
// If the interval duration is covered by the maximum inner interval,
// we can start with this outer interval for creating the buckets
if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
break;
}
roundingIdx++;
}

preparedRounding = prepareRounding(roundingIdx);
return roundingInfos[roundingIdx].rounding;
}

protected abstract LongKeyedBucketOrds getBucketOrds();

@Override
public final ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {
Expand All @@ -176,7 +225,32 @@
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
return getLeafCollector(valuesSource.longValues(ctx), sub);

final SortedNumericDocValues values = valuesSource.longValues(ctx);
final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub);

// Need to be declared as final and array for usage within the
// LeafBucketCollectorBase subclass below
final boolean[] useOpt = new boolean[1];
useOpt[0] = filters != null;

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {

Check warning on line 243 in server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java#L243

Added line #L243 was not covered by tests
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
count
);
});
}

iteratingCollector.collect(doc, owningBucketOrd);
}
};
}

protected final InternalAggregation[] buildAggregations(
Expand Down Expand Up @@ -247,8 +321,6 @@
* @opensearch.internal
*/
private static class FromSingle extends AutoDateHistogramAggregator {
private int roundingIdx;
private Rounding.Prepared preparedRounding;
/**
* Map from value to bucket ordinals.
* <p>
Expand Down Expand Up @@ -286,10 +358,14 @@
metadata
);

preparedRounding = prepareRounding(0);
bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays());
}

@Override
protected LongKeyedBucketOrds getBucketOrds() {
return bucketOrds;
}

@Override
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
return new LeafBucketCollectorBase(sub, values) {
Expand Down Expand Up @@ -507,6 +583,11 @@
liveBucketCountUnderestimate = context.bigArrays().newIntArray(1, true);
}

@Override
protected LongKeyedBucketOrds getBucketOrds() {
return bucketOrds;

Check warning on line 588 in server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java#L588

Added line #L588 was not covered by tests
}

@Override
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
return new LeafBucketCollectorBase(sub, values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand All @@ -48,6 +50,7 @@
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.aggregations.support.FieldContext;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
Expand All @@ -66,7 +69,6 @@
* @opensearch.internal
*/
class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {

private final ValuesSource.Numeric valuesSource;
private final DocValueFormat formatter;
private final Rounding rounding;
Expand All @@ -76,12 +78,12 @@
private final Rounding.Prepared preparedRounding;
private final BucketOrder order;
private final boolean keyed;

private final long minDocCount;
private final LongBounds extendedBounds;
private final LongBounds hardBounds;

private final Weight[] filters;
private final LongKeyedBucketOrds bucketOrds;
private final DateFieldMapper.DateFieldType fieldType;

DateHistogramAggregator(
String name,
Expand All @@ -99,7 +101,6 @@
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {

super(name, factories, aggregationContext, parent, CardinalityUpperBound.MANY, metadata);
this.rounding = rounding;
this.preparedRounding = preparedRounding;
Expand All @@ -114,6 +115,35 @@
this.formatter = valuesSourceConfig.format();

bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);

FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent,
subAggregators.length,
context,
x -> rounding,
() -> preparedRounding,
valuesSourceConfig,
this::computeBounds
);
if (filterContext != null) {
fieldType = filterContext.fieldType;
filters = filterContext.filters;
} else {
filters = null;
fieldType = null;
}
}

private long[] computeBounds(final FieldContext fieldContext) throws IOException {
final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.field());
if (bounds != null) {
// Update min/max limit if user specified any hard bounds
if (hardBounds != null) {
bounds[0] = Math.max(bounds[0], hardBounds.getMin());
bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive

Check warning on line 143 in server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java#L142-L143

Added lines #L142 - L143 were not covered by tests
}
}
return bounds;
}

@Override
Expand All @@ -129,10 +159,27 @@
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}

// Need to be declared as final and array for usage within the
// LeafBucketCollectorBase subclass below
final boolean[] useOpt = new boolean[1];
useOpt[0] = filters != null;

SortedNumericDocValues values = valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {

Check warning on line 175 in server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java#L175

Added line #L175 was not covered by tests
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(bucketOrds.add(owningBucketOrd, preparedRounding.round(key))),
count
);
});
}

if (values.advanceExact(doc)) {
int valuesCount = values.docValueCount();

Expand Down
Loading
Loading