Skip to content

Commit

Permalink
Revert "Avoid bucket copies in Aggs (elastic#110261)" (elastic#111758) (
Browse files Browse the repository at this point in the history
elastic#111761)

This reverts elastic#110261 which we can't land until elastic#111757 - we need to be
sure that the `equals` implementations on subclasses of
`InternalAggregations` is correct before this optimization is safe.

Closes elastic#111679
  • Loading branch information
nik9000 authored Aug 9, 2024
1 parent ba5ca2b commit 801e1b9
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 66 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/111758.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 111758
summary: Revert "Avoid bucket copies in Aggs"
area: Aggregations
type: bug
issues:
- 111679
Original file line number Diff line number Diff line change
Expand Up @@ -226,34 +226,11 @@ public static InternalAggregations topLevelReduce(List<InternalAggregations> agg
}
if (context.isFinalReduce()) {
List<InternalAggregation> reducedInternalAggs = reduced.getInternalAggregations();
List<InternalAggregation> internalAggregations = null;
for (int i = 0; i < reducedInternalAggs.size(); i++) {
InternalAggregation agg = reducedInternalAggs.get(i);
InternalAggregation internalAggregation = agg.reducePipelines(
agg,
context,
context.pipelineTreeRoot().subTree(agg.getName())
);
if (internalAggregation.equals(agg) == false) {
if (internalAggregations == null) {
internalAggregations = new ArrayList<>(reducedInternalAggs);
}
internalAggregations.set(i, internalAggregation);
}
}
reducedInternalAggs = reducedInternalAggs.stream()
.map(agg -> agg.reducePipelines(agg, context, context.pipelineTreeRoot().subTree(agg.getName())))
.collect(Collectors.toCollection(ArrayList::new));

var pipelineAggregators = context.pipelineTreeRoot().aggregators();
if (pipelineAggregators.isEmpty()) {
if (internalAggregations == null) {
return reduced;
}
return from(internalAggregations);
}
if (internalAggregations != null) {
reducedInternalAggs = internalAggregations;
}
reducedInternalAggs = new ArrayList<>(reducedInternalAggs);
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
for (PipelineAggregator pipelineAggregator : context.pipelineTreeRoot().aggregators()) {
SiblingPipelineAggregator sib = (SiblingPipelineAggregator) pipelineAggregator;
InternalAggregation newAgg = sib.doReduce(from(reducedInternalAggs), context);
reducedInternalAggs.add(newAgg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,31 +207,16 @@ public void forEachBucket(Consumer<InternalAggregations> consumer) {
}

private List<B> reducePipelineBuckets(AggregationReduceContext reduceContext, PipelineTree pipelineTree) {
List<B> reducedBuckets = null;
var buckets = getBuckets();
for (int bucketIndex = 0; bucketIndex < buckets.size(); bucketIndex++) {
B bucket = buckets.get(bucketIndex);
List<InternalAggregation> aggs = null;
int aggIndex = 0;
for (InternalAggregation agg : bucket.getAggregations()) {
List<B> reducedBuckets = new ArrayList<>();
for (B bucket : getBuckets()) {
List<InternalAggregation> aggs = new ArrayList<>();
for (Aggregation agg : bucket.getAggregations()) {
PipelineTree subTree = pipelineTree.subTree(agg.getName());
var reduced = agg.reducePipelines(agg, reduceContext, subTree);
if (reduced.equals(agg) == false) {
if (aggs == null) {
aggs = bucket.getAggregations().copyResults();
}
aggs.set(aggIndex, reduced);
}
aggIndex++;
}
if (aggs != null) {
if (reducedBuckets == null) {
reducedBuckets = new ArrayList<>(buckets);
}
reducedBuckets.set(bucketIndex, createBucket(InternalAggregations.from(aggs), bucket));
aggs.add(((InternalAggregation) agg).reducePipelines((InternalAggregation) agg, reduceContext, subTree));
}
reducedBuckets.add(createBucket(InternalAggregations.from(aggs), bucket));
}
return reducedBuckets == null ? buckets : reducedBuckets;
return reducedBuckets;
}

public abstract static class InternalBucket implements Bucket, Writeable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
double key = roundKey * interval + offset;
return new InternalHistogram.Bucket(key, docCount, keyed, formatter, subAggregationResults);
}, (owningBucketOrd, buckets) -> {
if (buckets.isEmpty()) {
return buildEmptyAggregation();
}
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults);
}, (owningBucketOrd, buckets) -> {
if (buckets.isEmpty()) {
return buildEmptyAggregation();
}
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,17 +259,11 @@ BucketOrder getOrder() {

@Override
public InternalHistogram create(List<Bucket> buckets) {
if (this.buckets.equals(buckets)) {
return this;
}
return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format, keyed, metadata);
}

@Override
public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) {
if (prototype.aggregations.equals(aggregations)) {
return prototype;
}
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
}

Expand Down Expand Up @@ -456,9 +450,6 @@ public InternalAggregation get() {
CollectionUtil.introSort(reducedBuckets, order.comparator());
}
}
if (reducedBuckets.equals(buckets)) {
return InternalHistogram.this;
}
return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata());
}
};
Expand Down Expand Up @@ -504,9 +495,14 @@ public Number getKey(MultiBucketsAggregation.Bucket bucket) {
}

@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public InternalAggregation createAggregation(List<MultiBucketsAggregation.Bucket> buckets) {
return new InternalHistogram(name, (List) buckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata());
// convert buckets to the right type
List<Bucket> buckets2 = new ArrayList<>(buckets.size());
for (Object b : buckets) {
buckets2.add((Bucket) b);
}
buckets2 = Collections.unmodifiableList(buckets2);
return new InternalHistogram(name, buckets2, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata());
}

@Override
Expand Down

0 comments on commit 801e1b9

Please sign in to comment.