From 5a64c3e398cc172e7832987416a80b0cc1f3fdc0 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Wed, 14 Feb 2024 12:52:04 -0800 Subject: [PATCH] SQL in search API Signed-off-by: Vamsi Manohar --- .../action/search/TransportSearchAction.java | 5 ++ .../org/opensearch/plugins/SearchPlugin.java | 48 +++++++++++ .../org/opensearch/search/SearchModule.java | 14 ++++ .../search/builder/SearchSourceBuilder.java | 27 ++++-- .../search/externalengine/QueryEngine.java | 20 +++++ .../externalengine/QueryEngineParser.java | 25 ++++++ .../search/externalengine/SQLQueryEngine.java | 84 +++++++++++++++++++ 7 files changed, 218 insertions(+), 5 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/externalengine/QueryEngine.java create mode 100644 server/src/main/java/org/opensearch/search/externalengine/QueryEngineParser.java create mode 100644 server/src/main/java/org/opensearch/search/externalengine/SQLQueryEngine.java diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 79e599ec9387b..305a248ec0d8c 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -428,6 +428,11 @@ private void executeRequest( relativeStartNanos, System::nanoTime ); + // taking over by query engine. + if (!originalSearchRequest.source().queryEngines().isEmpty()) { + originalSearchRequest.source().queryEngines().get(0).executeQuery(originalSearchRequest, originalListener); + return; + } if (originalSearchRequest.isPhaseTook() == null) { originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED)); } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java index 40b4f97cd1897..0ca0255df45e2 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java @@ -64,6 +64,8 @@ import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; +import org.opensearch.search.externalengine.QueryEngineParser; +import org.opensearch.search.externalengine.QueryEngine; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.highlight.Highlighter; import org.opensearch.search.query.QueryPhaseSearcher; @@ -216,6 +218,10 @@ default Optional getIndexSearcherExecutorProvider() { return Optional.empty(); } + default List> getQueryEnginesSpecs() { + return emptyList(); + } + /** * Executor service provider */ @@ -877,4 +883,46 @@ public Map getHighlighters() { return highlighters; } } + + /** + * Specification for a {@link SearchExtBuilder} which represents an additional section that can be + * parsed in a search request (within the ext element). + */ + class QueryEngineSpec extends SearchExtensionSpec> { + /** + * Specification of custom {@link SearchExtBuilder}. + * + * @param name holds the names by which this search ext might be parsed. The {@link ParseField#getPreferredName()} is special as it + * is the name by under which the reader is registered. So it is the name that the search ext should use as its + * {@link NamedWriteable#getWriteableName()} too. It is an error if {@link ParseField#getPreferredName()} conflicts with + * another registered name, including names from other plugins. + * @param reader the reader registered for this search ext's builder. Typically, a reference to a constructor that takes a + * {@link StreamInput} + * @param parser the parser function that reads the search ext builder from xcontent + */ + public QueryEngineSpec( + ParseField name, + Writeable.Reader reader, + QueryEngineParser parser + ) { + super(name, reader, parser); + } + + /** + * Specification of custom {@link SearchExtBuilder}. + * + * @param name the name by which this search ext might be parsed or deserialized. Make sure that the search ext builder returns this name for + * {@link NamedWriteable#getWriteableName()}. It is an error if this name conflicts with another registered name, including + * names from other plugins. + * @param reader the reader registered for this search ext's builder. Typically, a reference to a constructor that takes a + * {@link StreamInput} + * @param parser the parser function that reads the search ext builder from xcontent + */ + public QueryEngineSpec(String name, + Writeable.Reader reader, + QueryEngineParser parser) { + super(name, reader, parser); + } + + } } diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java b/server/src/main/java/org/opensearch/search/SearchModule.java index 88218896dceae..5cdda9649a8b1 100644 --- a/server/src/main/java/org/opensearch/search/SearchModule.java +++ b/server/src/main/java/org/opensearch/search/SearchModule.java @@ -114,6 +114,7 @@ import org.opensearch.plugins.SearchPlugin.SignificanceHeuristicSpec; import org.opensearch.plugins.SearchPlugin.SortSpec; import org.opensearch.plugins.SearchPlugin.SuggesterSpec; +import org.opensearch.plugins.SearchPlugin.QueryEngineSpec; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.BaseAggregationBuilder; import org.opensearch.search.aggregations.InternalAggregation; @@ -240,6 +241,8 @@ import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; +import org.opensearch.search.externalengine.QueryEngine; +import org.opensearch.search.externalengine.SQLQueryEngine; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.ExplainPhase; @@ -340,6 +343,7 @@ public SearchModule(Settings settings, List plugins) { registerQueryParsers(plugins); registerRescorers(plugins); registerSortParsers(plugins); + registerQueryEngines(plugins); registerValueFormats(); registerSignificanceHeuristics(plugins); this.valuesSourceRegistry = registerAggregations(plugins); @@ -838,6 +842,16 @@ private void registerRescorers(List plugins) { registerFromPlugin(plugins, SearchPlugin::getRescorers, this::registerRescorer); } + private void registerQueryEngines(List plugins) { + registerQueryEngine(new SearchPlugin.QueryEngineSpec<>(SQLQueryEngine.NAME, SQLQueryEngine::new, SQLQueryEngine::fromXContent)); + registerFromPlugin(plugins, SearchPlugin::getQueryEnginesSpecs, this::registerQueryEngine); + } + + private void registerQueryEngine(SearchPlugin.QueryEngineSpec spec) { + namedXContents.add(new NamedXContentRegistry.Entry(QueryEngine.class, spec.getName(), (p, c) -> spec.getParser().fromXContent(p))); + namedWriteables.add(new NamedWriteableRegistry.Entry(QueryEngine.class, spec.getName().getPreferredName(), spec.getReader())); + } + private void registerRescorer(RescorerSpec spec) { namedXContents.add(new NamedXContentRegistry.Entry(RescorerBuilder.class, spec.getName(), (p, c) -> spec.getParser().apply(p))); namedWriteables.add(new NamedWriteableRegistry.Entry(RescorerBuilder.class, spec.getName().getPreferredName(), spec.getReader())); diff --git a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java index bdd92a5baa115..fe72c1043aa48 100644 --- a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java @@ -60,6 +60,7 @@ import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.PipelineAggregationBuilder; import org.opensearch.search.collapse.CollapseBuilder; +import org.opensearch.search.externalengine.QueryEngine; import org.opensearch.search.fetch.StoredFieldsContext; import org.opensearch.search.fetch.subphase.FetchSourceContext; import org.opensearch.search.fetch.subphase.FieldAndFormat; @@ -216,6 +217,7 @@ public static HighlightBuilder highlight() { private PointInTimeBuilder pointInTimeBuilder = null; private Map searchPipelineSource = null; + private List queryEngines = new ArrayList<>(); /** * Constructs a new search source builder. @@ -1039,6 +1041,15 @@ public SearchSourceBuilder searchPipelineSource(Map searchPipeli return this; } + public List queryEngines() { + return queryEngines; + } + + public List queryEngines(List queryEngines) { + this.queryEngines = queryEngines; + return queryEngines; + } + /** * Rewrites this search source builder into its primitive form. e.g. by * rewriting the QueryBuilder. If the builder did not change the identity @@ -1282,11 +1293,17 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th } else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { searchPipelineSource = parser.mapOrdered(); } else { - throw new ParsingException( - parser.getTokenLocation(), - "Unknown key for a " + token + " in [" + currentFieldName + "].", - parser.getTokenLocation() - ); + QueryEngine queryEngine = parser.namedObject(QueryEngine.class, currentFieldName, null); + if (queryEngine != null) { + queryEngines.add(queryEngine); + } + else { + throw new ParsingException( + parser.getTokenLocation(), + "Unknown key for a " + token + " in [" + currentFieldName + "].", + parser.getTokenLocation() + ); + } } } else if (token == XContentParser.Token.START_ARRAY) { if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { diff --git a/server/src/main/java/org/opensearch/search/externalengine/QueryEngine.java b/server/src/main/java/org/opensearch/search/externalengine/QueryEngine.java new file mode 100644 index 0000000000000..57891a1aab81a --- /dev/null +++ b/server/src/main/java/org/opensearch/search/externalengine/QueryEngine.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.externalengine; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.NamedWriteable; +import org.opensearch.core.xcontent.ToXContentObject; + +public abstract class QueryEngine implements NamedWriteable, ToXContentObject { + public abstract void executeQuery(SearchRequest searchRequest, + ActionListener actionListener); +} diff --git a/server/src/main/java/org/opensearch/search/externalengine/QueryEngineParser.java b/server/src/main/java/org/opensearch/search/externalengine/QueryEngineParser.java new file mode 100644 index 0000000000000..fa924b382061b --- /dev/null +++ b/server/src/main/java/org/opensearch/search/externalengine/QueryEngineParser.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.externalengine; + +import java.io.IOException; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.query.QueryBuilder; + +@FunctionalInterface +public interface QueryEngineParser { + + /** + * Creates a new {@link QueryBuilder} from the query held by the + * {@link XContentParser}. The state on the parser contained in this context + * will be changed as a side effect of this method call + */ + T fromXContent(XContentParser parser) throws IOException; + +} diff --git a/server/src/main/java/org/opensearch/search/externalengine/SQLQueryEngine.java b/server/src/main/java/org/opensearch/search/externalengine/SQLQueryEngine.java new file mode 100644 index 0000000000000..00322b0cde128 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/externalengine/SQLQueryEngine.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.externalengine; + +import static org.opensearch.repositories.fs.ReloadableFsRepository.randomIntBetween; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.ShardSearchFailure; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.search.internal.InternalSearchResponse; + +public class SQLQueryEngine extends QueryEngine { + + public static final String NAME = "sql"; + + public SQLQueryEngine() { + + } + public SQLQueryEngine(StreamInput in) { + } + @Override + public String getWriteableName() { + return "sql"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + } + + + @Override + public void executeQuery(SearchRequest searchRequest, + ActionListener actionListener) { + // Creating a minimal response is OK, because SearchResponse self + // is tested elsewhere. + long tookInMillis = ThreadLocalRandom.current().nextLong(0L, Long.MAX_VALUE); + int totalShards = randomIntBetween(1, Integer.MAX_VALUE); + int successfulShards = randomIntBetween(0, totalShards); + int skippedShards = totalShards - successfulShards; + InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); + int totalClusters = randomIntBetween(0, 10); + int successfulClusters = randomIntBetween(0, totalClusters); + int skippedClusters = totalClusters - successfulClusters; + SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters); + SearchResponse searchResponse = new SearchResponse( + internalSearchResponse, + null, + totalShards, + successfulShards, + skippedShards, + tookInMillis, + ShardSearchFailure.EMPTY_ARRAY, + clusters + ); + actionListener.onResponse(searchResponse); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return null; + } + + public static QueryEngine fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + token = parser.nextToken(); + } + return new SQLQueryEngine(); + } + +}