diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index 0bb2d1d7ca933..d87582a2e944a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -56,17 +56,19 @@ public String getCollectorReason() { @Override public ReduceableSearchResult reduce(Collection collectors) throws IOException { - final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); - final List internals = new ArrayList<>(aggregators.size()); + // This should get the InternalAggregations from the Aggregators +// final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); + final List internals = context.bucketCollectorProcessor().toInternalAggregations(collectors); +// final List internals = new ArrayList<>(aggregators.size()); context.aggregations().resetBucketMultiConsumer(); - for (Aggregator aggregator : aggregators) { - try { - // post collection is called in ContextIndexSearcher after search on leaves are completed - internals.add(aggregator.buildTopLevel()); - } catch (IOException e) { - throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); - } - } +// for (Aggregator aggregator : aggregators) { +// try { +// // post collection is called in ContextIndexSearcher after search on leaves are completed +// internals.add(aggregator.buildTopLevel()); +// } catch (IOException e) { +// throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); +// } +// } final InternalAggregations internalAggregations = InternalAggregations.from(internals); return buildAggregationResult(internalAggregations); diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java index 47e9def094623..e7a5aaeab219b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java @@ -34,6 +34,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ScoreMode; +import org.opensearch.common.SetOnce; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.core.indices.breaker.CircuitBreakerService; @@ -72,6 +73,8 @@ public abstract class AggregatorBase extends Aggregator { private final CircuitBreakerService breakerService; private long requestBytesUsed; + private final SetOnce internalAggregation = new SetOnce<>(); + /** * Constructs a new Aggregator. * @@ -277,6 +280,10 @@ public void postCollection() throws IOException { // post-collect this agg before subs to make it possible to buffer and then replay in postCollection() doPostCollection(); collectableSubAggregators.postCollection(); + if (parent == null) + { + internalAggregation.set(buildTopLevel()); + } } /** Called upon release of the aggregator. */ @@ -305,6 +312,10 @@ protected final InternalAggregations buildEmptySubAggregations() { return InternalAggregations.from(aggs); } + public InternalAggregation getInternalAggregation() { + return internalAggregation.get(); + } + @Override public String toString() { return name; diff --git a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java index 135fda71a757a..1e6b49b5bc9e5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java @@ -71,6 +71,8 @@ public void processPostCollection(Collector collectorTree) throws IOException { collectors.offer(innerCollector); } } else if (currentCollector instanceof BucketCollector) { + // This should call buildAggregation + // Does this only get called for top level agg? why? ((BucketCollector) currentCollector).postCollection(); } } @@ -106,4 +108,27 @@ public List toAggregators(Collection collectors) { } return aggregators; } + + public List toInternalAggregations(Collection collectors) { + List internalAggregations = new ArrayList<>(); + + final Deque allCollectors = new LinkedList<>(collectors); + while (!allCollectors.isEmpty()) { + final Collector currentCollector = allCollectors.pop(); + if (currentCollector instanceof AggregatorBase) { + internalAggregations.add(((AggregatorBase) currentCollector).getInternalAggregation()); + } else if (currentCollector instanceof InternalProfileCollector) { + if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) { + internalAggregations.add(((AggregatorBase) ((InternalProfileCollector) currentCollector).getCollector()).getInternalAggregation()); + } else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) { + allCollectors.addAll( + Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors()) + ); + } + } else if (currentCollector instanceof MultiBucketCollector) { + allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors())); + } + } + return internalAggregations; + } }