Skip to content

Commit

Permalink
SQL in search API
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Feb 14, 2024
1 parent ad7f00f commit 5a64c3e
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ private void executeRequest(
relativeStartNanos,
System::nanoTime
);
// taking over by query engine.
if (!originalSearchRequest.source().queryEngines().isEmpty()) {
originalSearchRequest.source().queryEngines().get(0).executeQuery(originalSearchRequest, originalListener);
return;
}
if (originalSearchRequest.isPhaseTook() == null) {
originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED));
}
Expand Down
48 changes: 48 additions & 0 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.externalengine.QueryEngineParser;
import org.opensearch.search.externalengine.QueryEngine;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryPhaseSearcher;
Expand Down Expand Up @@ -216,6 +218,10 @@ default Optional<ExecutorServiceProvider> getIndexSearcherExecutorProvider() {
return Optional.empty();
}

default List<QueryEngineSpec<?>> getQueryEnginesSpecs() {
return emptyList();
}

/**
* Executor service provider
*/
Expand Down Expand Up @@ -877,4 +883,46 @@ public Map<String, Highlighter> getHighlighters() {
return highlighters;
}
}

/**
* Specification for a {@link SearchExtBuilder} which represents an additional section that can be
* parsed in a search request (within the ext element).
*/
class QueryEngineSpec<T extends QueryEngine> extends SearchExtensionSpec<T, QueryEngineParser<T>> {
/**
* Specification of custom {@link SearchExtBuilder}.
*
* @param name holds the names by which this search ext might be parsed. The {@link ParseField#getPreferredName()} is special as it
* is the name by under which the reader is registered. So it is the name that the search ext should use as its
* {@link NamedWriteable#getWriteableName()} too. It is an error if {@link ParseField#getPreferredName()} conflicts with
* another registered name, including names from other plugins.
* @param reader the reader registered for this search ext's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param parser the parser function that reads the search ext builder from xcontent
*/
public QueryEngineSpec(
ParseField name,
Writeable.Reader<? extends T> reader,
QueryEngineParser<T> parser
) {
super(name, reader, parser);
}

/**
* Specification of custom {@link SearchExtBuilder}.
*
* @param name the name by which this search ext might be parsed or deserialized. Make sure that the search ext builder returns this name for
* {@link NamedWriteable#getWriteableName()}. It is an error if this name conflicts with another registered name, including
* names from other plugins.
* @param reader the reader registered for this search ext's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param parser the parser function that reads the search ext builder from xcontent
*/
public QueryEngineSpec(String name,
Writeable.Reader<? extends T> reader,
QueryEngineParser<T> parser) {
super(name, reader, parser);
}

}
}
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.opensearch.plugins.SearchPlugin.SignificanceHeuristicSpec;
import org.opensearch.plugins.SearchPlugin.SortSpec;
import org.opensearch.plugins.SearchPlugin.SuggesterSpec;
import org.opensearch.plugins.SearchPlugin.QueryEngineSpec;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.BaseAggregationBuilder;
import org.opensearch.search.aggregations.InternalAggregation;
Expand Down Expand Up @@ -240,6 +241,8 @@
import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.externalengine.QueryEngine;
import org.opensearch.search.externalengine.SQLQueryEngine;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.ExplainPhase;
Expand Down Expand Up @@ -340,6 +343,7 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
registerQueryParsers(plugins);
registerRescorers(plugins);
registerSortParsers(plugins);
registerQueryEngines(plugins);
registerValueFormats();
registerSignificanceHeuristics(plugins);
this.valuesSourceRegistry = registerAggregations(plugins);
Expand Down Expand Up @@ -838,6 +842,16 @@ private void registerRescorers(List<SearchPlugin> plugins) {
registerFromPlugin(plugins, SearchPlugin::getRescorers, this::registerRescorer);
}

private void registerQueryEngines(List<SearchPlugin> plugins) {
registerQueryEngine(new SearchPlugin.QueryEngineSpec<>(SQLQueryEngine.NAME, SQLQueryEngine::new, SQLQueryEngine::fromXContent));
registerFromPlugin(plugins, SearchPlugin::getQueryEnginesSpecs, this::registerQueryEngine);
}

private void registerQueryEngine(SearchPlugin.QueryEngineSpec<?> spec) {
namedXContents.add(new NamedXContentRegistry.Entry(QueryEngine.class, spec.getName(), (p, c) -> spec.getParser().fromXContent(p)));
namedWriteables.add(new NamedWriteableRegistry.Entry(QueryEngine.class, spec.getName().getPreferredName(), spec.getReader()));
}

private void registerRescorer(RescorerSpec<?> spec) {
namedXContents.add(new NamedXContentRegistry.Entry(RescorerBuilder.class, spec.getName(), (p, c) -> spec.getParser().apply(p)));
namedWriteables.add(new NamedWriteableRegistry.Entry(RescorerBuilder.class, spec.getName().getPreferredName(), spec.getReader()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.PipelineAggregationBuilder;
import org.opensearch.search.collapse.CollapseBuilder;
import org.opensearch.search.externalengine.QueryEngine;
import org.opensearch.search.fetch.StoredFieldsContext;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.search.fetch.subphase.FieldAndFormat;
Expand Down Expand Up @@ -216,6 +217,7 @@ public static HighlightBuilder highlight() {
private PointInTimeBuilder pointInTimeBuilder = null;

private Map<String, Object> searchPipelineSource = null;
private List<QueryEngine> queryEngines = new ArrayList<>();

/**
* Constructs a new search source builder.
Expand Down Expand Up @@ -1039,6 +1041,15 @@ public SearchSourceBuilder searchPipelineSource(Map<String, Object> searchPipeli
return this;
}

public List<QueryEngine> queryEngines() {
return queryEngines;
}

public List<QueryEngine> queryEngines(List<QueryEngine> queryEngines) {
this.queryEngines = queryEngines;
return queryEngines;
}

/**
* Rewrites this search source builder into its primitive form. e.g. by
* rewriting the QueryBuilder. If the builder did not change the identity
Expand Down Expand Up @@ -1282,11 +1293,17 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
} else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
searchPipelineSource = parser.mapOrdered();
} else {
throw new ParsingException(
parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + currentFieldName + "].",
parser.getTokenLocation()
);
QueryEngine queryEngine = parser.namedObject(QueryEngine.class, currentFieldName, null);
if (queryEngine != null) {
queryEngines.add(queryEngine);
}
else {
throw new ParsingException(
parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + currentFieldName + "].",
parser.getTokenLocation()
);
}
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.externalengine;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteable;
import org.opensearch.core.xcontent.ToXContentObject;

public abstract class QueryEngine implements NamedWriteable, ToXContentObject {
public abstract void executeQuery(SearchRequest searchRequest,
ActionListener<SearchResponse> actionListener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.externalengine;

import java.io.IOException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;

@FunctionalInterface
public interface QueryEngineParser<T extends QueryEngine> {

/**
* Creates a new {@link QueryBuilder} from the query held by the
* {@link XContentParser}. The state on the parser contained in this context
* will be changed as a side effect of this method call
*/
T fromXContent(XContentParser parser) throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.externalengine;

import static org.opensearch.repositories.fs.ReloadableFsRepository.randomIntBetween;

import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.internal.InternalSearchResponse;

public class SQLQueryEngine extends QueryEngine {

public static final String NAME = "sql";

public SQLQueryEngine() {

}
public SQLQueryEngine(StreamInput in) {
}
@Override
public String getWriteableName() {
return "sql";
}

@Override
public void writeTo(StreamOutput out) throws IOException {
}


@Override
public void executeQuery(SearchRequest searchRequest,
ActionListener<SearchResponse> actionListener) {
// Creating a minimal response is OK, because SearchResponse self
// is tested elsewhere.
long tookInMillis = ThreadLocalRandom.current().nextLong(0L, Long.MAX_VALUE);
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
int totalClusters = randomIntBetween(0, 10);
int successfulClusters = randomIntBetween(0, totalClusters);
int skippedClusters = totalClusters - successfulClusters;
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
SearchResponse searchResponse = new SearchResponse(
internalSearchResponse,
null,
totalShards,
successfulShards,
skippedShards,
tookInMillis,
ShardSearchFailure.EMPTY_ARRAY,
clusters
);
actionListener.onResponse(searchResponse);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return null;
}

public static QueryEngine fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
token = parser.nextToken();
}
return new SQLQueryEngine();
}

}

0 comments on commit 5a64c3e

Please sign in to comment.