Skip to content

Commit

Permalink
Refactor code in aggregation processor to make it simpler
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Gaievski <[email protected]>
  • Loading branch information
martin-gaievski committed Feb 20, 2024
1 parent 02e0b09 commit ec3f7dc
Showing 1 changed file with 29 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,53 +29,47 @@ public class HybridAggregationProcessor implements AggregationProcessor {

@Override
public void preProcess(SearchContext context) {
if (shouldUseMultiCollectorManager(context)) {
if (context.shouldUseConcurrentSearch()) {
concurrentAggregationProcessor.preProcess(context);
} else {
defaultAggregationProcessor.preProcess(context);
}
if (HybridQueryUtil.isHybridQuery(context.query(), context)) {
HybridCollectorManager collectorManager;
try {
collectorManager = HybridCollectorManager.createHybridCollectorManager(context);
} catch (IOException e) {
throw new RuntimeException(e);
}
Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResult>> collectorManagersByManagerClass = context
.queryCollectorManagers();
collectorManagersByManagerClass.put(HybridCollectorManager.class, collectorManager);
}
if (context.shouldUseConcurrentSearch()) {
concurrentAggregationProcessor.preProcess(context);
} else {
defaultAggregationProcessor.preProcess(context);
}

if (HybridQueryUtil.isHybridQuery(context.query(), context)) {
HybridCollectorManager collectorManager;
try {
collectorManager = HybridCollectorManager.createHybridCollectorManager(context);
} catch (IOException e) {
throw new RuntimeException(e);
}
Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResult>> collectorManagersByManagerClass = context
.queryCollectorManagers();
collectorManagersByManagerClass.put(HybridCollectorManager.class, collectorManager);
}
}

@Override
public void postProcess(SearchContext context) {
if (shouldUseMultiCollectorManager(context)) {
if (HybridQueryUtil.isHybridQuery(context.query(), context)) {
if (HybridQueryUtil.isHybridQuery(context.query(), context)) {
if (!context.shouldUseConcurrentSearch()) {
CollectorManager<?, ReduceableSearchResult> collectorManager = context.queryCollectorManagers()
.get(HybridCollectorManager.class);
if (!context.shouldUseConcurrentSearch()) {
try {
final Collection collectors = List.of(collectorManager.newCollector());
collectorManager.reduce(collectors).reduce(context.queryResult());
} catch (IOException e) {
throw new QueryPhaseExecutionException(
context.shardTarget(),
"failed to execute hybrid query aggregation processor",
e
);
}
try {
final Collection collectors = List.of(collectorManager.newCollector());
collectorManager.reduce(collectors).reduce(context.queryResult());
} catch (IOException e) {
throw new QueryPhaseExecutionException(
context.shardTarget(),
"failed to execute hybrid query aggregation processor",
e
);
}
}
updateQueryResult(context.queryResult(), context);
if (context.shouldUseConcurrentSearch()) {
concurrentAggregationProcessor.postProcess(context);
} else {
defaultAggregationProcessor.postProcess(context);
}
}

if (context.shouldUseConcurrentSearch()) {
concurrentAggregationProcessor.postProcess(context);
} else {
defaultAggregationProcessor.postProcess(context);
}
Expand Down

0 comments on commit ec3f7dc

Please sign in to comment.