Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PPL/SQL in search API [Opensearch CORE Changes] #12443

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.externalengine.QueryEngineExtBuilder;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class SearchResponseSections implements ToXContentFragment {
protected final Boolean terminatedEarly;
protected final int numReducePhases;
protected final List<SearchExtBuilder> searchExtBuilders = new ArrayList<>();
protected final List<QueryEngineExtBuilder> queryEngineExtBuilders = new ArrayList<>();

public SearchResponseSections(
SearchHits hits,
Expand All @@ -84,7 +86,7 @@ public SearchResponseSections(
SearchProfileShardResults profileResults,
int numReducePhases
) {
this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList());
this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList(), Collections.emptyList());
}

public SearchResponseSections(
Expand All @@ -107,6 +109,28 @@ public SearchResponseSections(
this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null"));
}

public SearchResponseSections(
SearchHits hits,
Aggregations aggregations,
Suggest suggest,
boolean timedOut,
Boolean terminatedEarly,
SearchProfileShardResults profileResults,
int numReducePhases,
List<SearchExtBuilder> searchExtBuilders,
List<QueryEngineExtBuilder> queryEngineExtBuilders
) {
this.hits = hits;
this.aggregations = aggregations;
this.suggest = suggest;
this.profileResults = profileResults;
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.numReducePhases = numReducePhases;
this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null"));
this.queryEngineExtBuilders.addAll(Objects.requireNonNull(queryEngineExtBuilders, "queryEngineExtBuilders must not be null"));
}

public final boolean timedOut() {
return this.timedOut;
}
Expand Down Expand Up @@ -166,6 +190,13 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
}
builder.endObject();
}

if(!queryEngineExtBuilders.isEmpty()) {
for (QueryEngineExtBuilder queryEngineExtBuilder: queryEngineExtBuilders) {
queryEngineExtBuilder.toXContent(builder, params);
}
}

return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ private void executeRequest(
relativeStartNanos,
System::nanoTime
);
// taking over by query engine.
if (!originalSearchRequest.source().queryEngines().isEmpty()) {
originalSearchRequest.source().queryEngines().get(0).executeQuery(originalSearchRequest, originalListener);
return;
}
if (originalSearchRequest.isPhaseTook() == null) {
originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED));
}
Expand Down
48 changes: 48 additions & 0 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.externalengine.QueryEngineParser;
import org.opensearch.search.externalengine.QueryEngine;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryPhaseSearcher;
Expand Down Expand Up @@ -216,6 +218,10 @@ default Optional<ExecutorServiceProvider> getIndexSearcherExecutorProvider() {
return Optional.empty();
}

default List<QueryEngineSpec<?>> getQueryEnginesSpecs() {
return emptyList();
}

/**
* Executor service provider
*/
Expand Down Expand Up @@ -877,4 +883,46 @@ public Map<String, Highlighter> getHighlighters() {
return highlighters;
}
}

/**
* Specification for a {@link SearchExtBuilder} which represents an additional section that can be
* parsed in a search request (within the ext element).
*/
class QueryEngineSpec<T extends QueryEngine> extends SearchExtensionSpec<T, QueryEngineParser<T>> {
/**
* Specification of custom {@link SearchExtBuilder}.
*
* @param name holds the names by which this search ext might be parsed. The {@link ParseField#getPreferredName()} is special as it
* is the name by under which the reader is registered. So it is the name that the search ext should use as its
* {@link NamedWriteable#getWriteableName()} too. It is an error if {@link ParseField#getPreferredName()} conflicts with
* another registered name, including names from other plugins.
* @param reader the reader registered for this search ext's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param parser the parser function that reads the search ext builder from xcontent
*/
public QueryEngineSpec(
ParseField name,
Writeable.Reader<? extends T> reader,
QueryEngineParser<T> parser
) {
super(name, reader, parser);
}

/**
* Specification of custom {@link SearchExtBuilder}.
*
* @param name the name by which this search ext might be parsed or deserialized. Make sure that the search ext builder returns this name for
* {@link NamedWriteable#getWriteableName()}. It is an error if this name conflicts with another registered name, including
* names from other plugins.
* @param reader the reader registered for this search ext's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param parser the parser function that reads the search ext builder from xcontent
*/
public QueryEngineSpec(String name,
Writeable.Reader<? extends T> reader,
QueryEngineParser<T> parser) {
super(name, reader, parser);
}

}
}
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.opensearch.plugins.SearchPlugin.SignificanceHeuristicSpec;
import org.opensearch.plugins.SearchPlugin.SortSpec;
import org.opensearch.plugins.SearchPlugin.SuggesterSpec;
import org.opensearch.plugins.SearchPlugin.QueryEngineSpec;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.BaseAggregationBuilder;
import org.opensearch.search.aggregations.InternalAggregation;
Expand Down Expand Up @@ -240,6 +241,7 @@
import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.externalengine.QueryEngine;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.ExplainPhase;
Expand Down Expand Up @@ -340,6 +342,7 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
registerQueryParsers(plugins);
registerRescorers(plugins);
registerSortParsers(plugins);
registerQueryEngines(plugins);
registerValueFormats();
registerSignificanceHeuristics(plugins);
this.valuesSourceRegistry = registerAggregations(plugins);
Expand Down Expand Up @@ -838,6 +841,15 @@ private void registerRescorers(List<SearchPlugin> plugins) {
registerFromPlugin(plugins, SearchPlugin::getRescorers, this::registerRescorer);
}

private void registerQueryEngines(List<SearchPlugin> plugins) {
registerFromPlugin(plugins, SearchPlugin::getQueryEnginesSpecs, this::registerQueryEngine);
}

private void registerQueryEngine(SearchPlugin.QueryEngineSpec<?> spec) {
namedXContents.add(new NamedXContentRegistry.Entry(QueryEngine.class, spec.getName(), (p, c) -> spec.getParser().fromXContent(p)));
namedWriteables.add(new NamedWriteableRegistry.Entry(QueryEngine.class, spec.getName().getPreferredName(), spec.getReader()));
}

private void registerRescorer(RescorerSpec<?> spec) {
namedXContents.add(new NamedXContentRegistry.Entry(RescorerBuilder.class, spec.getName(), (p, c) -> spec.getParser().apply(p)));
namedWriteables.add(new NamedWriteableRegistry.Entry(RescorerBuilder.class, spec.getName().getPreferredName(), spec.getReader()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.PipelineAggregationBuilder;
import org.opensearch.search.collapse.CollapseBuilder;
import org.opensearch.search.externalengine.QueryEngine;
import org.opensearch.search.fetch.StoredFieldsContext;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.search.fetch.subphase.FieldAndFormat;
Expand Down Expand Up @@ -216,6 +217,7 @@ public static HighlightBuilder highlight() {
private PointInTimeBuilder pointInTimeBuilder = null;

private Map<String, Object> searchPipelineSource = null;
private List<QueryEngine> queryEngines = new ArrayList<>();

/**
* Constructs a new search source builder.
Expand Down Expand Up @@ -1039,6 +1041,15 @@ public SearchSourceBuilder searchPipelineSource(Map<String, Object> searchPipeli
return this;
}

public List<QueryEngine> queryEngines() {
return queryEngines;
}

public List<QueryEngine> queryEngines(List<QueryEngine> queryEngines) {
this.queryEngines = queryEngines;
return queryEngines;
}

/**
* Rewrites this search source builder into its primitive form. e.g. by
* rewriting the QueryBuilder. If the builder did not change the identity
Expand Down Expand Up @@ -1282,11 +1293,17 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
} else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
searchPipelineSource = parser.mapOrdered();
} else {
throw new ParsingException(
parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + currentFieldName + "].",
parser.getTokenLocation()
);
QueryEngine queryEngine = parser.namedObject(QueryEngine.class, currentFieldName, null);
if (queryEngine != null) {
queryEngines.add(queryEngine);
}
else {
throw new ParsingException(
parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + currentFieldName + "].",
parser.getTokenLocation()
);
}
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.externalengine;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteable;
import org.opensearch.core.xcontent.ToXContentObject;

/**
* QueryEngine abstract interface.
*/
public abstract class QueryEngine implements NamedWriteable, ToXContentObject {
public abstract void executeQuery(SearchRequest searchRequest,
ActionListener<SearchResponse> actionListener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.search.externalengine;

import org.opensearch.common.CheckedFunction;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.NamedWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.SearchPlugin.SearchExtSpec;

/**
* Intermediate serializable representation of a search ext section. To be subclassed by plugins that support
* a custom section as part of a search request, which will be provided within the ext element.
* Any state needs to be serialized as part of the {@link Writeable#writeTo(StreamOutput)} method and
* read from the incoming stream, usually done adding a constructor that takes {@link StreamInput} as
* an argument.
* <p>
* Registration happens through {@link SearchPlugin#getSearchExts()}, which also needs a {@link CheckedFunction} that's able to parse
* the incoming request from the REST layer into the proper {@link QueryEngineExtBuilder} subclass.
* <p>
* {@link #getWriteableName()} must return the same name as the one used for the registration
* of the {@link SearchExtSpec}.
*
* @see SearchExtSpec
*
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public abstract class QueryEngineExtBuilder implements NamedWriteable, ToXContentFragment {

public abstract int hashCode();

public abstract boolean equals(Object obj);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.externalengine;

import java.io.IOException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;


/**
* Query Engine Parser.
* @param <T> extend QuerEgnien.
*/
@FunctionalInterface
public interface QueryEngineParser<T extends QueryEngine> {

/**
* Creates a new {@link QueryBuilder} from the query held by the
* {@link XContentParser}. The state on the parser contained in this context
* will be changed as a side effect of this method call
*/
T fromXContent(XContentParser parser) throws IOException;

}
Loading
Loading