From a4e5cd022a5e73704754a59304b49c30d0c434b3 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Mon, 10 Jul 2023 18:47:12 -0700 Subject: [PATCH] [Search pipelines] Pass "adhocness" flag to processor factories (#8164) (#8522) * [Search pipelines] Pass "adhocness" flag to processor factories A named search pipeline may be created with a PUT request, while an "anonymous" or "ad hoc" search pipeline can be defined in the search request source. In the latter case, we don't want to create any "resource-heavy" processors, since they're potentially increasing the cost of every search request, whereas names pipeline processors get reused. This change passes a configuration flag to a processor factory if it's being called as part of an ad hoc pipeline. The factory can use that information to avoid allocating expensive resources (maybe by throwing an exception instead). Backport from commit https://github.com/opensearch-project/OpenSearch/commit/431b2464324a76e60fb446567504eae846f7b120 --------- Signed-off-by: Michael Froh (cherry picked from commit 431b2464324a76e60fb446567504eae846f7b120) --- CHANGELOG.md | 1 + .../common/FilterQueryRequestProcessor.java | 29 ++--- .../common/RenameFieldResponseProcessor.java | 3 +- .../common/ScriptRequestProcessor.java | 29 +++-- .../SearchPipelineCommonModulePlugin.java | 4 +- .../FilterQueryRequestProcessorTests.java | 14 +-- .../RenameFieldResponseProcessorTests.java | 7 +- .../search_pipeline/50_script_processor.yml | 2 + .../plugins/SearchPipelinePlugin.java | 88 +++++++++++++- .../search/pipeline/PipelineWithMetrics.java | 35 +++++- .../opensearch/search/pipeline/Processor.java | 112 ++++++------------ .../pipeline/SearchPipelineService.java | 11 +- .../pipeline/SearchPipelineServiceTests.java | 109 ++++++++++++----- 13 files changed, 281 insertions(+), 163 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f64434c6ac27..00022827dad6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -97,6 +97,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Update ZSTD default compression level ([#8471](https://github.com/opensearch-project/OpenSearch/pull/8471)) - Enabling compression levels for zstd and zstd_no_dict ([#8312](https://github.com/opensearch-project/OpenSearch/pull/8312)) - Improved performance of parsing floating point numbers ([#8467](https://github.com/opensearch-project/OpenSearch/pull/8467)) +- [Search Pipelines] Pass pipeline creation context to processor factories ([#8164](https://github.com/opensearch-project/OpenSearch/pull/8164)) - Move span actions to Scope ([#8411](https://github.com/opensearch-project/OpenSearch/pull/8411)) - [Refactor] OpenSearchException streamables to a registry ([#7646](https://github.com/opensearch-project/OpenSearch/pull/7646)) - [Refactor] Serverless common classes to libraries ([#8065](https://github.com/opensearch-project/OpenSearch/pull/8065)) diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java index 7deb8faa03af6..d8862aa59cede 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java @@ -13,12 +13,12 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; +import org.opensearch.ingest.ConfigurationUtils; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.pipeline.Processor; import org.opensearch.search.pipeline.SearchRequestProcessor; @@ -89,8 +89,8 @@ public SearchRequest processRequest(SearchRequest request) throws Exception { } static class Factory implements Processor.Factory { + private static final String QUERY_KEY = "query"; private final NamedXContentRegistry namedXContentRegistry; - public static final ParseField QUERY_FIELD = new ParseField("query"); Factory(NamedXContentRegistry namedXContentRegistry) { this.namedXContentRegistry = namedXContentRegistry; @@ -101,30 +101,21 @@ public FilterQueryRequestProcessor create( Map> processorFactories, String tag, String description, - Map config + Map config, + PipelineContext pipelineContext ) throws Exception { + Map query = ConfigurationUtils.readOptionalMap(TYPE, tag, config, QUERY_KEY); + if (query == null) { + throw new IllegalArgumentException("Did not specify the " + QUERY_KEY + " property in processor of type " + TYPE); + } try ( - XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config); + XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(query); InputStream stream = BytesReference.bytes(builder).streamInput(); XContentParser parser = XContentType.JSON.xContent() .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream) ) { - XContentParser.Token token = parser.nextToken(); - assert token == XContentParser.Token.START_OBJECT; - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { - return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser)); - } - } - } + return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser)); } - throw new IllegalArgumentException( - "Did not specify the " + QUERY_FIELD.getPreferredName() + " property in processor of type " + TYPE - ); } } } diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java index 4c40dda5928f0..c8b3c06a71562 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java @@ -140,7 +140,8 @@ public RenameFieldResponseProcessor create( Map> processorFactories, String tag, String description, - Map config + Map config, + PipelineContext pipelineContext ) throws Exception { String oldField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field"); String newField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field"); diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java index 015411e0701a4..43ab3d4622d6b 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java @@ -29,7 +29,8 @@ import org.opensearch.search.pipeline.common.helpers.SearchRequestMap; import java.io.InputStream; -import java.util.Arrays; +import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; @@ -127,6 +128,8 @@ SearchScript getPrecompiledSearchScript() { * Factory class for creating {@link ScriptRequestProcessor}. */ public static final class Factory implements Processor.Factory { + private static final List SCRIPT_CONFIG_KEYS = List.of("id", "source", "inline", "lang", "params", "options"); + private final ScriptService scriptService; /** @@ -138,33 +141,29 @@ public Factory(ScriptService scriptService) { this.scriptService = scriptService; } - /** - * Creates a new instance of {@link ScriptRequestProcessor}. - * - * @param registry The registry of processor factories. - * @param processorTag The processor's tag. - * @param description The processor's description. - * @param config The configuration options for the processor. - * @return The created {@link ScriptRequestProcessor} instance. - * @throws Exception if an error occurs during the creation process. - */ @Override public ScriptRequestProcessor create( Map> registry, String processorTag, String description, - Map config + Map config, + PipelineContext pipelineContext ) throws Exception { + Map scriptConfig = new HashMap<>(); + for (String key : SCRIPT_CONFIG_KEYS) { + Object val = config.remove(key); + if (val != null) { + scriptConfig.put(key, val); + } + } try ( - XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config); + XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(scriptConfig); InputStream stream = BytesReference.bytes(builder).streamInput(); XContentParser parser = XContentType.JSON.xContent() .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream) ) { Script script = Script.parse(parser); - Arrays.asList("id", "source", "inline", "lang", "params", "options").forEach(config::remove); - // verify script is able to be compiled before successfully creating processor. SearchScript searchScript = null; try { diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java index dc25de460fdba..49681b80fdead 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java @@ -33,7 +33,7 @@ public SearchPipelineCommonModulePlugin() {} * @return A map of processor factories, where the keys are the processor types and the values are the corresponding factory instances. */ @Override - public Map> getRequestProcessors(Processor.Parameters parameters) { + public Map> getRequestProcessors(Parameters parameters) { return Map.of( FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry), @@ -43,7 +43,7 @@ public Map> getRequestProcesso } @Override - public Map> getResponseProcessors(Processor.Parameters parameters) { + public Map> getResponseProcessors(Parameters parameters) { return Map.of(RenameFieldResponseProcessor.TYPE, new RenameFieldResponseProcessor.Factory()); } } diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java index 1f355ac97c801..ecf746af556a2 100644 --- a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java @@ -16,6 +16,7 @@ import org.opensearch.test.AbstractBuilderTestCase; import java.util.Collections; +import java.util.HashMap; import java.util.Map; public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase { @@ -37,15 +38,14 @@ public void testFilterQuery() throws Exception { public void testFactory() throws Exception { FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry()); - FilterQueryRequestProcessor processor = factory.create( - Collections.emptyMap(), - null, - null, - Map.of("query", Map.of("term", Map.of("field", "value"))) - ); + Map configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value")))); + FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap, null); assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery); // Missing "query" parameter: - expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap())); + expectThrows( + IllegalArgumentException.class, + () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null) + ); } } diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java index a2fc7f6acfa7c..7f3a2acfbdc08 100644 --- a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java @@ -115,12 +115,15 @@ public void testFactory() throws Exception { config.put("target_field", newField); RenameFieldResponseProcessor.Factory factory = new RenameFieldResponseProcessor.Factory(); - RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config); + RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config, null); assertEquals(processor.getType(), "rename_field"); assertEquals(processor.getOldField(), oldField); assertEquals(processor.getNewField(), newField); assertFalse(processor.isIgnoreMissing()); - expectThrows(OpenSearchParseException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap())); + expectThrows( + OpenSearchParseException.class, + () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null) + ); } } diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml index 9d855e8a1861a..7a01e68acf75c 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml @@ -17,6 +17,7 @@ teardown: "request_processors": [ { "script" : { + "tag": "empty_script", "lang": "painless", "source" : "" } @@ -38,6 +39,7 @@ teardown: "request_processors": [ { "script" : { + "tag": "working", "lang" : "painless", "source" : "ctx._source['size'] += 10; ctx._source['from'] = ctx._source['from'] <= 0 ? ctx._source['from'] : ctx._source['from'] - 1 ; ctx._source['explain'] = !ctx._source['explain']; ctx._source['version'] = !ctx._source['version']; ctx._source['seq_no_primary_term'] = !ctx._source['seq_no_primary_term']; ctx._source['track_scores'] = !ctx._source['track_scores']; ctx._source['track_total_hits'] = 1; ctx._source['min_score'] -= 0.9; ctx._source['terminate_after'] += 2; ctx._source['profile'] = !ctx._source['profile'];" } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java index 3d76bab93a60c..d2ef2b65c5944 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java @@ -8,13 +8,24 @@ package org.opensearch.plugins; +import org.opensearch.client.Client; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.script.ScriptService; import org.opensearch.search.pipeline.Processor; import org.opensearch.search.pipeline.SearchPhaseResultsProcessor; +import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.search.pipeline.SearchRequestProcessor; import org.opensearch.search.pipeline.SearchResponseProcessor; +import org.opensearch.threadpool.Scheduler; import java.util.Collections; import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.LongSupplier; /** * An extension point for {@link Plugin} implementation to add custom search pipeline processors. @@ -29,7 +40,7 @@ public interface SearchPipelinePlugin { * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} * to create the processor from a given pipeline configuration. */ - default Map> getRequestProcessors(Processor.Parameters parameters) { + default Map> getRequestProcessors(Parameters parameters) { return Collections.emptyMap(); } @@ -40,7 +51,7 @@ default Map> getRequestProcess * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} * to create the processor from a given pipeline configuration. */ - default Map> getResponseProcessors(Processor.Parameters parameters) { + default Map> getResponseProcessors(Parameters parameters) { return Collections.emptyMap(); } @@ -51,7 +62,78 @@ default Map> getResponseProce * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} * to create the processor from a given pipeline configuration. */ - default Map> getSearchPhaseResultsProcessors(Processor.Parameters parameters) { + default Map> getSearchPhaseResultsProcessors(Parameters parameters) { return Collections.emptyMap(); } + + /** + * Infrastructure class that holds services that can be used by processor factories to create processor instances + * and that gets passed around to all {@link SearchPipelinePlugin}s. + */ + class Parameters { + + /** + * Useful to provide access to the node's environment like config directory to processor factories. + */ + public final Environment env; + + /** + * Provides processors script support. + */ + public final ScriptService scriptService; + + /** + * Provide analyzer support + */ + public final AnalysisRegistry analysisRegistry; + + /** + * Allows processors to read headers set by {@link org.opensearch.action.support.ActionFilter} + * instances that have run while handling the current search. + */ + public final ThreadContext threadContext; + + public final LongSupplier relativeTimeSupplier; + + public final SearchPipelineService searchPipelineService; + + public final Consumer genericExecutor; + + public final NamedXContentRegistry namedXContentRegistry; + + /** + * Provides scheduler support + */ + public final BiFunction scheduler; + + /** + * Provides access to the node's cluster client + */ + public final Client client; + + public Parameters( + Environment env, + ScriptService scriptService, + AnalysisRegistry analysisRegistry, + ThreadContext threadContext, + LongSupplier relativeTimeSupplier, + BiFunction scheduler, + SearchPipelineService searchPipelineService, + Client client, + Consumer genericExecutor, + NamedXContentRegistry namedXContentRegistry + ) { + this.env = env; + this.scriptService = scriptService; + this.threadContext = threadContext; + this.analysisRegistry = analysisRegistry; + this.relativeTimeSupplier = relativeTimeSupplier; + this.scheduler = scheduler; + this.searchPipelineService = searchPipelineService; + this.client = client; + this.genericExecutor = genericExecutor; + this.namedXContentRegistry = namedXContentRegistry; + } + + } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java b/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java index 612e979e56070..060894a37e5ed 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java @@ -77,19 +77,28 @@ static PipelineWithMetrics create( Map> phaseResultsProcessorFactories, NamedWriteableRegistry namedWriteableRegistry, OperationMetrics totalRequestProcessingMetrics, - OperationMetrics totalResponseProcessingMetrics + OperationMetrics totalResponseProcessingMetrics, + Processor.PipelineContext pipelineContext ) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); - List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs); + List requestProcessors = readProcessors( + requestProcessorFactories, + requestProcessorConfigs, + pipelineContext + ); List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( null, null, config, RESPONSE_PROCESSORS_KEY ); - List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs); + List responseProcessors = readProcessors( + responseProcessorFactories, + responseProcessorConfigs, + pipelineContext + ); List> phaseResultsProcessorConfigs = ConfigurationUtils.readOptionalList( null, null, @@ -98,7 +107,8 @@ static PipelineWithMetrics create( ); List phaseResultsProcessors = readProcessors( phaseResultsProcessorFactories, - phaseResultsProcessorConfigs + phaseResultsProcessorConfigs, + pipelineContext ); if (config.isEmpty() == false) { throw new OpenSearchParseException( @@ -125,7 +135,8 @@ static PipelineWithMetrics create( private static List readProcessors( Map> processorFactories, - List> requestProcessorConfigs + List> requestProcessorConfigs, + Processor.PipelineContext pipelineContext ) throws Exception { List processors = new ArrayList<>(); if (requestProcessorConfigs == null) { @@ -140,7 +151,19 @@ private static List readProcessors( Map config = (Map) entry.getValue(); String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); - processors.add(processorFactories.get(type).create(processorFactories, tag, description, config)); + processors.add(processorFactories.get(type).create(processorFactories, tag, description, config, pipelineContext)); + if (config.isEmpty() == false) { + String processorName = type; + if (tag != null) { + processorName = processorName + ":" + tag; + } + throw new OpenSearchParseException( + "processor [" + + processorName + + "] doesn't support one or more provided configuration parameters: " + + Arrays.toString(config.keySet().toArray()) + ); + } } } return Collections.unmodifiableList(processors); diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java index ee28db1cc334d..cc96132479c74 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Processor.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java @@ -8,19 +8,7 @@ package org.opensearch.search.pipeline; -import org.opensearch.client.Client; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.env.Environment; -import org.opensearch.index.analysis.AnalysisRegistry; -import org.opensearch.plugins.SearchPipelinePlugin; -import org.opensearch.script.ScriptService; -import org.opensearch.threadpool.Scheduler; - import java.util.Map; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.LongSupplier; /** * A processor implementation may modify the request or response from a search call. @@ -33,6 +21,12 @@ * @opensearch.internal */ public interface Processor { + /** + * Processor configuration key to let the factory know the context for pipeline creation. + *

+ * See {@link PipelineSource}. + */ + String PIPELINE_SOURCE = "pipeline_source"; /** * Gets the type of processor @@ -61,81 +55,45 @@ interface Factory { * @param tag The tag for the processor * @param description A short description of what this processor does * @param config The configuration for the processor - * * Note: Implementations are responsible for removing the used configuration * keys, so that after creation the config map should be empty. + * @param pipelineContext Contextual information about the enclosing pipeline. */ - T create(Map> processorFactories, String tag, String description, Map config) throws Exception; + T create( + Map> processorFactories, + String tag, + String description, + Map config, + PipelineContext pipelineContext + ) throws Exception; } /** - * Infrastructure class that holds services that can be used by processor factories to create processor instances - * and that gets passed around to all {@link SearchPipelinePlugin}s. + * Contextual information about the enclosing pipeline. A processor factory may change processor initialization behavior or + * pass this information to the created processor instance. */ - class Parameters { + class PipelineContext { + private final PipelineSource pipelineSource; - /** - * Useful to provide access to the node's environment like config directory to processor factories. - */ - public final Environment env; - - /** - * Provides processors script support. - */ - public final ScriptService scriptService; - - /** - * Provide analyzer support - */ - public final AnalysisRegistry analysisRegistry; - - /** - * Allows processors to read headers set by {@link org.opensearch.action.support.ActionFilter} - * instances that have run while handling the current search. - */ - public final ThreadContext threadContext; - - public final LongSupplier relativeTimeSupplier; - - public final SearchPipelineService searchPipelineService; - - public final Consumer genericExecutor; - - public final NamedXContentRegistry namedXContentRegistry; - - /** - * Provides scheduler support - */ - public final BiFunction scheduler; - - /** - * Provides access to the node's cluster client - */ - public final Client client; + public PipelineContext(PipelineSource pipelineSource) { + this.pipelineSource = pipelineSource; + } - public Parameters( - Environment env, - ScriptService scriptService, - AnalysisRegistry analysisRegistry, - ThreadContext threadContext, - LongSupplier relativeTimeSupplier, - BiFunction scheduler, - SearchPipelineService searchPipelineService, - Client client, - Consumer genericExecutor, - NamedXContentRegistry namedXContentRegistry - ) { - this.env = env; - this.scriptService = scriptService; - this.threadContext = threadContext; - this.analysisRegistry = analysisRegistry; - this.relativeTimeSupplier = relativeTimeSupplier; - this.scheduler = scheduler; - this.searchPipelineService = searchPipelineService; - this.client = client; - this.genericExecutor = genericExecutor; - this.namedXContentRegistry = namedXContentRegistry; + public PipelineSource getPipelineSource() { + return pipelineSource; } + } + /** + * A processor factory may change the processor initialization behavior based on the creation context (e.g. avoiding + * creating expensive resources during validation or in a request-scoped pipeline.) + */ + enum PipelineSource { + // A named pipeline is being created or updated + UPDATE_PIPELINE, + // Pipeline is defined within a search request + SEARCH_REQUEST, + // A named pipeline is being validated before being written to cluster state + VALIDATE_PIPELINE } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 70dc8546a077f..83a7a0564467e 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -103,7 +103,7 @@ public SearchPipelineService( this.scriptService = scriptService; this.threadPool = threadPool; this.namedWriteableRegistry = namedWriteableRegistry; - Processor.Parameters parameters = new Processor.Parameters( + SearchPipelinePlugin.Parameters parameters = new SearchPipelinePlugin.Parameters( env, scriptService, analysisRegistry, @@ -189,7 +189,8 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { phaseInjectorProcessorFactories, namedWriteableRegistry, totalRequestProcessingMetrics, - totalResponseProcessingMetrics + totalResponseProcessingMetrics, + new Processor.PipelineContext(Processor.PipelineSource.UPDATE_PIPELINE) ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); @@ -289,7 +290,8 @@ void validatePipeline(Map searchPipelineInfos phaseInjectorProcessorFactories, namedWriteableRegistry, new OperationMetrics(), // Use ephemeral metrics for validation - new OperationMetrics() + new OperationMetrics(), + new Processor.PipelineContext(Processor.PipelineSource.VALIDATE_PIPELINE) ); List exceptions = new ArrayList<>(); for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) { @@ -388,7 +390,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) { phaseInjectorProcessorFactories, namedWriteableRegistry, totalRequestProcessingMetrics, - totalResponseProcessingMetrics + totalResponseProcessingMetrics, + new Processor.PipelineContext(Processor.PipelineSource.SEARCH_REQUEST) ); } catch (Exception e) { throw new SearchPipelineProcessingException(e); diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 2ac0b2136ddd9..84f39e4bdab42 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -77,19 +77,17 @@ public class SearchPipelineServiceTests extends OpenSearchTestCase { private static final SearchPipelinePlugin DUMMY_PLUGIN = new SearchPipelinePlugin() { @Override - public Map> getRequestProcessors(Processor.Parameters parameters) { - return Map.of("foo", (factories, tag, description, config) -> null); + public Map> getRequestProcessors(Parameters parameters) { + return Map.of("foo", (factories, tag, description, config, ctx) -> null); } - public Map> getResponseProcessors(Processor.Parameters parameters) { - return Map.of("bar", (factories, tag, description, config) -> null); + public Map> getResponseProcessors(Parameters parameters) { + return Map.of("bar", (factories, tag, description, config, ctx) -> null); } @Override - public Map> getSearchPhaseResultsProcessors( - Processor.Parameters parameters - ) { - return Map.of("zoe", (factories, tag, description, config) -> null); + public Map> getSearchPhaseResultsProcessors(Parameters parameters) { + return Map.of("zoe", (factories, tag, description, config, ctx) -> null); } }; @@ -303,7 +301,7 @@ public SearchPhaseName getAfterPhase() { private SearchPipelineService createWithProcessors() { Map> requestProcessors = new HashMap<>(); - requestProcessors.put("scale_request_size", (processorFactories, tag, description, config) -> { + requestProcessors.put("scale_request_size", (processorFactories, tag, description, config, ctx) -> { float scale = ((Number) config.remove("scale")).floatValue(); return new FakeRequestProcessor( "scale_request_size", @@ -313,13 +311,13 @@ private SearchPipelineService createWithProcessors() { ); }); Map> responseProcessors = new HashMap<>(); - responseProcessors.put("fixed_score", (processorFactories, tag, description, config) -> { + responseProcessors.put("fixed_score", (processorFactories, tag, description, config, ctx) -> { float score = ((Number) config.remove("score")).floatValue(); return new FakeResponseProcessor("fixed_score", tag, description, rsp -> rsp.getHits().forEach(h -> h.score(score))); }); Map> searchPhaseProcessors = new HashMap<>(); - searchPhaseProcessors.put("max_score", (processorFactories, tag, description, config) -> { + searchPhaseProcessors.put("max_score", (processorFactories, tag, description, config, context) -> { final float finalScore = config.containsKey("score") ? ((Number) config.remove("score")).floatValue() : 100f; final Consumer querySearchResultConsumer = (result) -> result.queryResult().topDocs().maxScore = finalScore; return new FakeSearchPhaseResultsProcessor("max_score", tag, description, querySearchResultConsumer); @@ -354,19 +352,17 @@ private SearchPipelineService createWithProcessors( this.writableRegistry(), Collections.singletonList(new SearchPipelinePlugin() { @Override - public Map> getRequestProcessors(Processor.Parameters parameters) { + public Map> getRequestProcessors(Parameters parameters) { return requestProcessors; } @Override - public Map> getResponseProcessors(Processor.Parameters parameters) { + public Map> getResponseProcessors(Parameters parameters) { return responseProcessors; } @Override - public Map> getSearchPhaseResultsProcessors( - Processor.Parameters parameters - ) { + public Map> getSearchPhaseResultsProcessors(Parameters parameters) { return phaseProcessors; } @@ -897,10 +893,9 @@ public void testInfo() { } public void testExceptionOnPipelineCreation() { - Map> badFactory = Map.of( - "bad_factory", - (pf, t, f, c) -> { throw new RuntimeException(); } - ); + Map> badFactory = Map.of("bad_factory", (pf, t, f, c, ctx) -> { + throw new RuntimeException(); + }); SearchPipelineService searchPipelineService = createWithProcessors(badFactory, Collections.emptyMap(), Collections.emptyMap()); Map pipelineSourceMap = new HashMap<>(); @@ -920,7 +915,7 @@ public void testExceptionOnRequestProcessing() { }); Map> throwingRequestProcessorFactory = Map.of( "throwing_request", - (pf, t, f, c) -> throwingRequestProcessor + (pf, t, f, c, ctx) -> throwingRequestProcessor ); SearchPipelineService searchPipelineService = createWithProcessors( @@ -945,7 +940,7 @@ public void testExceptionOnResponseProcessing() throws Exception { }); Map> throwingResponseProcessorFactory = Map.of( "throwing_response", - (pf, t, f, c) -> throwingResponseProcessor + (pf, t, f, c, ctx) -> throwingResponseProcessor ); SearchPipelineService searchPipelineService = createWithProcessors( @@ -955,7 +950,7 @@ public void testExceptionOnResponseProcessing() throws Exception { ); Map pipelineSourceMap = new HashMap<>(); - pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(Map.of("throwing_response", Collections.emptyMap()))); + pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(Map.of("throwing_response", new HashMap<>()))); SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); @@ -973,18 +968,18 @@ public void testStats() throws Exception { }); Map> requestProcessors = Map.of( "successful_request", - (pf, t, f, c) -> new FakeRequestProcessor("successful_request", "2", null, r -> {}), + (pf, t, f, c, ctx) -> new FakeRequestProcessor("successful_request", "2", null, r -> {}), "throwing_request", - (pf, t, f, c) -> throwingRequestProcessor + (pf, t, f, c, ctx) -> throwingRequestProcessor ); SearchResponseProcessor throwingResponseProcessor = new FakeResponseProcessor("throwing_response", "3", null, r -> { throw new RuntimeException(); }); Map> responseProcessors = Map.of( "successful_response", - (pf, t, f, c) -> new FakeResponseProcessor("successful_response", "4", null, r -> {}), + (pf, t, f, c, ctx) -> new FakeResponseProcessor("successful_response", "4", null, r -> {}), "throwing_response", - (pf, t, f, c) -> throwingResponseProcessor + (pf, t, f, c, ctx) -> throwingResponseProcessor ); SearchPipelineService searchPipelineService = createWithProcessors(requestProcessors, responseProcessors, Collections.emptyMap()); @@ -1088,4 +1083,64 @@ private static void assertPipelineStats(OperationStats stats, long count, long f assertEquals(stats.getCount(), count); assertEquals(stats.getFailedCount(), failed); } + + public void testAdHocRejectingProcessor() { + String processorType = "ad_hoc_rejecting"; + Map> requestProcessorFactories = Map.of(processorType, (pf, t, d, c, ctx) -> { + if (ctx.getPipelineSource() == Processor.PipelineSource.SEARCH_REQUEST) { + throw new IllegalArgumentException(processorType + " cannot be created as part of a pipeline defined in a search request"); + } + return new FakeRequestProcessor(processorType, t, d, r -> {}); + }); + + SearchPipelineService searchPipelineService = createWithProcessors( + requestProcessorFactories, + Collections.emptyMap(), + Collections.emptyMap() + ); + + String id = "_id"; + SearchPipelineService.PipelineHolder pipeline = searchPipelineService.getPipelines().get(id); + assertNull(pipeline); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest( + id, + new BytesArray("{\"request_processors\":[" + " { \"" + processorType + "\": {}}" + "]}"), + XContentType.JSON + ); + ClusterState previousClusterState = clusterState; + clusterState = SearchPipelineService.innerPut(putRequest, clusterState); + // The following line successfully creates the pipeline: + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + Map pipelineSourceMap = new HashMap<>(); + pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(Map.of(processorType, Collections.emptyMap()))); + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); + SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); + expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + } + + public void testExtraParameterInProcessorConfig() { + SearchPipelineService searchPipelineService = createWithProcessors(); + + Map pipelineSourceMap = new HashMap<>(); + Map processorConfig = new HashMap<>( + Map.of("score", 1.0f, "tag", "my_tag", "comment", "I just like to add extra parameters so that I feel like I'm being heard.") + ); + pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(Map.of("fixed_score", processorConfig))); + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); + SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); + try { + searchPipelineService.resolvePipeline(searchRequest); + fail("Exception should have been thrown"); + } catch (SearchPipelineProcessingException e) { + assertTrue( + e.getMessage() + .contains("processor [fixed_score:my_tag] doesn't support one or more provided configuration parameters: [comment]") + ); + } catch (Exception e) { + fail("Wrong exception type: " + e.getClass()); + } + } }