From 7016e57b7d7534f25a5f74abe7e3b9621e9f3f4a Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Wed, 17 Jan 2024 21:14:10 +0800 Subject: [PATCH] Remove ingest processor supports excluding fields (#10967) * Remove ingest processor supports field patterns and excluding fields Signed-off-by: Gao Binlong * Format some code Signed-off-by: Gao Binlong * Fix test failure Signed-off-by: Gao Binlong * Remove the code of field pattern Signed-off-by: Gao Binlong * Add skip version in rest test yml Signed-off-by: Gao Binlong * Make fields and exclude_fields mutually exclusive when constructing RemoveProcessor Signed-off-by: Gao Binlong --------- Signed-off-by: Gao Binlong (cherry picked from commit 5dd4b61e83d9bbbdfd7fd490de4e8336bcc8e4f8) --- CHANGELOG.md | 3 + .../ingest/common/RemoveProcessor.java | 165 +++++++++++++----- .../common/RemoveProcessorFactoryTests.java | 38 ++-- .../ingest/common/RemoveProcessorTests.java | 46 +++++ .../test/ingest/290_remove_processor.yml | 40 +++++ .../opensearch/ingest/ConfigurationUtils.java | 8 + 6 files changed, 246 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 842953015eab8..c11dc0515757b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add search query categorizor ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) +- Remove ingest processor supports excluding fields ([#10967](https://github.com/opensearch-project/OpenSearch/pull/10967)) +- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275)) +- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753)) - [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853)) - Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247)) - Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248)) diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/RemoveProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/RemoveProcessor.java index a48cfd87b78c3..d01dce02fca31 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/RemoveProcessor.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/RemoveProcessor.java @@ -32,6 +32,7 @@ package org.opensearch.ingest.common; +import org.opensearch.common.Nullable; import org.opensearch.core.common.Strings; import org.opensearch.index.VersionType; import org.opensearch.ingest.AbstractProcessor; @@ -42,11 +43,15 @@ import org.opensearch.script.TemplateScript; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; + /** * Processor that removes existing fields. Nothing happens if the field is not present. */ @@ -55,11 +60,28 @@ public final class RemoveProcessor extends AbstractProcessor { public static final String TYPE = "remove"; private final List fields; + private final List excludeFields; private final boolean ignoreMissing; - RemoveProcessor(String tag, String description, List fields, boolean ignoreMissing) { + RemoveProcessor( + String tag, + String description, + @Nullable List fields, + @Nullable List excludeFields, + boolean ignoreMissing + ) { super(tag, description); - this.fields = new ArrayList<>(fields); + if (fields == null && excludeFields == null || fields != null && excludeFields != null) { + throw new IllegalArgumentException("ether fields and excludeFields must be set"); + } + if (fields != null) { + this.fields = new ArrayList<>(fields); + this.excludeFields = null; + } else { + this.fields = null; + this.excludeFields = new ArrayList<>(excludeFields); + } + this.ignoreMissing = ignoreMissing; } @@ -67,42 +89,76 @@ public List getFields() { return fields; } + public List getExcludeFields() { + return excludeFields; + } + @Override public IngestDocument execute(IngestDocument document) { - fields.forEach(field -> { - String path = document.renderTemplate(field); - final boolean fieldPathIsNullOrEmpty = Strings.isNullOrEmpty(path); - if (fieldPathIsNullOrEmpty || document.hasField(path) == false) { - if (ignoreMissing) { - return; - } else if (fieldPathIsNullOrEmpty) { - throw new IllegalArgumentException("field path cannot be null nor empty"); - } else { - throw new IllegalArgumentException("field [" + path + "] doesn't exist"); + if (fields != null && !fields.isEmpty()) { + fields.forEach(field -> { + String path = document.renderTemplate(field); + final boolean fieldPathIsNullOrEmpty = Strings.isNullOrEmpty(path); + if (fieldPathIsNullOrEmpty || document.hasField(path) == false) { + if (ignoreMissing) { + return; + } else if (fieldPathIsNullOrEmpty) { + throw new IllegalArgumentException("field path cannot be null nor empty"); + } else { + throw new IllegalArgumentException("field [" + path + "] doesn't exist"); + } } - } - // cannot remove _index, _version and _version_type. - if (path.equals(IngestDocument.Metadata.INDEX.getFieldName()) - || path.equals(IngestDocument.Metadata.VERSION.getFieldName()) - || path.equals(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) { - throw new IllegalArgumentException("cannot remove metadata field [" + path + "]"); - } - // removing _id is disallowed when there's an external version specified in the request - if (path.equals(IngestDocument.Metadata.ID.getFieldName()) - && document.hasField(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) { - String versionType = document.getFieldValue(IngestDocument.Metadata.VERSION_TYPE.getFieldName(), String.class); - if (!Objects.equals(versionType, VersionType.toString(VersionType.INTERNAL))) { - Long version = document.getFieldValue(IngestDocument.Metadata.VERSION.getFieldName(), Long.class, true); - throw new IllegalArgumentException( - "cannot remove metadata field [_id] when specifying external version for the document, version: " - + version - + ", version_type: " - + versionType - ); + + // cannot remove _index, _version and _version_type. + if (path.equals(IngestDocument.Metadata.INDEX.getFieldName()) + || path.equals(IngestDocument.Metadata.VERSION.getFieldName()) + || path.equals(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) { + throw new IllegalArgumentException("cannot remove metadata field [" + path + "]"); } + // removing _id is disallowed when there's an external version specified in the request + if (path.equals(IngestDocument.Metadata.ID.getFieldName()) + && document.hasField(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) { + String versionType = document.getFieldValue(IngestDocument.Metadata.VERSION_TYPE.getFieldName(), String.class); + if (!Objects.equals(versionType, VersionType.toString(VersionType.INTERNAL))) { + Long version = document.getFieldValue(IngestDocument.Metadata.VERSION.getFieldName(), Long.class, true); + throw new IllegalArgumentException( + "cannot remove metadata field [_id] when specifying external version for the document, version: " + + version + + ", version_type: " + + versionType + ); + } + } + document.removeField(path); + }); + } + + if (excludeFields != null && !excludeFields.isEmpty()) { + Set excludeFieldSet = new HashSet<>(); + excludeFields.forEach(field -> { + String path = document.renderTemplate(field); + // ignore the empty or null field path + if (!Strings.isNullOrEmpty(path)) { + excludeFieldSet.add(path); + } + }); + + if (!excludeFieldSet.isEmpty()) { + Set existingFields = new HashSet<>(document.getSourceAndMetadata().keySet()); + Set metadataFields = document.getMetadata() + .keySet() + .stream() + .map(IngestDocument.Metadata::getFieldName) + .collect(Collectors.toSet()); + existingFields.forEach(field -> { + // ignore metadata fields such as _index, _id, etc. + if (!metadataFields.contains(field) && !excludeFieldSet.contains(field)) { + document.removeField(field); + } + }); } - document.removeField(path); - }); + } + return document; } @@ -127,20 +183,41 @@ public RemoveProcessor create( Map config ) throws Exception { final List fields = new ArrayList<>(); - final Object field = ConfigurationUtils.readObject(TYPE, processorTag, config, "field"); - if (field instanceof List) { - @SuppressWarnings("unchecked") - List stringList = (List) field; - fields.addAll(stringList); - } else { - fields.add((String) field); + final List excludeFields = new ArrayList<>(); + final Object field = ConfigurationUtils.readOptionalObject(config, "field"); + final Object excludeField = ConfigurationUtils.readOptionalObject(config, "exclude_field"); + + if (field == null && excludeField == null || field != null && excludeField != null) { + throw newConfigurationException(TYPE, processorTag, "field", "ether field or exclude_field must be set"); } - final List compiledTemplates = fields.stream() - .map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", f, scriptService)) - .collect(Collectors.toList()); boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); - return new RemoveProcessor(processorTag, description, compiledTemplates, ignoreMissing); + + if (field != null) { + if (field instanceof List) { + @SuppressWarnings("unchecked") + List stringList = (List) field; + fields.addAll(stringList); + } else { + fields.add((String) field); + } + List fieldCompiledTemplates = fields.stream() + .map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", f, scriptService)) + .collect(Collectors.toList()); + return new RemoveProcessor(processorTag, description, fieldCompiledTemplates, null, ignoreMissing); + } else { + if (excludeField instanceof List) { + @SuppressWarnings("unchecked") + List stringList = (List) excludeField; + excludeFields.addAll(stringList); + } else { + excludeFields.add((String) excludeField); + } + List excludeFieldCompiledTemplates = excludeFields.stream() + .map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "exclude_field", f, scriptService)) + .collect(Collectors.toList()); + return new RemoveProcessor(processorTag, description, null, excludeFieldCompiledTemplates, ignoreMissing); + } } } } diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/RemoveProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/RemoveProcessorFactoryTests.java index 66ca888a0d39f..179aef2feac0c 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/RemoveProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/RemoveProcessorFactoryTests.java @@ -41,6 +41,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -79,16 +80,6 @@ public void testCreateMultipleFields() throws Exception { ); } - public void testCreateMissingField() throws Exception { - Map config = new HashMap<>(); - try { - factory.create(null, null, null, config); - fail("factory create should have failed"); - } catch (OpenSearchParseException e) { - assertThat(e.getMessage(), equalTo("[field] required property is missing")); - } - } - public void testInvalidMustacheTemplate() throws Exception { RemoveProcessor.Factory factory = new RemoveProcessor.Factory(TestTemplateService.instance(true)); Map config = new HashMap<>(); @@ -98,4 +89,31 @@ public void testInvalidMustacheTemplate() throws Exception { assertThat(exception.getMessage(), equalTo("java.lang.RuntimeException: could not compile script")); assertThat(exception.getMetadata("opensearch.processor_tag").get(0), equalTo(processorTag)); } + + public void testCreateWithExcludeField() throws Exception { + Map config = new HashMap<>(); + String processorTag = randomAlphaOfLength(10); + OpenSearchException exception = expectThrows( + OpenSearchParseException.class, + () -> factory.create(null, processorTag, null, config) + ); + assertThat(exception.getMessage(), equalTo("[field] ether field or exclude_field must be set")); + + Map config2 = new HashMap<>(); + config2.put("field", "field1"); + config2.put("exclude_field", "field2"); + exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config2)); + assertThat(exception.getMessage(), equalTo("[field] ether field or exclude_field must be set")); + + Map config6 = new HashMap<>(); + config6.put("exclude_field", "exclude_field"); + RemoveProcessor removeProcessor = factory.create(null, processorTag, null, config6); + assertThat( + removeProcessor.getExcludeFields() + .stream() + .map(template -> template.newInstance(Collections.emptyMap()).execute()) + .collect(Collectors.toList()), + equalTo(List.of("exclude_field")) + ); + } } diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/RemoveProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/RemoveProcessorTests.java index c138ad606d2e5..78a3d36124d45 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/RemoveProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/RemoveProcessorTests.java @@ -38,8 +38,10 @@ import org.opensearch.ingest.Processor; import org.opensearch.ingest.RandomDocumentPicks; import org.opensearch.ingest.TestTemplateService; +import org.opensearch.script.TemplateScript; import org.opensearch.test.OpenSearchTestCase; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -57,12 +59,28 @@ public void testRemoveFields() throws Exception { randomAlphaOfLength(10), null, Collections.singletonList(new TestTemplateService.MockTemplateScript.Factory(field)), + null, false ); processor.execute(ingestDocument); assertThat(ingestDocument.hasField(field), equalTo(false)); } + public void testRemoveByExcludeFields() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + ingestDocument.setFieldValue("foo_1", "value"); + ingestDocument.setFieldValue("foo_2", "value"); + ingestDocument.setFieldValue("foo_3", "value"); + List excludeFields = new ArrayList<>(); + excludeFields.add(new TestTemplateService.MockTemplateScript.Factory("foo_1")); + excludeFields.add(new TestTemplateService.MockTemplateScript.Factory("foo_2")); + Processor processor = new RemoveProcessor(randomAlphaOfLength(10), null, null, excludeFields, false); + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField("foo_1"), equalTo(true)); + assertThat(ingestDocument.hasField("foo_2"), equalTo(true)); + assertThat(ingestDocument.hasField("foo_3"), equalTo(false)); + } + public void testRemoveNonExistingField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); @@ -183,6 +201,34 @@ public void testRemoveMetadataField() throws Exception { } } + public void testCreateRemoveProcessorWithBothFieldsAndExcludeFields() throws Exception { + assertThrows( + "ether fields and excludeFields must be set", + IllegalArgumentException.class, + () -> new RemoveProcessor(randomAlphaOfLength(10), null, null, null, false) + ); + + final List fields; + if (randomBoolean()) { + fields = new ArrayList<>(); + } else { + fields = List.of(new TestTemplateService.MockTemplateScript.Factory("foo_1")); + } + + final List excludeFields; + if (randomBoolean()) { + excludeFields = new ArrayList<>(); + } else { + excludeFields = List.of(new TestTemplateService.MockTemplateScript.Factory("foo_2")); + } + + assertThrows( + "ether fields and excludeFields must be set", + IllegalArgumentException.class, + () -> new RemoveProcessor(randomAlphaOfLength(10), null, fields, excludeFields, false) + ); + } + public void testRemoveDocumentId() throws Exception { Map config = new HashMap<>(); config.put("field", IngestDocument.Metadata.ID.getFieldName()); diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/290_remove_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/290_remove_processor.yml index 6668b468f8edc..e120a865052b0 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/290_remove_processor.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/290_remove_processor.yml @@ -319,3 +319,43 @@ teardown: } - match: { docs.0.error.type: "illegal_argument_exception" } - match: { docs.0.error.reason: "cannot remove metadata field [_id] when specifying external version for the document, version: 1, version_type: external_gte" } + +# Related issue: https://github.com/opensearch-project/OpenSearch/issues/1578 +--- +"Test remove processor with exclude_field": + - skip: + version: " - 2.11.99" + reason: "exclude_field is introduced in 2.12" + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "remove" : { + "exclude_field": "bar" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: { + foo1: "bar", + foo2: "bar", + bar: "zoo", + zoo: "bar" + } + + - do: + get: + index: test + id: 1 + - match: { _source: { bar: "zoo"}} diff --git a/server/src/main/java/org/opensearch/ingest/ConfigurationUtils.java b/server/src/main/java/org/opensearch/ingest/ConfigurationUtils.java index 5185b740d90cb..a2c2137130587 100644 --- a/server/src/main/java/org/opensearch/ingest/ConfigurationUtils.java +++ b/server/src/main/java/org/opensearch/ingest/ConfigurationUtils.java @@ -387,6 +387,7 @@ private static Map readMap(String processorType, String processor /** * Returns and removes the specified property as an {@link Object} from the specified configuration map. + * If the property is missing an {@link OpenSearchParseException} is thrown */ public static Object readObject(String processorType, String processorTag, Map configuration, String propertyName) { Object value = configuration.remove(propertyName); @@ -396,6 +397,13 @@ public static Object readObject(String processorType, String processorTag, Map configuration, String propertyName) { + return configuration.remove(propertyName); + } + public static OpenSearchException newConfigurationException( String processorType, String processorTag,