Skip to content

Commit

Permalink
Add remove_by_pattern ingest processor (opensearch-project#11920)
Browse files Browse the repository at this point in the history
* Add remove_by_pattern ingest processor
* Modify change log
* Remove some duplicated checks
* Add more yml test case
* Fix typo

---------

Signed-off-by: Gao Binlong <[email protected]>
  • Loading branch information
gaobinlong authored and Peter Alfonsi committed Mar 1, 2024
1 parent 3992f91 commit 609b373
Show file tree
Hide file tree
Showing 7 changed files with 555 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce new feature flag "WRITEABLE_REMOTE_INDEX" to gate the writeable remote index functionality ([#11717](https://github.com/opensearch-project/OpenSearch/pull/11170))
- [Tiered caching] Integrating ehcache as a disk cache option ([#11874](https://github.com/opensearch-project/OpenSearch/pull/11874))
- Bump OpenTelemetry from 1.32.0 to 1.34.1 ([#11891](https://github.com/opensearch-project/OpenSearch/pull/11891))
- Add remove_by_pattern ingest processor ([#11920](https://github.com/opensearch-project/OpenSearch/pull/11920))
- Support index level allocation filtering for searchable snapshot index ([#11563](https://github.com/opensearch-project/OpenSearch/pull/11563))
- Add `org.opensearch.rest.MethodHandlers` and `RestController#getAllHandlers` ([11876](https://github.com/opensearch-project/OpenSearch/pull/11876))
- New DateTime format for RFC3339 compatible date fields ([#11465](https://github.com/opensearch-project/OpenSearch/pull/11465))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory());
processors.put(CsvProcessor.TYPE, new CsvProcessor.Factory());
processors.put(CopyProcessor.TYPE, new CopyProcessor.Factory(parameters.scriptService));
processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory());
return Collections.unmodifiableMap(processors);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.common.Nullable;
import org.opensearch.common.ValidationException;
import org.opensearch.common.regex.Regex;
import org.opensearch.core.common.Strings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that removes existing fields by field patterns or excluding field patterns.
*/
public final class RemoveByPatternProcessor extends AbstractProcessor {

public static final String TYPE = "remove_by_pattern";
private final List<String> fieldPatterns;
private final List<String> excludeFieldPatterns;

RemoveByPatternProcessor(
String tag,
String description,
@Nullable List<String> fieldPatterns,
@Nullable List<String> excludeFieldPatterns
) {
super(tag, description);
if (fieldPatterns != null && excludeFieldPatterns != null || fieldPatterns == null && excludeFieldPatterns == null) {
throw new IllegalArgumentException("either fieldPatterns and excludeFieldPatterns must be set");
}
if (fieldPatterns == null) {
this.fieldPatterns = null;
this.excludeFieldPatterns = new ArrayList<>(excludeFieldPatterns);
} else {
this.fieldPatterns = new ArrayList<>(fieldPatterns);
this.excludeFieldPatterns = null;
}
}

public List<String> getFieldPatterns() {
return fieldPatterns;
}

public List<String> getExcludeFieldPatterns() {
return excludeFieldPatterns;
}

@Override
public IngestDocument execute(IngestDocument document) {
Set<String> existingFields = new HashSet<>(document.getSourceAndMetadata().keySet());
Set<String> metadataFields = document.getMetadata()
.keySet()
.stream()
.map(IngestDocument.Metadata::getFieldName)
.collect(Collectors.toSet());

if (fieldPatterns != null && !fieldPatterns.isEmpty()) {
existingFields.forEach(field -> {
// ignore metadata fields such as _index, _id, etc.
if (!metadataFields.contains(field)) {
final boolean matched = fieldPatterns.stream().anyMatch(pattern -> Regex.simpleMatch(pattern, field));
if (matched) {
document.removeField(field);
}
}
});
}

if (excludeFieldPatterns != null && !excludeFieldPatterns.isEmpty()) {
existingFields.forEach(field -> {
// ignore metadata fields such as _index, _id, etc.
if (!metadataFields.contains(field)) {
final boolean matched = excludeFieldPatterns.stream().anyMatch(pattern -> Regex.simpleMatch(pattern, field));
if (!matched) {
document.removeField(field);
}
}
});
}

return document;
}

@Override
public String getType() {
return TYPE;
}

public static final class Factory implements Processor.Factory {

public Factory() {}

@Override
public RemoveByPatternProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
) throws Exception {
final List<String> fieldPatterns = new ArrayList<>();
final List<String> excludeFieldPatterns = new ArrayList<>();
final Object fieldPattern = ConfigurationUtils.readOptionalObject(config, "field_pattern");
final Object excludeFieldPattern = ConfigurationUtils.readOptionalObject(config, "exclude_field_pattern");

if (fieldPattern == null && excludeFieldPattern == null || fieldPattern != null && excludeFieldPattern != null) {
throw newConfigurationException(
TYPE,
processorTag,
"field_pattern",
"either field_pattern or exclude_field_pattern must be set"
);
}

if (fieldPattern != null) {
if (fieldPattern instanceof List) {
@SuppressWarnings("unchecked")
List<String> fieldPatternList = (List<String>) fieldPattern;
fieldPatterns.addAll(fieldPatternList);
} else {
fieldPatterns.add((String) fieldPattern);
}
validateFieldPatterns(processorTag, fieldPatterns, "field_pattern");
return new RemoveByPatternProcessor(processorTag, description, fieldPatterns, null);
} else {
if (excludeFieldPattern instanceof List) {
@SuppressWarnings("unchecked")
List<String> excludeFieldPatternList = (List<String>) excludeFieldPattern;
excludeFieldPatterns.addAll(excludeFieldPatternList);
} else {
excludeFieldPatterns.add((String) excludeFieldPattern);
}
validateFieldPatterns(processorTag, excludeFieldPatterns, "exclude_field_pattern");
return new RemoveByPatternProcessor(processorTag, description, null, excludeFieldPatterns);
}
}

private void validateFieldPatterns(String processorTag, List<String> patterns, String patternKey) {
List<String> validationErrors = new ArrayList<>();
for (String fieldPattern : patterns) {
if (fieldPattern.contains("#")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not contain a '#'");
}
if (fieldPattern.contains(":")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not contain a ':'");
}
if (fieldPattern.startsWith("_")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not start with '_'");
}
if (Strings.validFileNameExcludingAstrix(fieldPattern) == false) {
validationErrors.add(
patternKey + " [" + fieldPattern + "] must not contain the following characters " + Strings.INVALID_FILENAME_CHARS
);
}
}

if (validationErrors.size() > 0) {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(validationErrors);
throw newConfigurationException(TYPE, processorTag, patternKey, validationException.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;

public class RemoveByPatternProcessorFactoryTests extends OpenSearchTestCase {

private RemoveByPatternProcessor.Factory factory;

@Before
public void init() {
factory = new RemoveByPatternProcessor.Factory();
}

public void testCreateFieldPatterns() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field_pattern", "field1*");
String processorTag = randomAlphaOfLength(10);
RemoveByPatternProcessor removeByPatternProcessor = factory.create(null, processorTag, null, config);
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
assertThat(removeByPatternProcessor.getFieldPatterns().get(0), equalTo("field1*"));

Map<String, Object> config2 = new HashMap<>();
config2.put("field_pattern", List.of("field1*", "field2*"));
removeByPatternProcessor = factory.create(null, processorTag, null, config2);
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
assertThat(removeByPatternProcessor.getFieldPatterns().get(0), equalTo("field1*"));
assertThat(removeByPatternProcessor.getFieldPatterns().get(1), equalTo("field2*"));

Map<String, Object> config3 = new HashMap<>();
List<String> patterns = Arrays.asList("foo*", "*", " ", ",", "#", ":", "_");
config3.put("field_pattern", patterns);
Exception exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config3));
assertThat(
exception.getMessage(),
equalTo(
"[field_pattern] Validation Failed: "
+ "1: field_pattern [ ] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
+ "2: field_pattern [,] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
+ "3: field_pattern [#] must not contain a '#';"
+ "4: field_pattern [:] must not contain a ':';"
+ "5: field_pattern [_] must not start with '_';"
)
);
}

public void testCreateExcludeFieldPatterns() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("exclude_field_pattern", "field1*");
String processorTag = randomAlphaOfLength(10);
RemoveByPatternProcessor removeByPatternProcessor = factory.create(null, processorTag, null, config);
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
assertThat(removeByPatternProcessor.getExcludeFieldPatterns().get(0), equalTo("field1*"));

Map<String, Object> config2 = new HashMap<>();
config2.put("exclude_field_pattern", List.of("field1*", "field2*"));
removeByPatternProcessor = factory.create(null, processorTag, null, config2);
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
assertThat(removeByPatternProcessor.getExcludeFieldPatterns().get(0), equalTo("field1*"));
assertThat(removeByPatternProcessor.getExcludeFieldPatterns().get(1), equalTo("field2*"));

Map<String, Object> config3 = new HashMap<>();
List<String> patterns = Arrays.asList("foo*", "*", " ", ",", "#", ":", "_");
config3.put("exclude_field_pattern", patterns);
Exception exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config3));
assertThat(
exception.getMessage(),
equalTo(
"[exclude_field_pattern] Validation Failed: "
+ "1: exclude_field_pattern [ ] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
+ "2: exclude_field_pattern [,] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
+ "3: exclude_field_pattern [#] must not contain a '#';"
+ "4: exclude_field_pattern [:] must not contain a ':';"
+ "5: exclude_field_pattern [_] must not start with '_';"
)
);
}

public void testCreatePatternsFailed() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field_pattern", List.of("foo*"));
config.put("exclude_field_pattern", List.of("bar*"));
String processorTag = randomAlphaOfLength(10);
OpenSearchException exception = expectThrows(
OpenSearchParseException.class,
() -> factory.create(null, processorTag, null, config)
);
assertThat(exception.getMessage(), equalTo("[field_pattern] either field_pattern or exclude_field_pattern must be set"));

Map<String, Object> config2 = new HashMap<>();
config2.put("field_pattern", null);
config2.put("exclude_field_pattern", null);

exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config2));
assertThat(exception.getMessage(), equalTo("[field_pattern] either field_pattern or exclude_field_pattern must be set"));
}
}
Loading

0 comments on commit 609b373

Please sign in to comment.