Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline ingest processor supports ignore_missing_pipeline parameter #12476

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163))
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
- Pipeline ingest processor supports ignore_missing_pipeline parameter ([#12476](https://github.com/opensearch-project/OpenSearch/pull/12476))

### Dependencies
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,70 @@ teardown:
- match: { _source.pipelines.0: "pipeline1" }
- match: { _source.pipelines.1: "another_pipeline" }
- match: { _source.pipelines.2: "another_pipeline2" }

---
"Test Pipeline Processor with ignore_missing_pipeline":
- skip:
version: " - 2.12.99"
reason: "introduced in 2.13.0"

- do:
ingest.put_pipeline:
id: "outer"
body: >
{
"description" : "outer pipeline",
"processors" : [
{
"pipeline" : {
"name": "non-existent",
"ignore_missing_pipeline": true
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "outer"
body: {
foo: "bar"
}

- do:
get:
index: test
id: 1
- match: { _source: { "foo": "bar" } }

- do:
ingest.put_pipeline:
id: "outer"
body: >
{
"description" : "outer pipeline",
"processors" : [
{
"pipeline" : {
"name": "non-existent",
"ignore_missing_pipeline": false
}
}
]
}
- match: { acknowledged: true }

- do:
catch: /illegal_state_exception/
index:
index: test
id: 1
pipeline: "outer"
body: {
foo: "bar"
}
- match: { error.root_cause.0.type: "illegal_state_exception" }
- match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [non-existent]" }
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,19 @@ public class PipelineProcessor extends AbstractProcessor {

private final TemplateScript.Factory pipelineTemplate;
private final IngestService ingestService;

PipelineProcessor(String tag, String description, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
private final boolean ignoreMissingPipeline;

PipelineProcessor(
String tag,
String description,
TemplateScript.Factory pipelineTemplate,
IngestService ingestService,
boolean ignoreMissingPipeline
) {
super(tag, description);
this.pipelineTemplate = pipelineTemplate;
this.ingestService = ingestService;
this.ignoreMissingPipeline = ignoreMissingPipeline;
}

@Override
Expand All @@ -61,11 +69,13 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
Pipeline pipeline = ingestService.getPipeline(pipelineName);
if (pipeline != null) {
ingestDocument.executePipeline(pipeline, handler);
} else {
} else if (!ignoreMissingPipeline) {
handler.accept(
null,
new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']')
);
} else {
handler.accept(ingestDocument, null);
}
}

Expand All @@ -91,6 +101,10 @@ TemplateScript.Factory getPipelineTemplate() {
return pipelineTemplate;
}

boolean isIgnoreMissingPipeline() {
return ignoreMissingPipeline;
}

/**
* Factory for the processor.
*
Expand Down Expand Up @@ -118,7 +132,14 @@ public PipelineProcessor create(
"name",
ingestService.getScriptService()
);
return new PipelineProcessor(processorTag, description, pipelineTemplate, ingestService);
boolean ignoreMissingPipeline = ConfigurationUtils.readBooleanProperty(
TYPE,
processorTag,
config,
"ignore_missing_pipeline",
false
);
return new PipelineProcessor(processorTag, description, pipelineTemplate, ingestService, ignoreMissingPipeline);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest;

import org.opensearch.OpenSearchParseException;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.ingest.PipelineProcessorTests.createIngestService;
import static org.hamcrest.CoreMatchers.equalTo;

public class PipelineProcessorFactoryTests extends OpenSearchTestCase {
private PipelineProcessor.Factory factory;

@Before
public void init() {
factory = new PipelineProcessor.Factory(createIngestService());
}

public void testCreate() throws Exception {
Map<String, Object> config = new HashMap<>();
boolean ignoreMissingPipeline = randomBoolean();
config.put("name", "pipeline_name");
config.put("ignore_missing_pipeline", ignoreMissingPipeline);
String processorTag = randomAlphaOfLength(10);
PipelineProcessor pipelineProcessor = factory.create(null, processorTag, null, config);
assertThat(pipelineProcessor.getTag(), equalTo(processorTag));
assertThat(pipelineProcessor.getPipelineTemplate().newInstance(Collections.emptyMap()).execute(), equalTo("pipeline_name"));
assertThat(pipelineProcessor.isIgnoreMissingPipeline(), equalTo(ignoreMissingPipeline));
}

public void testCreateNoPipelinePresent() throws Exception {
Map<String, Object> config = new HashMap<>();
try {
factory.create(null, null, null, config);
fail("factory create should have failed");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[name] required property is missing"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,21 @@ public void testThrowsOnMissingPipeline() throws Exception {
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>();
config.put("name", "missingPipelineId");
if (randomBoolean()) {
config.put("ignore_missing_pipeline", false);
}
IllegalStateException[] e = new IllegalStateException[1];
factory.create(Collections.emptyMap(), null, null, config)
.execute(testIngestDocument, (result, e1) -> e[0] = (IllegalStateException) e1);
assertEquals("Pipeline processor configured for non-existent pipeline [missingPipelineId]", e[0].getMessage());

IllegalStateException[] exceptions = new IllegalStateException[1];
config = new HashMap<>();
config.put("name", "missingPipelineId");
config.put("ignore_missing_pipeline", true);
factory.create(Collections.emptyMap(), null, null, config)
.execute(testIngestDocument, (result, e1) -> exceptions[0] = (IllegalStateException) e1);
assertEquals(exceptions[0], null);
}

public void testThrowsOnRecursivePipelineInvocations() throws Exception {
Expand Down Expand Up @@ -241,7 +252,7 @@ public String getType() {
});
if (i < (numPipelines - 1)) {
TemplateScript.Factory pipelineName = new TestTemplateService.MockTemplateScript.Factory(Integer.toString(i + 1));
processors.add(new PipelineProcessor(null, null, pipelineName, ingestService));
processors.add(new PipelineProcessor(null, null, pipelineName, ingestService, false));
}

Pipeline pipeline = new Pipeline(pipelineId, null, null, new CompoundProcessor(false, processors, Collections.emptyList()));
Expand Down
Loading