Skip to content

Commit

Permalink
Fix bulk API ignores ingest pipeline for upsert
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Binlong <[email protected]>
  • Loading branch information
gaobinlong committed Mar 24, 2024
1 parent 13604c8 commit c2a7820
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Fix bulk API ignores ingest pipeline for upsert

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,42 @@ teardown:

- is_false: _source.field1
- match: {_source.field2: value2}

# related issue: https://github.com/opensearch-project/OpenSearch/issues/12854
---
"Test bulk honors pipeline in update action with upsert":
- skip:
version: " - 2.99.99"
reason: "fixed in 3.0.0"
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "foo",
"value" : "bar"
}
}
]
}
- match: { acknowledged: true }

- do:
bulk:
refresh: true
body:
- '{"update": {"_index": "test_index", "_id": "test_id3", "pipeline": "my_pipeline"}}'
- '{"upsert": {"f1": "v2", "f2": 47}, "doc": {"x": 1}}'

- match: { errors: false }
- match: { items.0.update.result: created }

- do:
get:
index: test_index
id: test_id3
- match: { _source: {"f1": "v2", "f2": 47, "foo": "bar"}}
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void parse(
}
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.setPipeline(defaultPipeline);
upsertRequest.setPipeline(pipeline);
}

updateRequestConsumer.accept(updateRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public void testUpdateRequest() throws IOException {
parser.parse(request, "foo", null, null, null, true, false, MediaTypeRegistry.JSON, req -> fail(), updateRequest -> {
assertFalse(updateRequest.isRequireAlias());
}, req -> fail());

request = new BytesArray(
"{ \"update\":{ \"_id\": \"bar\", \"require_alias\": false, \"pipeline\": \"testPipeline\" } }\n{\"upsert\": {\"x\": 1}}\n"
);
parser.parse(request, "foo", null, null, null, true, false, MediaTypeRegistry.JSON, req -> fail(), updateRequest -> {
assertEquals(updateRequest.upsertRequest().getPipeline(), "testPipeline");
}, req -> fail());
}

public void testBarfOnLackOfTrailingNewline() {
Expand Down

0 comments on commit c2a7820

Please sign in to comment.