Skip to content

Commit

Permalink
rough POC for ordered QueryCollectorContexts
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Deng committed May 20, 2024
1 parent f30e0e0 commit 42d8918
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,30 @@ CollectorManager<? extends Collector, ReduceableSearchResult> createManager(
}
};
}

static QueryCollectorContext createFakeContext() {
return new QueryCollectorContext("fake_plugin") {
@Override
Collector create(Collector in) {
return EMPTY_COLLECTOR;
}

@Override
CollectorManager<? extends Collector, ReduceableSearchResult> createManager(
CollectorManager<? extends Collector, ReduceableSearchResult> in
) throws IOException {
return new CollectorManager<Collector, ReduceableSearchResult>() {
@Override
public Collector newCollector() throws IOException {
return EMPTY_COLLECTOR;
}

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
return result -> {};
}
};
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.PriorityQueue;

public class QueryContextProvider {
private final PriorityQueue<OrderedQueryContext> contextQueue = new PriorityQueue<>(new Comparator<OrderedQueryContext>() {
@Override
public int compare(OrderedQueryContext o1, OrderedQueryContext o2) {
return o1.getPriority() - o2.getPriority();
}
});

public void addQueryContext(QueryCollectorContext collectorContext, int priority) {
contextQueue.offer(new OrderedQueryContext(collectorContext, priority));
}

public QueryCollectorContext getComposedContext() {
return new QueryCollectorContext("provider") {
@Override
Collector create(Collector in) throws IOException {
OrderedQueryContext octx = contextQueue.poll();
Collector collector = null;
while (octx != null) {
collector = octx.getContext().create(collector);
octx = contextQueue.poll();
}
return collector;
}

@Override
CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in) throws IOException {
OrderedQueryContext octx = contextQueue.poll();
CollectorManager<?, ReduceableSearchResult> manager = null;
while (octx != null) {
manager = octx.getContext().createManager(manager);
octx = contextQueue.poll();
}
return manager;
}
};
}
}

class OrderedQueryContext {
private final int priority;
private final QueryCollectorContext context;

public OrderedQueryContext(QueryCollectorContext context, int priority) {
this.priority = priority;
this.context = context;
}

public QueryCollectorContext getContext() {
return this.context;
}

public int getPriority() {
return priority;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.stream.Collectors;

import static org.opensearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
import static org.opensearch.search.query.QueryCollectorContext.createFakeContext;
import static org.opensearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
import static org.opensearch.search.query.QueryCollectorContext.createMinScoreCollectorContext;
import static org.opensearch.search.query.QueryCollectorContext.createMultiCollectorContext;
Expand All @@ -100,6 +101,7 @@ public class QueryPhase {
private final QueryPhaseSearcher queryPhaseSearcher;
private final SuggestProcessor suggestProcessor;
private final RescoreProcessor rescoreProcessor;
// private final QueryContextProvider contextProvider;

public QueryPhase() {
this(DEFAULT_QUERY_PHASE_SEARCHER);
Expand All @@ -109,6 +111,7 @@ public QueryPhase(QueryPhaseSearcher queryPhaseSearcher) {
this.queryPhaseSearcher = Objects.requireNonNull(queryPhaseSearcher, "QueryPhaseSearcher is required");
this.suggestProcessor = new SuggestProcessor();
this.rescoreProcessor = new RescoreProcessor();
// this.contextProvider = new QueryContextProvider();
}

public void preProcess(SearchContext context) {
Expand Down Expand Up @@ -192,6 +195,7 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
final ContextIndexSearcher searcher = searchContext.searcher();
final IndexReader reader = searcher.getIndexReader();
QuerySearchResult queryResult = searchContext.queryResult();
QueryContextProvider contextProvider = new QueryContextProvider();
queryResult.searchTimedOut(false);
try {
queryResult.from(searchContext.from());
Expand Down Expand Up @@ -248,7 +252,10 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
.map(Map.Entry::getValue)
.collect(Collectors.toList());
if (managersExceptGlobalAgg.isEmpty() == false) {
collectors.add(createMultiCollectorContext(managersExceptGlobalAgg));
contextProvider.addQueryContext(createFakeContext(), 100);
contextProvider.addQueryContext(createMultiCollectorContext(managersExceptGlobalAgg), 50);
collectors.add(contextProvider.getComposedContext());
// collectors.add(createMultiCollectorContext(managersExceptGlobalAgg));
}

if (searchContext.minimumScore() != null) {
Expand Down

0 comments on commit 42d8918

Please sign in to comment.