Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Deng committed Dec 26, 2023
1 parent b9022c5 commit 42ba232
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,19 @@ public String getCollectorReason() {

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
// This should get the InternalAggregations from the Aggregators
// final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
final List<InternalAggregation> internals = context.bucketCollectorProcessor().toInternalAggregations(collectors);
// final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : aggregators) {
try {
// post collection is called in ContextIndexSearcher after search on leaves are completed
internals.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}
// for (Aggregator aggregator : aggregators) {
// try {
// // post collection is called in ContextIndexSearcher after search on leaves are completed
// internals.add(aggregator.buildTopLevel());
// } catch (IOException e) {
// throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
// }
// }

final InternalAggregations internalAggregations = InternalAggregations.from(internals);
return buildAggregationResult(internalAggregations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreMode;
import org.opensearch.common.SetOnce;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -72,6 +73,8 @@ public abstract class AggregatorBase extends Aggregator {
private final CircuitBreakerService breakerService;
private long requestBytesUsed;

private final SetOnce<InternalAggregation> internalAggregation = new SetOnce<>();

/**
* Constructs a new Aggregator.
*
Expand Down Expand Up @@ -277,6 +280,10 @@ public void postCollection() throws IOException {
// post-collect this agg before subs to make it possible to buffer and then replay in postCollection()
doPostCollection();
collectableSubAggregators.postCollection();
if (parent == null)
{
internalAggregation.set(buildTopLevel());
}
}

/** Called upon release of the aggregator. */
Expand Down Expand Up @@ -305,6 +312,10 @@ protected final InternalAggregations buildEmptySubAggregations() {
return InternalAggregations.from(aggs);
}

public InternalAggregation getInternalAggregation() {
return internalAggregation.get();
}

@Override
public String toString() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public void processPostCollection(Collector collectorTree) throws IOException {
collectors.offer(innerCollector);
}
} else if (currentCollector instanceof BucketCollector) {
// This should call buildAggregation
// Does this only get called for top level agg? why?
((BucketCollector) currentCollector).postCollection();
}
}
Expand Down Expand Up @@ -106,4 +108,27 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
}
return aggregators;
}

public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) {
List<InternalAggregation> internalAggregations = new ArrayList<>();

final Deque<Collector> allCollectors = new LinkedList<>(collectors);
while (!allCollectors.isEmpty()) {
final Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof AggregatorBase) {
internalAggregations.add(((AggregatorBase) currentCollector).getInternalAggregation());
} else if (currentCollector instanceof InternalProfileCollector) {
if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) {
internalAggregations.add(((AggregatorBase) ((InternalProfileCollector) currentCollector).getCollector()).getInternalAggregation());
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
allCollectors.addAll(
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
);
}
} else if (currentCollector instanceof MultiBucketCollector) {
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
}
}
return internalAggregations;
}
}

0 comments on commit 42ba232

Please sign in to comment.