forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Concurrent Segment Search]: Implement concurrent aggregations suppor…
…t without profile option (opensearch-project#7514) * Refactoring of AggregationReduceContext to use in SearchContext. This will be used for performing shard level reduce of aggregations during concurrent segment search usecase Signed-off-by: Sorabh Hamirwasia <[email protected]> * Support for non global aggregations with concurrent segment search. This PR does not include the support for profile option with aggregations to work with concurrent model Signed-off-by: Sorabh Hamirwasia <[email protected]> * Implement AggregationCollectorManager's reduce Signed-off-by: Andriy Redko <[email protected]> * Use CollectorManager for both concurrent and non concurrent use case Add CollectorManager for Global Aggregations to support concurrent use case Signed-off-by: Sorabh Hamirwasia <[email protected]> * Address review comments Signed-off-by: Sorabh Hamirwasia <[email protected]> --------- Signed-off-by: Sorabh Hamirwasia <[email protected]> Signed-off-by: Andriy Redko <[email protected]> Co-authored-by: Andriy Redko <[email protected]>
- Loading branch information
1 parent
4fe83ce
commit 492b16a
Showing
38 changed files
with
1,241 additions
and
317 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
115 changes: 115 additions & 0 deletions
115
server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.search.aggregations; | ||
|
||
import org.apache.lucene.search.Collector; | ||
import org.apache.lucene.search.CollectorManager; | ||
import org.opensearch.common.CheckedFunction; | ||
import org.opensearch.search.internal.SearchContext; | ||
import org.opensearch.search.profile.query.InternalProfileCollector; | ||
import org.opensearch.search.query.ReduceableSearchResult; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.Deque; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
|
||
/** | ||
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global | ||
* aggregation operators | ||
*/ | ||
class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> { | ||
private final SearchContext context; | ||
private final CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider; | ||
private final String collectorReason; | ||
|
||
AggregationCollectorManager( | ||
SearchContext context, | ||
CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider, | ||
String collectorReason | ||
) { | ||
this.context = context; | ||
this.aggProvider = aggProvider; | ||
this.collectorReason = collectorReason; | ||
} | ||
|
||
@Override | ||
public Collector newCollector() throws IOException { | ||
final Collector collector = createCollector(context, aggProvider.apply(context), collectorReason); | ||
// For Aggregations we should not have a NO_OP_Collector | ||
assert collector != BucketCollector.NO_OP_COLLECTOR; | ||
return collector; | ||
} | ||
|
||
@Override | ||
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException { | ||
List<Aggregator> aggregators = new ArrayList<>(); | ||
|
||
final Deque<Collector> allCollectors = new LinkedList<>(collectors); | ||
while (!allCollectors.isEmpty()) { | ||
final Collector currentCollector = allCollectors.pop(); | ||
if (currentCollector instanceof Aggregator) { | ||
aggregators.add((Aggregator) currentCollector); | ||
} else if (currentCollector instanceof InternalProfileCollector) { | ||
if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) { | ||
aggregators.add((Aggregator) ((InternalProfileCollector) currentCollector).getCollector()); | ||
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) { | ||
allCollectors.addAll( | ||
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors()) | ||
); | ||
} | ||
} else if (currentCollector instanceof MultiBucketCollector) { | ||
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors())); | ||
} | ||
} | ||
|
||
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size()); | ||
context.aggregations().resetBucketMultiConsumer(); | ||
for (Aggregator aggregator : aggregators) { | ||
try { | ||
aggregator.postCollection(); | ||
internals.add(aggregator.buildTopLevel()); | ||
} catch (IOException e) { | ||
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); | ||
} | ||
} | ||
|
||
final InternalAggregations internalAggregations = InternalAggregations.from(internals); | ||
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce iff multiple slices | ||
// were created to execute this request and it used concurrent segment search path | ||
// TODO: Add the check for flag that the request was executed using concurrent search | ||
if (collectors.size() > 1) { | ||
// using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all | ||
// documents are collected across shards for an aggregation | ||
return new AggregationReduceableSearchResult( | ||
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partial()) | ||
); | ||
} else { | ||
return new AggregationReduceableSearchResult(internalAggregations); | ||
} | ||
} | ||
|
||
static Collector createCollector(SearchContext context, List<Aggregator> collectors, String reason) throws IOException { | ||
Collector collector = MultiBucketCollector.wrap(collectors); | ||
((BucketCollector) collector).preCollection(); | ||
if (context.getProfilers() != null) { | ||
collector = new InternalProfileCollector( | ||
collector, | ||
reason, | ||
// TODO: report on child aggs as well | ||
Collections.emptyList() | ||
); | ||
} | ||
return collector; | ||
} | ||
} |
Oops, something went wrong.