Skip to content

Commit

Permalink
[Search pipelines] Add Global Ignore_failure options for Processors (#…
Browse files Browse the repository at this point in the history
…8373) (#8637)

* Add Global Ingore_failure options for Processors



* add changelog



* Add ignore_failure to 40_rename_response



* Change Boolean to boolean and refactor AbstractProcessor



* rename to isIgnoreFailure and add tests



* rename to isIgnoreFailure and add tests



* add ignoreFailure to runSearchPhaseResultsTransformer



* fix filter query and change log warn message



* Add test on matching each processor stat



* Add test on matching each processor stat



* remove extra spaces and words



* use IGNORE_FAILURE_KEY



---------


(cherry picked from commit 2004ba0)

Signed-off-by: Mingshi Liu <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent aa7172e commit 149be07
Show file tree
Hide file tree
Showing 14 changed files with 440 additions and 127 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Extensions] Support extension additional settings with extension REST initialization ([#8414](https://github.com/opensearch-project/OpenSearch/pull/8414))
- Adds mock implementation for TelemetryPlugin ([#7545](https://github.com/opensearch-project/OpenSearch/issues/7545))
- Create concept of persistent ThreadContext headers that are unstashable ([#8291]()https://github.com/opensearch-project/OpenSearch/pull/8291)
- [Search pipelines] Add Global Ignore_failure options for Processors ([#8373](https://github.com/opensearch-project/OpenSearch/pull/8373))
- Enable Partial Flat Object ([#7997](https://github.com/opensearch-project/OpenSearch/pull/7997))
- Add partial results support for concurrent segment search ([#8306](https://github.com/opensearch-project/OpenSearch/pull/8306))
- Support transport action names when registering NamedRoutes ([#7957](https://github.com/opensearch-project/OpenSearch/pull/7957))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;

import java.io.InputStream;
Expand Down Expand Up @@ -53,12 +54,13 @@ public String getType() {
/**
* Constructor that takes a filter query.
*
* @param tag processor tag
* @param description processor description
* @param tag processor tag
* @param description processor description
* @param ignoreFailure option to ignore failure
* @param filterQuery the query that will be added as a filter to incoming queries
*/
public FilterQueryRequestProcessor(String tag, String description, QueryBuilder filterQuery) {
super(tag, description);
FilterQueryRequestProcessor(String tag, String description, boolean ignoreFailure, QueryBuilder filterQuery) {
super(tag, description, ignoreFailure);
this.filterQuery = filterQuery;
}

Expand Down Expand Up @@ -101,6 +103,7 @@ public FilterQueryRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
Expand All @@ -114,7 +117,7 @@ public FilterQueryRequestProcessor create(
XContentParser parser = XContentType.JSON.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)
) {
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser));
return new FilterQueryRequestProcessor(tag, description, ignoreFailure, parseInnerQueryBuilder(parser));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

Expand All @@ -41,14 +42,22 @@ public class RenameFieldResponseProcessor extends AbstractProcessor implements S
/**
* Constructor that takes a target field to rename and the new name
*
* @param tag processor tag
* @param description processor description
* @param oldField name of field to be renamed
* @param newField name of field that will replace the old field
* @param tag processor tag
* @param description processor description
* @param ignoreFailure option to ignore failure
* @param oldField name of field to be renamed
* @param newField name of field that will replace the old field
* @param ignoreMissing if true, do not throw error if oldField does not exist within search response
*/
public RenameFieldResponseProcessor(String tag, String description, String oldField, String newField, boolean ignoreMissing) {
super(tag, description);
public RenameFieldResponseProcessor(
String tag,
String description,
boolean ignoreFailure,
String oldField,
String newField,
boolean ignoreMissing
) {
super(tag, description, ignoreFailure);
this.oldField = oldField;
this.newField = newField;
this.ignoreMissing = ignoreMissing;
Expand Down Expand Up @@ -140,13 +149,14 @@ public RenameFieldResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
String oldField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
String newField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
return new RenameFieldResponseProcessor(tag, description, oldField, newField, ignoreMissing);
return new RenameFieldResponseProcessor(tag, description, ignoreFailure, oldField, newField, ignoreMissing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.script.ScriptType;
import org.opensearch.script.SearchScript;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.common.helpers.SearchRequestMap;

Expand Down Expand Up @@ -54,18 +55,20 @@ public final class ScriptRequestProcessor extends AbstractProcessor implements S
*
* @param tag The processor's tag.
* @param description The processor's description.
* @param ignoreFailure The option to ignore failure
* @param script The {@link Script} to execute.
* @param precompiledSearchScript The {@link Script} precompiled
* @param scriptService The {@link ScriptService} used to execute the script.
*/
ScriptRequestProcessor(
String tag,
String description,
boolean ignoreFailure,
Script script,
@Nullable SearchScript precompiledSearchScript,
ScriptService scriptService
) {
super(tag, description);
super(tag, description, ignoreFailure);
this.script = script;
this.precompiledSearchScript = precompiledSearchScript;
this.scriptService = scriptService;
Expand Down Expand Up @@ -146,6 +149,7 @@ public ScriptRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> registry,
String processorTag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
Expand Down Expand Up @@ -174,7 +178,7 @@ public ScriptRequestProcessor create(
} catch (ScriptException e) {
throw newConfigurationException(TYPE, processorTag, null, e);
}
return new ScriptRequestProcessor(processorTag, description, script, searchScript, scriptService);
return new ScriptRequestProcessor(processorTag, description, ignoreFailure, script, searchScript, scriptService);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase {

public void testFilterQuery() throws Exception {
QueryBuilder filterQuery = new TermQueryBuilder("field", "value");
FilterQueryRequestProcessor filterQueryRequestProcessor = new FilterQueryRequestProcessor(null, null, filterQuery);
FilterQueryRequestProcessor filterQueryRequestProcessor = new FilterQueryRequestProcessor(null, null, false, filterQuery);
QueryBuilder incomingQuery = new TermQueryBuilder("text", "foo");
SearchSourceBuilder source = new SearchSourceBuilder().query(incomingQuery);
SearchRequest request = new SearchRequest().source(source);
Expand All @@ -39,13 +39,13 @@ public void testFilterQuery() throws Exception {
public void testFactory() throws Exception {
FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry());
Map<String, Object> configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value"))));
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap, null);
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, false, configMap, null);
assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery);

// Missing "query" parameter:
expectThrows(
IllegalArgumentException.class,
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
() -> factory.create(Collections.emptyMap(), null, null, false, Collections.emptyMap(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void testRenameResponse() throws Exception {
RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
null,
null,
false,
"field 0",
"new field",
false
Expand All @@ -74,6 +75,7 @@ public void testRenameResponseWithMapping() throws Exception {
RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
null,
null,
false,
"field 0",
"new field",
true
Expand All @@ -97,6 +99,7 @@ public void testMissingField() throws Exception {
RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
null,
null,
false,
"field",
"new field",
false
Expand All @@ -115,15 +118,15 @@ public void testFactory() throws Exception {
config.put("target_field", newField);

RenameFieldResponseProcessor.Factory factory = new RenameFieldResponseProcessor.Factory();
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config, null);
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, false, config, null);
assertEquals(processor.getType(), "rename_field");
assertEquals(processor.getOldField(), oldField);
assertEquals(processor.getNewField(), newField);
assertFalse(processor.isIgnoreMissing());

expectThrows(
OpenSearchParseException.class,
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
() -> factory.create(Collections.emptyMap(), null, null, false, Collections.emptyMap(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void setupScripting() {
}

public void testScriptingWithoutPrecompiledScriptFactory() throws Exception {
ScriptRequestProcessor processor = new ScriptRequestProcessor(randomAlphaOfLength(10), null, script, null, scriptService);
ScriptRequestProcessor processor = new ScriptRequestProcessor(randomAlphaOfLength(10), null, false, script, null, scriptService);
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(createSearchSourceBuilder());

Expand All @@ -92,7 +92,14 @@ public void testScriptingWithoutPrecompiledScriptFactory() throws Exception {
}

public void testScriptingWithPrecompiledIngestScript() throws Exception {
ScriptRequestProcessor processor = new ScriptRequestProcessor(randomAlphaOfLength(10), null, script, searchScript, scriptService);
ScriptRequestProcessor processor = new ScriptRequestProcessor(
randomAlphaOfLength(10),
null,
false,
script,
searchScript,
scriptService
);
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(createSearchSourceBuilder());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ teardown:
}
- match: { acknowledged: true }

- do:
search_pipeline.put:
id: "my_pipeline_4"
body: >
{
"description": "test pipeline with ignore missing false and ignore failure true",
"response_processors": [
{
"rename_field":
{
"field": "aa",
"target_field": "b",
"ignore_missing": false,
"ignore_failure": true
}
}
]
}
- match: { acknowledged: true }

- do:
indices.create:
index: test
Expand Down Expand Up @@ -119,15 +139,24 @@ teardown:
- match: { hits.total.value: 1 }
- match: {hits.hits.0._source: { "a": "foo" } }

# Pipeline with ignore_missing set to true
# Should still pass even though index does not contain field
# Pipeline with ignore_missing set to false
# Should throw illegal_argument_exception
- do:
catch: bad_request
search:
index: test
search_pipeline: "my_pipeline_3"
- match: { error.type: "illegal_argument_exception" }

# Pipeline with ignore_missing set to false and ignore_failure set to true
# Should return while catching error
- do:
search:
index: test
search_pipeline: "my_pipeline_4"
- match: { hits.total.value: 1 }
- match: {hits.hits.0._source: { "a": "foo" } }

# No source, using stored_fields
- do:
search:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public final class ConfigurationUtils {

public static final String TAG_KEY = "tag";
public static final String DESCRIPTION_KEY = "description";
public static final String IGNORE_FAILURE_KEY = "ignore_failure";

private ConfigurationUtils() {}

Expand Down Expand Up @@ -194,7 +195,7 @@ public static String readOptionalStringOrIntProperty(
return readStringOrInt(processorType, processorTag, propertyName, value);
}

public static Boolean readBooleanProperty(
public static boolean readBooleanProperty(
String processorType,
String processorTag,
Map<String, Object> configuration,
Expand All @@ -214,7 +215,7 @@ private static Boolean readBoolean(String processorType, String processorTag, St
return null;
}
if (value instanceof Boolean) {
return (Boolean) value;
return (boolean) value;
}
throw newConfigurationException(
processorType,
Expand Down Expand Up @@ -530,10 +531,11 @@ public static Processor readProcessor(
) throws Exception {
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY);
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, IGNORE_FAILURE_KEY, false);
Script conditionalScript = extractConditional(config);
Processor.Factory factory = processorFactories.get(type);

if (factory != null) {
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false);
List<Map<String, Object>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.search.pipeline.Processor;
package org.opensearch.search.pipeline;

/**
* Base class for common processor behavior.
*/
abstract class AbstractProcessor implements Processor {
public abstract class AbstractProcessor implements Processor {
private final String tag;
private final String description;
private final boolean ignoreFailure;

protected AbstractProcessor(String tag, String description) {
protected AbstractProcessor(String tag, String description, boolean ignoreFailure) {
this.tag = tag;
this.description = description;
this.ignoreFailure = ignoreFailure;
}

@Override
Expand All @@ -31,4 +31,9 @@ public String getTag() {
public String getDescription() {
return description;
}

@Override
public boolean isIgnoreFailure() {
return ignoreFailure;
}
}
Loading

0 comments on commit 149be07

Please sign in to comment.