From c2a7820b31e7b40223c45ab9deb52a0a8918cc38 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Sun, 24 Mar 2024 13:03:51 +0800 Subject: [PATCH 1/3] Fix bulk API ignores ingest pipeline for upsert Signed-off-by: Gao Binlong --- CHANGELOG.md | 1 + .../rest-api-spec/test/ingest/70_bulk.yml | 39 +++++++++++++++++++ .../action/bulk/BulkRequestParser.java | 2 +- .../action/bulk/BulkRequestParserTests.java | 7 ++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af20332c61146..a118ad4b22ed8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -111,6 +111,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed +- Fix bulk API ignores ingest pipeline for upsert ### Security diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml index 2dfa17174b139..8a1522dc51b70 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml @@ -144,3 +144,42 @@ teardown: - is_false: _source.field1 - match: {_source.field2: value2} + +# related issue: https://github.com/opensearch-project/OpenSearch/issues/12854 +--- +"Test bulk honors pipeline in update action with upsert": + - skip: + version: " - 2.99.99" + reason: "fixed in 3.0.0" + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "foo", + "value" : "bar" + } + } + ] + } + - match: { acknowledged: true } + + - do: + bulk: + refresh: true + body: + - '{"update": {"_index": "test_index", "_id": "test_id3", "pipeline": "my_pipeline"}}' + - '{"upsert": {"f1": "v2", "f2": 47}, "doc": {"x": 1}}' + + - match: { errors: false } + - match: { items.0.update.result: created } + + - do: + get: + index: test_index + id: test_id3 + - match: { _source: {"f1": "v2", "f2": 47, "foo": "bar"}} diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java index 3fadfe5f2cd6a..f705a218fb8e2 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java @@ -371,7 +371,7 @@ public void parse( } IndexRequest upsertRequest = updateRequest.upsertRequest(); if (upsertRequest != null) { - upsertRequest.setPipeline(defaultPipeline); + upsertRequest.setPipeline(pipeline); } updateRequestConsumer.accept(updateRequest); diff --git a/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java index 4f07c098b0869..9625475607911 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java @@ -112,6 +112,13 @@ public void testUpdateRequest() throws IOException { parser.parse(request, "foo", null, null, null, true, false, MediaTypeRegistry.JSON, req -> fail(), updateRequest -> { assertFalse(updateRequest.isRequireAlias()); }, req -> fail()); + + request = new BytesArray( + "{ \"update\":{ \"_id\": \"bar\", \"require_alias\": false, \"pipeline\": \"testPipeline\" } }\n{\"upsert\": {\"x\": 1}}\n" + ); + parser.parse(request, "foo", null, null, null, true, false, MediaTypeRegistry.JSON, req -> fail(), updateRequest -> { + assertEquals(updateRequest.upsertRequest().getPipeline(), "testPipeline"); + }, req -> fail()); } public void testBarfOnLackOfTrailingNewline() { From 481070fe350ea76da292c959bef8b2cb45a05ed8 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Sun, 24 Mar 2024 13:11:40 +0800 Subject: [PATCH 2/3] Modify change log Signed-off-by: Gao Binlong --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a118ad4b22ed8..28cd2af73f109 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -111,7 +111,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed -- Fix bulk API ignores ingest pipeline for upsert +- Fix bulk API ignores ingest pipeline for upsert ([#12883](https://github.com/opensearch-project/OpenSearch/pull/12883)) ### Security From 3a1881584d9ac96760963295865e2248fcd4f223 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Mon, 25 Mar 2024 20:14:04 +0800 Subject: [PATCH 3/3] Use existing pipeline instead Signed-off-by: Gao Binlong --- .../rest-api-spec/test/ingest/70_bulk.yml | 20 ++----------------- 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml index 8a1522dc51b70..d95b1239b1cf2 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml @@ -151,28 +151,12 @@ teardown: - skip: version: " - 2.99.99" reason: "fixed in 3.0.0" - - do: - ingest.put_pipeline: - id: "my_pipeline" - body: > - { - "description": "_description", - "processors": [ - { - "set" : { - "field" : "foo", - "value" : "bar" - } - } - ] - } - - match: { acknowledged: true } - do: bulk: refresh: true body: - - '{"update": {"_index": "test_index", "_id": "test_id3", "pipeline": "my_pipeline"}}' + - '{"update": {"_index": "test_index", "_id": "test_id3", "pipeline": "pipeline1"}}' - '{"upsert": {"f1": "v2", "f2": 47}, "doc": {"x": 1}}' - match: { errors: false } @@ -182,4 +166,4 @@ teardown: get: index: test_index id: test_id3 - - match: { _source: {"f1": "v2", "f2": 47, "foo": "bar"}} + - match: { _source: {"f1": "v2", "f2": 47, "field1": "value1"}}