Skip to content

Commit

Permalink
Refactor non-concurrent collector manager
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Gaievski <[email protected]>
  • Loading branch information
martin-gaievski committed Mar 8, 2024
1 parent 763061b commit 65553e7
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

import lombok.AllArgsConstructor;
import org.apache.lucene.search.CollectorManager;
import org.opensearch.search.aggregations.AggregationInitializationException;
import org.opensearch.search.aggregations.AggregationProcessor;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.query.QueryPhaseExecutionException;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import static org.opensearch.neuralsearch.search.query.HybridQueryPhaseSearcher.isHybridQuery;
Expand All @@ -36,8 +36,8 @@ public void preProcess(SearchContext context) {
CollectorManager collectorManager;
try {
collectorManager = HybridCollectorManager.createHybridCollectorManager(context);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (IOException exception) {
throw new AggregationInitializationException("could not initialize hybrid aggregation processor", exception);
}
context.queryCollectorManagers().put(HybridCollectorManager.class, collectorManager);
}
Expand Down Expand Up @@ -67,8 +67,7 @@ public void postProcess(SearchContext context) {
private void reduceCollectorResults(SearchContext context) {
CollectorManager<?, ReduceableSearchResult> collectorManager = context.queryCollectorManagers().get(HybridCollectorManager.class);
try {
final Collection collectors = List.of(collectorManager.newCollector());
collectorManager.reduce(collectors).reduce(context.queryResult());
collectorManager.reduce(List.of()).reduce(context.queryResult());
} catch (IOException e) {
throw new QueryPhaseExecutionException(context.shardTarget(), "failed to execute hybrid query aggregation processor", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ public static CollectorManager createHybridCollectorManager(final SearchContext
}

@Override
abstract public Collector newCollector();

Collector getCollector() {
public Collector newCollector() {
Collector hybridcollector = new HybridTopScoreDocCollector(numHits, hitsThresholdChecker);
return hybridcollector;
}
Expand Down Expand Up @@ -211,7 +209,7 @@ private DocValueFormat[] getSortValueFormats(final SortAndFormats sortAndFormats
* use saved state of collector
*/
static class HybridCollectorNonConcurrentManager extends HybridCollectorManager {
Collector maxScoreCollector;
private final Collector scoreCollector;

public HybridCollectorNonConcurrentManager(
int numHits,
Expand All @@ -221,18 +219,18 @@ public HybridCollectorNonConcurrentManager(
SortAndFormats sortAndFormats
) {
super(numHits, hitsThresholdChecker, isSingleShard, trackTotalHitsUpTo, sortAndFormats);
scoreCollector = Objects.requireNonNull(super.newCollector(), "collector for hybrid query cannot be null");
}

@Override
public Collector newCollector() {
if (Objects.isNull(maxScoreCollector)) {
maxScoreCollector = getCollector();
return maxScoreCollector;
} else {
Collector toReturnCollector = maxScoreCollector;
maxScoreCollector = null;
return toReturnCollector;
}
return scoreCollector;
}

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) {
assert collectors.isEmpty() : "reduce on HybridCollectorNonConcurrentManager called with non-empty collectors";
return super.reduce(List.of(scoreCollector));
}
}

Expand All @@ -251,10 +249,5 @@ public HybridCollectorConcurrentSearchManager(
) {
super(numHits, hitsThresholdChecker, isSingleShard, trackTotalHitsUpTo, sortAndFormats);
}

@Override
public Collector newCollector() {
return getCollector();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.query.ReduceableSearchResult;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -175,8 +174,7 @@ public void testReduce_whenMatchedDocs_thenSuccessful() {
scorer.score(leafCollector, leafReaderContext.reader().getLiveDocs());
leafCollector.finish();

final Collection<Collector> collectors = List.of(collector);
Object results = hybridCollectorManager.reduce(collectors);
Object results = hybridCollectorManager.reduce(List.of());

assertNotNull(results);
ReduceableSearchResult reduceableSearchResult = ((ReduceableSearchResult) results);
Expand Down

0 comments on commit 65553e7

Please sign in to comment.