From a93ff13fc56545992a0b51ca9f4e1f55259a7a25 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 6 Jun 2022 14:13:19 -0700 Subject: [PATCH] [Type removal] Ignore _type field in bulk request (#3505) Reverts [Type removal] Remove type from BulkRequestParser and, instead, ignores _type field in bulk request. This enables bulk API bwc with external clients such as Beats and Logstash until a formal REST Version API mechanism is available for OpenSearch core. Signed-off-by: Suraj Singh --- .../test/bulk/11_with_deprecated_types.yml | 137 ++++++++++++++++++ .../action/bulk/BulkIntegrationIT.java | 12 ++ .../opensearch/action/bulk/BulkRequest.java | 6 +- .../action/bulk/BulkRequestParser.java | 34 ++++- .../action/bulk/BulkRequestParserTests.java | 83 ++++++++--- .../bulk/bulk-with-deprecated-types.json | 9 ++ 6 files changed, 251 insertions(+), 30 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/bulk/11_with_deprecated_types.yml create mode 100644 server/src/test/resources/org/opensearch/action/bulk/bulk-with-deprecated-types.json diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/11_with_deprecated_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/11_with_deprecated_types.yml new file mode 100644 index 0000000000000..14f5ddcf72ff8 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/11_with_deprecated_types.yml @@ -0,0 +1,137 @@ +--- +"Array of objects": + - do: + bulk: + refresh: true + body: + - index: + _index: test_index + _type: test_type + _id: test_id + - f1: v1 + f2: 42 + - index: + _index: test_index + _type: test_type + _id: test_id2 + - f1: v2 + f2: 47 + + - do: + count: + index: test_index + + - match: {count: 2} + +--- +"Empty _id": + - do: + bulk: + refresh: true + body: + - index: + _index: test + _type: type + _id: '' + - f: 1 + - index: + _index: test + _type: type + _id: id + - f: 2 + - index: + _index: test + _type: type + - f: 3 + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: illegal_argument_exception } + - match: { items.0.index.error.reason: if _id is specified it must not be empty } + - match: { items.1.index.result: created } + - match: { items.2.index.result: created } + + - do: + count: + index: test + + - match: { count: 2 } + +--- +"Empty _id with op_type create": + - skip: + version: " - 7.4.99" + reason: "auto id + op type create only supported since 7.5" + + - do: + bulk: + refresh: true + body: + - index: + _index: test + _type: type + _id: '' + - f: 1 + - index: + _index: test + _type: type + _id: id + - f: 2 + - index: + _index: test + _type: type + - f: 3 + - create: + _index: test + _type: type + - f: 4 + - index: + _index: test + _type: type + op_type: create + - f: 5 + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: illegal_argument_exception } + - match: { items.0.index.error.reason: if _id is specified it must not be empty } + - match: { items.1.index.result: created } + - match: { items.2.index.result: created } + - match: { items.3.create.result: created } + - match: { items.4.create.result: created } + + - do: + count: + index: test + + - match: { count: 4 } + +--- +"empty action": + - skip: + features: headers + + - do: + catch: /Malformed action\/metadata line \[3\], expected FIELD_NAME but found \[END_OBJECT\]/ + headers: + Content-Type: application/json + bulk: + body: | + {"index": {"_index": "test_index", "_type": "test_type", "_id": "test_id"}} + {"f1": "v1", "f2": 42} + {} + +--- +"List of strings": + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "test_index", "_type": "test_type", "_id": "test_id"}}' + - '{"f1": "v1", "f2": 42}' + - '{"index": {"_index": "test_index", "_type": "test_type", "_id": "test_id2"}}' + - '{"f1": "v2", "f2": 47}' + + - do: + count: + index: test_index + + - match: {count: 2} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/bulk/BulkIntegrationIT.java b/server/src/internalClusterTest/java/org/opensearch/action/bulk/BulkIntegrationIT.java index e2a1363f163da..a2f32c4ddf3c4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/bulk/BulkIntegrationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/bulk/BulkIntegrationIT.java @@ -170,6 +170,18 @@ public void testBulkWithGlobalDefaults() throws Exception { } } + // Todo: This test is added to verify type support in bulk action. This should be removed once all clients + // avoid sending this param. + // https://github.com/opensearch-project/OpenSearch/issues/3484 + public void testBulkWithTypes() throws Exception { + String bulkAction = copyToStringFromClasspath("/org/opensearch/action/bulk/bulk-with-deprecated-types.json"); + { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, XContentType.JSON); + assertThat(bulkRequest.numberOfActions(), equalTo(5)); + } + } + private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException { XContentBuilder pipeline = jsonBuilder().startObject() .startArray("processors") diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java index 3af4227bf46ca..162f3295b9f3a 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java @@ -287,7 +287,9 @@ public BulkRequest add( String routing = valueOrDefault(defaultRouting, globalRouting); String pipeline = valueOrDefault(defaultPipeline, globalPipeline); Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias); - new BulkRequestParser().parse( + // https://github.com/opensearch-project/OpenSearch/issues/3484 + // Undo error on types which breaks compatibility with some external clients + new BulkRequestParser(false).parse( data, defaultIndex, routing, @@ -296,7 +298,7 @@ public BulkRequest add( requireAlias, allowExplicitIndex, xContentType, - this::internalAdd, + (indexRequest, type) -> internalAdd(indexRequest), this::internalAdd, this::add ); diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java index 212450515b57e..675905cc60e75 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java @@ -53,6 +53,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -66,6 +67,7 @@ public final class BulkRequestParser { private static final ParseField INDEX = new ParseField("_index"); + private static final ParseField TYPE = new ParseField("_type"); private static final ParseField ID = new ParseField("_id"); private static final ParseField ROUTING = new ParseField("routing"); private static final ParseField OP_TYPE = new ParseField("op_type"); @@ -78,6 +80,17 @@ public final class BulkRequestParser { private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term"); private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS); + // TODO: Remove this parameter once the BulkMonitoring endpoint has been removed + private final boolean errorOnType; + + /** + * Create a new parser. + * @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring + */ + public BulkRequestParser(boolean errorOnType) { + this.errorOnType = errorOnType; + } + private static int findNextMarker(byte marker, int from, BytesReference data) { final int res = data.indexOf(marker, from); if (res != -1) { @@ -123,7 +136,7 @@ public void parse( @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex, XContentType xContentType, - Consumer indexRequestConsumer, + BiConsumer indexRequestConsumer, Consumer updateRequestConsumer, Consumer deleteRequestConsumer ) throws IOException { @@ -179,6 +192,7 @@ public void parse( String action = parser.currentName(); String index = defaultIndex; + String type = null; String id = null; String routing = defaultRouting; FetchSourceContext fetchSourceContext = defaultFetchSourceContext; @@ -191,7 +205,7 @@ public void parse( String pipeline = defaultPipeline; boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias; - // at this stage, next token can either be END_OBJECT (and use default index with auto generated id) + // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) // or START_OBJECT which will have another set of parameters token = parser.nextToken(); @@ -206,6 +220,13 @@ public void parse( throw new IllegalArgumentException("explicit index in bulk is not allowed"); } index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); + } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + if (errorOnType) { + throw new IllegalArgumentException( + "Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]" + ); + } + type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) { id = parser.text(); } else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) { @@ -301,7 +322,8 @@ public void parse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) - .setRequireAlias(requireAlias) + .setRequireAlias(requireAlias), + type ); } else { indexRequestConsumer.accept( @@ -314,7 +336,8 @@ public void parse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) - .setRequireAlias(requireAlias) + .setRequireAlias(requireAlias), + type ); } } else if ("create".equals(action)) { @@ -328,7 +351,8 @@ public void parse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) - .setRequireAlias(requireAlias) + .setRequireAlias(requireAlias), + type ); } else if ("update".equals(action)) { if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { diff --git a/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java index d3da77112408b..ae4fc5fbd1fa4 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java @@ -47,9 +47,9 @@ public class BulkRequestParserTests extends OpenSearchTestCase { public void testIndexRequest() throws IOException { BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}\n"); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, indexRequest -> { + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> { assertFalse(parsed.get()); assertEquals("foo", indexRequest.index()); assertEquals("bar", indexRequest.id()); @@ -67,7 +67,7 @@ public void testIndexRequest() throws IOException { true, false, XContentType.JSON, - indexRequest -> { assertTrue(indexRequest.isRequireAlias()); }, + (indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail() ); @@ -82,7 +82,7 @@ public void testIndexRequest() throws IOException { null, false, XContentType.JSON, - indexRequest -> { assertTrue(indexRequest.isRequireAlias()); }, + (indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail() ); @@ -97,7 +97,7 @@ public void testIndexRequest() throws IOException { true, false, XContentType.JSON, - indexRequest -> { assertFalse(indexRequest.isRequireAlias()); }, + (indexRequest, type) -> { assertFalse(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail() ); @@ -105,22 +105,34 @@ public void testIndexRequest() throws IOException { public void testDeleteRequest() throws IOException { BytesArray request = new BytesArray("{ \"delete\":{ \"_id\": \"bar\" } }\n"); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), deleteRequest -> { - assertFalse(parsed.get()); - assertEquals("foo", deleteRequest.index()); - assertEquals("bar", deleteRequest.id()); - parsed.set(true); - }); + parser.parse( + request, + "foo", + null, + null, + null, + null, + false, + XContentType.JSON, + (req, type) -> fail(), + req -> fail(), + deleteRequest -> { + assertFalse(parsed.get()); + assertEquals("foo", deleteRequest.index()); + assertEquals("bar", deleteRequest.id()); + parsed.set(true); + } + ); assertTrue(parsed.get()); } public void testUpdateRequest() throws IOException { BytesArray request = new BytesArray("{ \"update\":{ \"_id\": \"bar\" } }\n{}\n"); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), updateRequest -> { + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> { assertFalse(parsed.get()); assertEquals("foo", updateRequest.index()); assertEquals("bar", updateRequest.id()); @@ -138,7 +150,7 @@ public void testUpdateRequest() throws IOException { true, false, XContentType.JSON, - req -> fail(), + (req, type) -> fail(), updateRequest -> { assertTrue(updateRequest.isRequireAlias()); }, req -> fail() ); @@ -153,7 +165,7 @@ public void testUpdateRequest() throws IOException { null, false, XContentType.JSON, - req -> fail(), + (req, type) -> fail(), updateRequest -> { assertTrue(updateRequest.isRequireAlias()); }, req -> fail() ); @@ -168,7 +180,7 @@ public void testUpdateRequest() throws IOException { true, false, XContentType.JSON, - req -> fail(), + (req, type) -> fail(), updateRequest -> { assertFalse(updateRequest.isRequireAlias()); }, req -> fail() ); @@ -176,7 +188,7 @@ public void testUpdateRequest() throws IOException { public void testBarfOnLackOfTrailingNewline() { BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}"); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> parser.parse( @@ -188,7 +200,7 @@ public void testBarfOnLackOfTrailingNewline() { null, false, XContentType.JSON, - indexRequest -> fail(), + (indexRequest, type) -> fail(), req -> fail(), req -> fail() ) @@ -198,21 +210,46 @@ public void testBarfOnLackOfTrailingNewline() { public void testFailOnExplicitIndex() { BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n"); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), req -> fail()) + () -> parser.parse( + request, + null, + null, + null, + null, + null, + false, + XContentType.JSON, + (req, type) -> fail(), + req -> fail(), + req -> fail() + ) ); assertEquals("explicit index in bulk is not allowed", ex.getMessage()); } + public void testTypesStillParsedForExternalClients() throws IOException { + BytesArray request = new BytesArray("{ \"index\":{ \"_type\": \"quux\", \"_id\": \"bar\" } }\n{}\n"); + BulkRequestParser parser = new BulkRequestParser(false); + final AtomicBoolean parsed = new AtomicBoolean(); + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> { + assertFalse(parsed.get()); + assertEquals("foo", indexRequest.index()); + assertEquals("bar", indexRequest.id()); + parsed.set(true); + }, req -> fail(), req -> fail()); + assertTrue(parsed.get()); + } + public void testParseDeduplicatesParameterStrings() throws IOException { BytesArray request = new BytesArray( "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\"} }\n{}\n" + "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\" } }\n{}\n" ); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); final List indexRequests = new ArrayList<>(); parser.parse( request, @@ -223,7 +260,7 @@ public void testParseDeduplicatesParameterStrings() throws IOException { null, true, XContentType.JSON, - indexRequest -> indexRequests.add(indexRequest), + (indexRequest, type) -> indexRequests.add(indexRequest), req -> fail(), req -> fail() ); diff --git a/server/src/test/resources/org/opensearch/action/bulk/bulk-with-deprecated-types.json b/server/src/test/resources/org/opensearch/action/bulk/bulk-with-deprecated-types.json new file mode 100644 index 0000000000000..47ac3353e2d75 --- /dev/null +++ b/server/src/test/resources/org/opensearch/action/bulk/bulk-with-deprecated-types.json @@ -0,0 +1,9 @@ +{"index":{"_index":"logstash-2014.03.30", "_type":"logs"}} +{"message":"in24.inetnebr.com--[01/Aug/1995:00:00:01-0400]\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"2001839","@version":"1","@timestamp":"2014-03-30T12:38:10.048Z","host":["romeo","in24.inetnebr.com"],"monthday":1,"month":8,"year":1995,"time":"00:00:01","tz":"-0400","request":"\"GET/shuttle/missions/sts-68/news/sts-68-mcc-05.txtHTTP/1.0\"","httpresponse":"200","size":1839,"rtime":"1995-08-01T00:00:01.000Z"} +{ "index":{"_index":"test","_type":"type1","_id":"1"} } +{ "field1" : "value1" } +{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } } +{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } } +{ "field1" : "value3" } +{ "update" : { "_id" : "0", "_type" : "type1", "_index" : "index1" } } +{ "doc" : {"field" : "updated_value"} }