Skip to content

Commit

Permalink
Add copy ingest processor
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Binlong <[email protected]>
  • Loading branch information
gaobinlong committed Jan 12, 2024
1 parent 5c82ab8 commit d8edaf3
Show file tree
Hide file tree
Showing 8 changed files with 780 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add additional handling in SearchTemplateRequest when simulate is set to true ([#11591](https://github.com/opensearch-project/OpenSearch/pull/11591))
- Introduce cluster level setting `cluster.index.restrict.replication.type` to prevent replication type setting override during index creations([#11583](https://github.com/opensearch-project/OpenSearch/pull/11583))
- Add match_only_text field that is optimized for storage by trading off positional queries performance ([#6836](https://github.com/opensearch-project/OpenSearch/pull/11039))
- Add copy ingest processor
- Introduce new feature flag "WRITEABLE_REMOTE_INDEX" to gate the writeable remote index functionality ([#11717](https://github.com/opensearch-project/OpenSearch/pull/11170))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.core.common.Strings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;
import org.opensearch.script.ScriptService;
import org.opensearch.script.TemplateScript;

import java.util.Map;

public final class CopyProcessor extends AbstractProcessor {
public static final String TYPE = "copy";

private final TemplateScript.Factory sourceField;
private final TemplateScript.Factory targetField;

private final boolean ignoreMissing;

private final boolean removeSource;

private final boolean overrideTarget;

CopyProcessor(String tag, String description, TemplateScript.Factory sourceField, TemplateScript.Factory targetField) {
this(tag, description, sourceField, targetField, false, false, false);
}

CopyProcessor(
String tag,
String description,
TemplateScript.Factory sourceField,
TemplateScript.Factory targetField,
boolean ignoreMissing,
boolean removeSource,
boolean overrideTarget
) {
super(tag, description);
this.sourceField = sourceField;
this.targetField = targetField;
this.ignoreMissing = ignoreMissing;
this.removeSource = removeSource;
this.overrideTarget = overrideTarget;
}

public TemplateScript.Factory getSourceField() {
return sourceField;
}

public TemplateScript.Factory getTargetField() {
return targetField;
}

public boolean isIgnoreMissing() {
return ignoreMissing;
}

public boolean isRemoveSource() {
return removeSource;
}

public boolean isOverrideTarget() {
return overrideTarget;
}

@Override
public IngestDocument execute(IngestDocument document) {
String source = document.renderTemplate(sourceField);
final boolean sourceFieldPathIsNullOrEmpty = Strings.isNullOrEmpty(source);
if (sourceFieldPathIsNullOrEmpty || document.hasField(source, true) == false) {
if (ignoreMissing) {
return document;
} else if (sourceFieldPathIsNullOrEmpty) {
throw new IllegalArgumentException("source field path cannot be null nor empty");
} else {
throw new IllegalArgumentException("source field [" + source + "] doesn't exist");
}
}

String target = document.renderTemplate(targetField);
if (Strings.isNullOrEmpty(target)) {
throw new IllegalArgumentException("target field path cannot be null nor empty");
}
if (source.equals(target)) {
throw new IllegalArgumentException("source field path and target field path cannot be same");
}

if (overrideTarget || document.hasField(target, true) == false || document.getFieldValue(target, Object.class) == null) {
Object sourceValue = document.getFieldValue(source, Object.class);
document.setFieldValue(target, IngestDocument.deepCopy(sourceValue));
} else {
throw new IllegalArgumentException("target field [" + target + "] already exists");
}

if (removeSource) {
document.removeField(source);
}

return document;
}

@Override
public String getType() {
return TYPE;
}

public static final class Factory implements Processor.Factory {

private final ScriptService scriptService;

public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

@Override
public CopyProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
) throws Exception {
String sourceField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_field");
TemplateScript.Factory sourceFieldTemplate = ConfigurationUtils.compileTemplate(
TYPE,
processorTag,
"source_field",
sourceField,
scriptService
);
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field");
TemplateScript.Factory targetFieldTemplate = ConfigurationUtils.compileTemplate(
TYPE,
processorTag,
"target_field",
targetField,
scriptService
);
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
boolean removeSource = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "remove_source", false);
boolean overrideTarget = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override_target", false);

return new CopyProcessor(
processorTag,
description,
sourceFieldTemplate,
targetFieldTemplate,
ignoreMissing,
removeSource,
overrideTarget
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(DropProcessor.TYPE, new DropProcessor.Factory());
processors.put(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory());
processors.put(CsvProcessor.TYPE, new CsvProcessor.Factory());
processors.put(CopyProcessor.TYPE, new CopyProcessor.Factory(parameters.scriptService));
return Collections.unmodifiableMap(processors);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.ingest.TestTemplateService;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;

public class CopyProcessorFactoryTests extends OpenSearchTestCase {

private CopyProcessor.Factory factory;

@Before
public void init() {
factory = new CopyProcessor.Factory(TestTemplateService.instance());
}

public void testCreate() throws Exception {
boolean ignoreMissing = randomBoolean();
boolean removeSource = randomBoolean();
boolean overrideTarget = randomBoolean();
Map<String, Object> config = new HashMap<>();
config.put("source_field", "source");
config.put("target_field", "target");
config.put("ignore_missing", ignoreMissing);
config.put("remove_source", removeSource);
config.put("override_target", overrideTarget);
String processorTag = randomAlphaOfLength(10);
CopyProcessor copyProcessor = factory.create(null, processorTag, null, config);
assertThat(copyProcessor.getTag(), equalTo(processorTag));
assertThat(copyProcessor.getSourceField().newInstance(Collections.emptyMap()).execute(), equalTo("source"));
assertThat(copyProcessor.getTargetField().newInstance(Collections.emptyMap()).execute(), equalTo("target"));
assertThat(copyProcessor.isIgnoreMissing(), equalTo(ignoreMissing));
assertThat(copyProcessor.isRemoveSource(), equalTo(removeSource));
assertThat(copyProcessor.isOverrideTarget(), equalTo(overrideTarget));
}

public void testCreateWithSourceField() throws Exception {
Map<String, Object> config = new HashMap<>();
try {
factory.create(null, null, null, config);
fail("factory create should have failed");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[source_field] required property is missing"));
}

config.put("source_field", null);
try {
factory.create(null, null, null, config);
fail("factory create should have failed");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[source_field] required property is missing"));
}
}

public void testCreateWithTargetField() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("source_field", "source");
try {
factory.create(null, null, null, config);
fail("factory create should have failed");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[target_field] required property is missing"));
}

config.put("source_field", "source");
config.put("target_field", null);
try {
factory.create(null, null, null, config);
fail("factory create should have failed");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[target_field] required property is missing"));
}
}

public void testInvalidMustacheTemplate() throws Exception {
CopyProcessor.Factory factory = new CopyProcessor.Factory(TestTemplateService.instance(true));
Map<String, Object> config = new HashMap<>();
config.put("source_field", "{{source}}");
config.put("target_field", "target");
String processorTag = randomAlphaOfLength(10);
OpenSearchException exception = expectThrows(OpenSearchException.class, () -> factory.create(null, processorTag, null, config));
assertThat(exception.getMessage(), equalTo("java.lang.RuntimeException: could not compile script"));
assertThat(exception.getMetadata("opensearch.processor_tag").get(0), equalTo(processorTag));
}

}
Loading

0 comments on commit d8edaf3

Please sign in to comment.