[Backport 2.x] Apply fast date histogram optimization at the segment level #12234

Closed · wants to merge 1 commit
@@ -177,9 +177,9 @@ public void setupSuiteScopeCluster() throws Exception {
             indexDoc(2, 15, 3), // date: Feb 15, dates: Feb 15, Mar 16
             indexDoc(3, 2, 4),  // date: Mar 2, dates: Mar 2, Apr 3
             indexDoc(3, 15, 5), // date: Mar 15, dates: Mar 15, Apr 16
-            indexDoc(3, 23, 6)
+            indexDoc(3, 23, 6)  // date: Mar 23, dates: Mar 23, Apr 24
         )
-    ); // date: Mar 23, dates: Mar 23, Apr 24
+    );
     indexRandom(true, builders);
     ensureSearchable();
 }
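
The inline comments above encode the fixture scheme: indexDoc(month, day, value) indexes one document whose "date" field is that month/day and whose "dates" field adds a second value one month and one day later. A hypothetical sketch of that pattern, inferred only from the comments (the helper's real signature and the year are assumptions):

    import java.time.LocalDate;
    import java.util.List;

    class IndexDocFixtureSketch {
        // Hypothetical reading of the indexDoc(month, day, value) fixture:
        // "date" is the given month/day (year 2012 is an assumption), and
        // "dates" pairs it with a value one month and one day later.
        static List<LocalDate> datesFor(int month, int day) {
            LocalDate date = LocalDate.of(2012, month, day);
            return List.of(date, date.plusMonths(1).plusDays(1));
        }

        public static void main(String[] args) {
            System.out.println(datesFor(3, 23)); // [2012-03-23, 2012-04-24] -> Mar 23, Apr 24
        }
    }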

Large diffs are not rendered by default.

@@ -164,24 +164,55 @@ final class CompositeAggregator extends BucketsAggregator {
         this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
         this.rawAfterKey = rawAfterKey;

-        fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
+        fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
         if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return;
-        fastFilterContext.setAggregationType(
-            new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size)
-        );
+        fastFilterContext.setAggregationType(new CompositeAggregationType());
         if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
-            // bucketOrds is the data structure for saving date histogram results
+            // bucketOrds is used for saving date histogram results
             bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
-            // Currently the filter rewrite is only supported for date histograms
-            FastFilterRewriteHelper.CompositeAggregationType aggregationType =
-                (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType;
-            preparedRounding = aggregationType.getRoundingPreparer();
-            fastFilterContext.buildFastFilter(
-                context,
-                fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
-                x -> aggregationType.getRounding(),
-                () -> preparedRounding
-            );
+            preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared();
+            fastFilterContext.buildFastFilter();
         }
     }

+    /**
+     * Currently the filter rewrite is only supported for date histograms
+     */
+    private class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
+        private final RoundingValuesSource valuesSource;
+        private long afterKey = -1L;
+
+        public CompositeAggregationType() {
+            super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
+            this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
+            if (rawAfterKey != null) {
+                assert rawAfterKey.size() == 1 && formats.size() == 1;
+                this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
+                    throw new IllegalArgumentException("now() is not supported in [after] key");
+                });
+            }
+        }
+
+        public Rounding getRounding(final long low, final long high) {
+            return valuesSource.getRounding();
+        }
+
+        public Rounding.Prepared getRoundingPrepared() {
+            return valuesSource.getPreparedRounding();
+        }
+
+        @Override
+        protected void processAfterKey(long[] bound, long interval) {
+            // afterKey is the last bucket key in the previous response; the bucket key
+            // is the minimum of all values in the bucket, so we need to add the interval
+            if (afterKey != -1L) {
+                bound[0] = afterKey + interval;
+            }
+        }
+
+        @Override
+        public int getSize() {
+            return size;
+        }
+    }
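
For paging, processAfterKey shifts the lower search bound past the previous page: since each composite bucket is keyed by the minimum value it contains, the next page must start one full interval after the last returned key. A minimal sketch of that arithmetic with hypothetical daily-interval values (only the names afterKey, interval, and bound come from the diff):

    import java.time.Instant;

    class AfterKeyBoundSketch {
        public static void main(String[] args) {
            long interval = 86_400_000L;            // hypothetical: one day in millis
            long afterKey = 1_508_371_200_000L;     // hypothetical: 2017-10-19T00:00:00Z, last key of the previous page
            long[] bound = { 1_474_329_600_000L, 1_508_457_600_000L }; // hypothetical [min, max] of the field

            // The bucket keyed afterKey covers [afterKey, afterKey + interval),
            // so the next page starts exactly one interval later.
            if (afterKey != -1L) {
                bound[0] = afterKey + interval;
            }
            System.out.println(Instant.ofEpochMilli(bound[0])); // prints 2017-10-20T00:00:00Z
        }
    }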
@@ -42,6 +42,7 @@
 import org.opensearch.common.util.IntArray;
 import org.opensearch.common.util.LongArray;
 import org.opensearch.core.common.util.ByteArray;
+import org.opensearch.index.mapper.MappedFieldType;
 import org.opensearch.search.DocValueFormat;
 import org.opensearch.search.aggregations.Aggregator;
 import org.opensearch.search.aggregations.AggregatorFactories;

@@ -156,45 +157,53 @@ private AutoDateHistogramAggregator(
         this.roundingPreparer = roundingPreparer;
         this.preparedRounding = prepareRounding(0);

-        fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
+        fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
         fastFilterContext.setAggregationType(
-            new FastFilterRewriteHelper.DateHistogramAggregationType(
+            new AutoHistogramAggregationType(
                 valuesSourceConfig.fieldType(),
                 valuesSourceConfig.missing() != null,
                 valuesSourceConfig.script() != null
             )
         );
         if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
-            fastFilterContext.buildFastFilter(
-                context,
-                fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
-                b -> getMinimumRounding(b[0], b[1]),
-                // Passing prepared rounding as supplier to ensure the correct prepared
-                // rounding is set as it is done during getMinimumRounding
-                () -> preparedRounding
-            );
+            fastFilterContext.buildFastFilter();
         }
     }

-    private Rounding getMinimumRounding(final long low, final long high) {
-        // max - min / targetBuckets = bestDuration
-        // find the right innerInterval this bestDuration belongs to
-        // since we cannot exceed targetBuckets, bestDuration should go up,
-        // so the right innerInterval should be an upper bound
-        long bestDuration = (high - low) / targetBuckets;
-        while (roundingIdx < roundingInfos.length - 1) {
-            final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
-            final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
-            // If the interval duration is covered by the maximum inner interval,
-            // we can start with this outer interval for creating the buckets
-            if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
-                break;
-            }
-            roundingIdx++;
-        }
-
-        preparedRounding = prepareRounding(roundingIdx);
-        return roundingInfos[roundingIdx].rounding;
-    }
+    private class AutoHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
+
+        public AutoHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) {
+            super(fieldType, missing, hasScript);
+        }
+
+        @Override
+        protected Rounding getRounding(final long low, final long high) {
+            // (max - min) / targetBuckets = bestDuration
+            // find the right innerInterval this bestDuration belongs to;
+            // since we cannot exceed targetBuckets, bestDuration should go up,
+            // so the right innerInterval should be an upper bound
+            long bestDuration = (high - low) / targetBuckets;
+            // reset so this function is idempotent
+            roundingIdx = 0;
+            while (roundingIdx < roundingInfos.length - 1) {
+                final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
+                final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
+                // If the interval duration is covered by the maximum inner interval,
+                // we can start with this outer interval for creating the buckets
+                if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
+                    break;
+                }
+                roundingIdx++;
+            }
+
+            preparedRounding = prepareRounding(roundingIdx);
+            return roundingInfos[roundingIdx].rounding;
+        }
+
+        @Override
+        protected Prepared getRoundingPrepared() {
+            return preparedRounding;
+        }
+    }

     protected abstract LongKeyedBucketOrds getBucketOrds();
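
The getRounding override keeps the pre-existing selection logic: divide the field's [min, max] span by targetBuckets to get the ideal bucket duration, then walk the rounding table from finest to coarsest and stop at the first granularity whose largest inner interval still covers that duration. A worked sketch under a made-up rounding table (the real RoundingInfo values are defined elsewhere in OpenSearch):

    class MinRoundingSketch {
        public static void main(String[] args) {
            // Hypothetical rounding table: rough duration of one unit in millis,
            // and the largest "inner interval" multiple allowed per unit.
            long[] roughEstimateDurationMillis = { 1_000L, 60_000L, 3_600_000L, 86_400_000L }; // s, m, h, d
            int[] maxInnerInterval = { 30, 30, 12, 7 }; // e.g. up to 30s, 30m, 12h, 7d buckets

            long min = 0L, max = 100L * 86_400_000L;         // a 100-day range
            int targetBuckets = 10;
            long bestDuration = (max - min) / targetBuckets; // 10 days per bucket

            // Pick the first granularity whose largest inner interval covers bestDuration.
            int idx = 0;
            while (idx < roughEstimateDurationMillis.length - 1
                && bestDuration > maxInnerInterval[idx] * roughEstimateDurationMillis[idx]) {
                idx++;
            }
            System.out.println("rounding index: " + idx); // 3 -> day-based rounding
        }
    }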
@@ -39,6 +39,7 @@
 import org.opensearch.common.Nullable;
 import org.opensearch.common.Rounding;
 import org.opensearch.common.lease.Releasables;
+import org.opensearch.index.mapper.MappedFieldType;
 import org.opensearch.search.DocValueFormat;
 import org.opensearch.search.aggregations.Aggregator;
 import org.opensearch.search.aggregations.AggregatorFactories;

@@ -115,29 +116,35 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {

         bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);

-        fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
+        fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
         fastFilterContext.setAggregationType(
-            new FastFilterRewriteHelper.DateHistogramAggregationType(
+            new DateHistogramAggregationType(
                 valuesSourceConfig.fieldType(),
                 valuesSourceConfig.missing() != null,
-                valuesSourceConfig.script() != null
+                valuesSourceConfig.script() != null,
+                hardBounds
             )
         );
         if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
-            fastFilterContext.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding);
+            fastFilterContext.buildFastFilter();
         }
     }

-    private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException {
-        final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name());
-        if (bounds != null) {
-            // Update min/max limit if user specified any hard bounds
-            if (hardBounds != null) {
-                bounds[0] = Math.max(bounds[0], hardBounds.getMin());
-                bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
-            }
-        }
-        return bounds;
-    }
+    private class DateHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
+
+        public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) {
+            super(fieldType, missing, hasScript, hardBounds);
+        }
+
+        @Override
+        protected Rounding getRounding(long low, long high) {
+            return rounding;
+        }
+
+        @Override
+        protected Rounding.Prepared getRoundingPrepared() {
+            return preparedRounding;
+        }
+    }

     @Override
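
The hard-bounds clamping that used to live in computeBounds now travels with the aggregation type through the hardBounds constructor argument. The arithmetic itself is unchanged: clamp the segment's [min, max] to the user's hard_bounds, remembering that the hard-bounds max is exclusive while the point-values max is inclusive. A minimal sketch with made-up numbers:

    class HardBoundsClampSketch {
        public static void main(String[] args) {
            long[] bounds = { 1_000L, 9_999L }; // [min, max] from the field's point values, both inclusive
            long hardMin = 2_000L;              // hard_bounds.min, inclusive
            long hardMax = 9_000L;              // hard_bounds.max, exclusive

            bounds[0] = Math.max(bounds[0], hardMin);
            bounds[1] = Math.min(bounds[1], hardMax - 1); // convert the exclusive max to inclusive
            System.out.println(bounds[0] + ".." + bounds[1]); // prints 2000..8999
        }
    }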
@@ -35,6 +35,7 @@
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.DocValuesFieldExistsQuery;
+import org.apache.lucene.search.FieldExistsQuery;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TermQuery;
 import org.opensearch.OpenSearchParseException;

@@ -1253,6 +1254,74 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception {
         );
     }

+    public void testDateHistogramSourceWithSize() throws IOException {
+        final List<Map<String, List<Object>>> dataset = new ArrayList<>(
+            Arrays.asList(
+                createDocument("date", asLong("2017-10-20T03:08:45")),
+                createDocument("date", asLong("2016-09-20T09:00:34")),
+                createDocument("date", asLong("2016-09-20T11:34:00")),
+                createDocument("date", asLong("2017-10-20T06:09:24")),
+                createDocument("date", asLong("2017-10-19T06:09:24")),
+                createDocument("long", 4L)
+            )
+        );
+        testSearchCase(
+            Arrays.asList(
+                new MatchAllDocsQuery(),
+                new FieldExistsQuery("date"),
+                LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24"))
+            ),
+            dataset,
+            () -> {
+                DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date")
+                    .calendarInterval(DateHistogramInterval.days(1));
+                return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).size(1);
+            },
+            (result) -> {
+                assertEquals(1, result.getBuckets().size());
+                assertEquals("{date=1474329600000}", result.afterKey().toString()); // 2016-09-20T00:00:00
+                assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(0).getDocCount());
+            }
+        );
+    }
+
+    public void testDateHistogramSourceWithDocCountField() throws IOException {
+        final List<Map<String, List<Object>>> dataset = new ArrayList<>(
+            Arrays.asList(
+                createDocument("date", asLong("2017-10-20T03:08:45"), "_doc_count", 5),
+                createDocument("date", asLong("2016-09-20T09:00:34")),
+                createDocument("date", asLong("2016-09-20T11:34:00"), "_doc_count", 2),
+                createDocument("date", asLong("2017-10-20T06:09:24")),
+                createDocument("date", asLong("2017-10-19T06:09:24"), "_doc_count", 3),
+                createDocument("long", 4L)
+            )
+        );
+        testSearchCase(
+            Arrays.asList(
+                new MatchAllDocsQuery(),
+                new FieldExistsQuery("date"),
+                LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24"))
+            ),
+            dataset,
+            () -> {
+                DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date")
+                    .calendarInterval(DateHistogramInterval.days(1));
+                return new CompositeAggregationBuilder("name", Collections.singletonList(histo));
+            },
+            (result) -> {
+                assertEquals(3, result.getBuckets().size());
+                assertEquals("{date=1508457600000}", result.afterKey().toString());
+                assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(3L, result.getBuckets().get(0).getDocCount());
+                assertEquals("{date=1508371200000}", result.getBuckets().get(1).getKeyAsString());
+                assertEquals(3L, result.getBuckets().get(1).getDocCount());
+                assertEquals("{date=1508457600000}", result.getBuckets().get(2).getKeyAsString());
+                assertEquals(6L, result.getBuckets().get(2).getDocCount());
+            }
+        );
+    }
+
     public void testWithDateHistogram() throws IOException {
         final List<Map<String, List<Object>>> dataset = new ArrayList<>();
         dataset.addAll(
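
The bucket keys asserted above are epoch milliseconds at UTC day boundaries; decoding them makes the expectations easier to audit. A small helper sketch with the values copied from the assertions:

    import java.time.Instant;

    class BucketKeySketch {
        public static void main(String[] args) {
            // Composite keys from the assertions, decoded to UTC instants:
            System.out.println(Instant.ofEpochMilli(1_474_329_600_000L)); // 2016-09-20T00:00:00Z
            System.out.println(Instant.ofEpochMilli(1_508_371_200_000L)); // 2017-10-19T00:00:00Z
            System.out.println(Instant.ofEpochMilli(1_508_457_600_000L)); // 2017-10-20T00:00:00Z
        }
    }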