Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add template snippets support for field and target_field in KV ingest processor #11300

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672))
- Add back half_float BKD based sort query optimization ([#11024](https://github.com/opensearch-project/OpenSearch/pull/11024))
- Request level coordinator slow logs ([#11246](https://github.com/opensearch-project/OpenSearch/pull/11246))
- Add template snippets support for field and target_field in KV ingest processor ([#10040](https://github.com/opensearch-project/OpenSearch/pull/10040))
- Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService));
processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory());
processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory());
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory(parameters.scriptService));

Check warning on line 101 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonPlugin.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonPlugin.java#L101

Added line #L101 was not covered by tests
processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory());
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
package org.opensearch.ingest.common;

import org.opensearch.common.util.set.Sets;
import org.opensearch.core.common.Strings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;
import org.opensearch.script.ScriptService;
import org.opensearch.script.TemplateScript;

import java.util.Collections;
import java.util.List;
Expand All @@ -56,24 +59,24 @@

private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)");

private final String field;
private final TemplateScript.Factory field;
private final String fieldSplit;
private final String valueSplit;
private final Set<String> includeKeys;
private final Set<String> excludeKeys;
private final String targetField;
private final TemplateScript.Factory targetField;
private final boolean ignoreMissing;
private final Consumer<IngestDocument> execution;

KeyValueProcessor(
String tag,
String description,
String field,
TemplateScript.Factory field,
String fieldSplit,
String valueSplit,
Set<String> includeKeys,
Set<String> excludeKeys,
String targetField,
TemplateScript.Factory targetField,
boolean ignoreMissing,
String trimKey,
String trimValue,
Expand Down Expand Up @@ -106,10 +109,10 @@
private static Consumer<IngestDocument> buildExecution(
String fieldSplit,
String valueSplit,
String field,
TemplateScript.Factory field,
Set<String> includeKeys,
Set<String> excludeKeys,
String targetField,
TemplateScript.Factory targetField,
boolean ignoreMissing,
String trimKey,
String trimValue,
Expand All @@ -130,41 +133,62 @@
keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false;
}
}
final String fieldPathPrefix;
String keyPrefix = prefix == null ? "" : prefix;
if (targetField == null) {
fieldPathPrefix = keyPrefix;
} else {
fieldPathPrefix = targetField + "." + keyPrefix;
}
final Function<String, String> keyPrefixer;
if (fieldPathPrefix.isEmpty()) {
keyPrefixer = val -> val;
} else {
keyPrefixer = val -> fieldPathPrefix + val;
}
final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
final Function<String, String> bracketStrip;
if (stripBrackets) {
bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll("");
} else {
bracketStrip = val -> val;
}
final Function<String, String> valueTrimmer = buildTrimmer(trimValue);

return document -> {
String value = document.getFieldValue(field, String.class, ignoreMissing);
final String fieldPathPrefix;
String keyPrefix = prefix == null ? "" : prefix;
if (targetField != null) {
String targetFieldPath = document.renderTemplate(targetField);
if (!Strings.isNullOrEmpty((targetFieldPath))) {
fieldPathPrefix = targetFieldPath + "." + keyPrefix;
} else {
fieldPathPrefix = keyPrefix;

Check warning on line 145 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java#L145

Added line #L145 was not covered by tests
}
} else {
fieldPathPrefix = keyPrefix;
}

final Function<String, String> keyPrefixer;
if (fieldPathPrefix.isEmpty()) {
keyPrefixer = val -> val;
} else {
keyPrefixer = val -> fieldPathPrefix + val;
}
final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
final Function<String, String> bracketStrip;
if (stripBrackets) {
bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll("");
} else {
bracketStrip = val -> val;
}
final Function<String, String> valueTrimmer = buildTrimmer(trimValue);

String path = document.renderTemplate(field);
final boolean fieldPathNullOrEmpty = Strings.isNullOrEmpty(path);
if (fieldPathNullOrEmpty || document.hasField(path, true) == false) {
if (ignoreMissing) {
return;
} else if (fieldPathNullOrEmpty) {
throw new IllegalArgumentException("field path cannot be null nor empty");
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
}
}

String value = document.getFieldValue(path, String.class, ignoreMissing);
if (value == null) {
if (ignoreMissing) {
return;
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
throw new IllegalArgumentException("field [" + path + "] is null, cannot extract key-value pairs. ");

Check warning on line 185 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java#L185

Added line #L185 was not covered by tests
}

for (String part : fieldSplitter.apply(value)) {
String[] kv = valueSplitter.apply(part);
if (kv.length != 2) {
throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
throw new IllegalArgumentException("field [" + path + "] does not contain value_split [" + valueSplit + "]");
}
String key = keyTrimmer.apply(kv[0]);
if (keyFilter.test(key)) {
Expand Down Expand Up @@ -193,7 +217,7 @@
}
}

String getField() {
TemplateScript.Factory getField() {
return field;
}

Expand All @@ -213,7 +237,7 @@
return excludeKeys;
}

String getTargetField() {
TemplateScript.Factory getTargetField() {
return targetField;
}

Expand Down Expand Up @@ -241,6 +265,12 @@
}

public static class Factory implements Processor.Factory {
private final ScriptService scriptService;

public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

@Override
public KeyValueProcessor create(
Map<String, Processor.Factory> registry,
Expand All @@ -249,7 +279,13 @@
Map<String, Object> config
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
TemplateScript.Factory fieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService);
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
TemplateScript.Factory targetFieldTemplate = null;
if (!Strings.isNullOrEmpty(targetField)) {
targetFieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "target_field", targetField, scriptService);
}

String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key");
Expand All @@ -270,12 +306,12 @@
return new KeyValueProcessor(
processorTag,
description,
field,
fieldTemplate,
fieldSplit,
valueSplit,
includeKeys,
excludeKeys,
targetField,
targetFieldTemplate,
ignoreMissing,
trimKey,
trimValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.common.util.set.Sets;
import org.opensearch.ingest.TestTemplateService;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -48,16 +50,22 @@

public class KeyValueProcessorFactoryTests extends OpenSearchTestCase {

private KeyValueProcessor.Factory factory;

@Before
public void init() {
factory = new KeyValueProcessor.Factory(TestTemplateService.instance());
}

public void testCreateWithDefaults() throws Exception {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
config.put("value_split", "=");
String processorTag = randomAlphaOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, null, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), is(nullValue()));
Expand All @@ -66,7 +74,6 @@ public void testCreateWithDefaults() throws Exception {
}

public void testCreateWithAllFieldsSet() throws Exception {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
Expand All @@ -78,17 +85,16 @@ public void testCreateWithAllFieldsSet() throws Exception {
String processorTag = randomAlphaOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, null, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), equalTo(Sets.newHashSet("a", "b")));
assertThat(processor.getExcludeKeys(), equalTo(Collections.emptySet()));
assertThat(processor.getTargetField(), equalTo("target"));
assertThat(processor.getTargetField().newInstance(Collections.emptyMap()).execute(), equalTo("target"));
assertTrue(processor.isIgnoreMissing());
}

public void testCreateWithMissingField() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String processorTag = randomAlphaOfLength(10);
OpenSearchException exception = expectThrows(
Expand All @@ -99,7 +105,6 @@ public void testCreateWithMissingField() {
}

public void testCreateWithMissingFieldSplit() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAlphaOfLength(10);
Expand All @@ -111,7 +116,6 @@ public void testCreateWithMissingFieldSplit() {
}

public void testCreateWithMissingValueSplit() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;
import org.opensearch.ingest.RandomDocumentPicks;
import org.opensearch.ingest.TestTemplateService;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
Expand All @@ -51,7 +52,7 @@

public class KeyValueProcessorTests extends OpenSearchTestCase {

private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory();
private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory(TestTemplateService.instance());

public void test() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Expand Down Expand Up @@ -123,7 +124,12 @@ public void testMissingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
Processor processor = createKvProcessor("unknown", "&", "=", null, null, "target", false);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
assertThat(exception.getMessage(), equalTo("field [unknown] doesn't exist"));

// when using a template snippet, the resolved field path may be empty
Processor processorWithEmptyFieldPath = createKvProcessor("", "&", "=", null, null, "target", false);
exception = expectThrows(IllegalArgumentException.class, () -> processorWithEmptyFieldPath.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field path cannot be null nor empty"));
}

public void testNullValueWithIgnoreMissing() throws Exception {
Expand Down
Loading
Loading