From 42d89186a34a3d0bb22790c59115637db0a02f41 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Mon, 20 May 2024 13:22:22 -0700 Subject: [PATCH] rough POC for ordered QueryCollectorContexts --- .../search/query/QueryCollectorContext.java | 26 +++++++ .../search/query/QueryContextProvider.java | 74 +++++++++++++++++++ .../opensearch/search/query/QueryPhase.java | 9 ++- 3 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/search/query/QueryContextProvider.java diff --git a/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java b/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java index 08b048cf682bb..0a1e6a00b567e 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java +++ b/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java @@ -308,4 +308,30 @@ CollectorManager createManager( } }; } + + static QueryCollectorContext createFakeContext() { + return new QueryCollectorContext("fake_plugin") { + @Override + Collector create(Collector in) { + return EMPTY_COLLECTOR; + } + + @Override + CollectorManager createManager( + CollectorManager in + ) throws IOException { + return new CollectorManager() { + @Override + public Collector newCollector() throws IOException { + return EMPTY_COLLECTOR; + } + + @Override + public ReduceableSearchResult reduce(Collection collectors) throws IOException { + return result -> {}; + } + }; + } + }; + } } diff --git a/server/src/main/java/org/opensearch/search/query/QueryContextProvider.java b/server/src/main/java/org/opensearch/search/query/QueryContextProvider.java new file mode 100644 index 0000000000000..ea436a656aed6 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query/QueryContextProvider.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.query; + +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.PriorityQueue; + +public class QueryContextProvider { + private final PriorityQueue contextQueue = new PriorityQueue<>(new Comparator() { + @Override + public int compare(OrderedQueryContext o1, OrderedQueryContext o2) { + return o1.getPriority() - o2.getPriority(); + } + }); + + public void addQueryContext(QueryCollectorContext collectorContext, int priority) { + contextQueue.offer(new OrderedQueryContext(collectorContext, priority)); + } + + public QueryCollectorContext getComposedContext() { + return new QueryCollectorContext("provider") { + @Override + Collector create(Collector in) throws IOException { + OrderedQueryContext octx = contextQueue.poll(); + Collector collector = null; + while (octx != null) { + collector = octx.getContext().create(collector); + octx = contextQueue.poll(); + } + return collector; + } + + @Override + CollectorManager createManager(CollectorManager in) throws IOException { + OrderedQueryContext octx = contextQueue.poll(); + CollectorManager manager = null; + while (octx != null) { + manager = octx.getContext().createManager(manager); + octx = contextQueue.poll(); + } + return manager; + } + }; + } +} + +class OrderedQueryContext { + private final int priority; + private final QueryCollectorContext context; + + public OrderedQueryContext(QueryCollectorContext context, int priority) { + this.priority = priority; + this.context = context; + } + + public QueryCollectorContext getContext() { + return this.context; + } + + public int getPriority() { + return priority; + } +} diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index 608649ad22b23..2d62a238a8010 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -80,6 +80,7 @@ import java.util.stream.Collectors; import static org.opensearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext; +import static org.opensearch.search.query.QueryCollectorContext.createFakeContext; import static org.opensearch.search.query.QueryCollectorContext.createFilteredCollectorContext; import static org.opensearch.search.query.QueryCollectorContext.createMinScoreCollectorContext; import static org.opensearch.search.query.QueryCollectorContext.createMultiCollectorContext; @@ -100,6 +101,7 @@ public class QueryPhase { private final QueryPhaseSearcher queryPhaseSearcher; private final SuggestProcessor suggestProcessor; private final RescoreProcessor rescoreProcessor; +// private final QueryContextProvider contextProvider; public QueryPhase() { this(DEFAULT_QUERY_PHASE_SEARCHER); @@ -109,6 +111,7 @@ public QueryPhase(QueryPhaseSearcher queryPhaseSearcher) { this.queryPhaseSearcher = Objects.requireNonNull(queryPhaseSearcher, "QueryPhaseSearcher is required"); this.suggestProcessor = new SuggestProcessor(); this.rescoreProcessor = new RescoreProcessor(); +// this.contextProvider = new QueryContextProvider(); } public void preProcess(SearchContext context) { @@ -192,6 +195,7 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q final ContextIndexSearcher searcher = searchContext.searcher(); final IndexReader reader = searcher.getIndexReader(); QuerySearchResult queryResult = searchContext.queryResult(); + QueryContextProvider contextProvider = new QueryContextProvider(); queryResult.searchTimedOut(false); try { queryResult.from(searchContext.from()); @@ -248,7 +252,10 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q .map(Map.Entry::getValue) .collect(Collectors.toList()); if (managersExceptGlobalAgg.isEmpty() == false) { - collectors.add(createMultiCollectorContext(managersExceptGlobalAgg)); + contextProvider.addQueryContext(createFakeContext(), 100); + contextProvider.addQueryContext(createMultiCollectorContext(managersExceptGlobalAgg), 50); + collectors.add(contextProvider.getComposedContext()); +// collectors.add(createMultiCollectorContext(managersExceptGlobalAgg)); } if (searchContext.minimumScore() != null) {