diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java index 3af4227bf46ca..25b335eae0bf1 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java @@ -287,7 +287,7 @@ public BulkRequest add( String routing = valueOrDefault(defaultRouting, globalRouting); String pipeline = valueOrDefault(defaultPipeline, globalPipeline); Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias); - new BulkRequestParser().parse( + new BulkRequestParser(true).parse( data, defaultIndex, routing, @@ -296,7 +296,7 @@ public BulkRequest add( requireAlias, allowExplicitIndex, xContentType, - this::internalAdd, + (indexRequest, type) -> internalAdd(indexRequest), this::internalAdd, this::add ); diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java index 212450515b57e..675905cc60e75 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java @@ -53,6 +53,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -66,6 +67,7 @@ public final class BulkRequestParser { private static final ParseField INDEX = new ParseField("_index"); + private static final ParseField TYPE = new ParseField("_type"); private static final ParseField ID = new ParseField("_id"); private static final ParseField ROUTING = new ParseField("routing"); private static final ParseField OP_TYPE = new ParseField("op_type"); @@ -78,6 +80,17 @@ public final class BulkRequestParser { private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term"); private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS); + // TODO: Remove this parameter once the BulkMonitoring endpoint has been removed + private final boolean errorOnType; + + /** + * Create a new parser. + * @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring + */ + public BulkRequestParser(boolean errorOnType) { + this.errorOnType = errorOnType; + } + private static int findNextMarker(byte marker, int from, BytesReference data) { final int res = data.indexOf(marker, from); if (res != -1) { @@ -123,7 +136,7 @@ public void parse( @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex, XContentType xContentType, - Consumer indexRequestConsumer, + BiConsumer indexRequestConsumer, Consumer updateRequestConsumer, Consumer deleteRequestConsumer ) throws IOException { @@ -179,6 +192,7 @@ public void parse( String action = parser.currentName(); String index = defaultIndex; + String type = null; String id = null; String routing = defaultRouting; FetchSourceContext fetchSourceContext = defaultFetchSourceContext; @@ -191,7 +205,7 @@ public void parse( String pipeline = defaultPipeline; boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias; - // at this stage, next token can either be END_OBJECT (and use default index with auto generated id) + // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) // or START_OBJECT which will have another set of parameters token = parser.nextToken(); @@ -206,6 +220,13 @@ public void parse( throw new IllegalArgumentException("explicit index in bulk is not allowed"); } index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); + } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + if (errorOnType) { + throw new IllegalArgumentException( + "Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]" + ); + } + type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) { id = parser.text(); } else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) { @@ -301,7 +322,8 @@ public void parse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) - .setRequireAlias(requireAlias) + .setRequireAlias(requireAlias), + type ); } else { indexRequestConsumer.accept( @@ -314,7 +336,8 @@ public void parse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) - .setRequireAlias(requireAlias) + .setRequireAlias(requireAlias), + type ); } } else if ("create".equals(action)) { @@ -328,7 +351,8 @@ public void parse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) - .setRequireAlias(requireAlias) + .setRequireAlias(requireAlias), + type ); } else if ("update".equals(action)) { if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { diff --git a/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java index d3da77112408b..239bb19c5f6ad 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java @@ -47,9 +47,9 @@ public class BulkRequestParserTests extends OpenSearchTestCase { public void testIndexRequest() throws IOException { BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}\n"); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, indexRequest -> { + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> { assertFalse(parsed.get()); assertEquals("foo", indexRequest.index()); assertEquals("bar", indexRequest.id()); @@ -67,7 +67,7 @@ public void testIndexRequest() throws IOException { true, false, XContentType.JSON, - indexRequest -> { assertTrue(indexRequest.isRequireAlias()); }, + (indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail() ); @@ -82,7 +82,7 @@ public void testIndexRequest() throws IOException { null, false, XContentType.JSON, - indexRequest -> { assertTrue(indexRequest.isRequireAlias()); }, + (indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail() ); @@ -97,7 +97,7 @@ public void testIndexRequest() throws IOException { true, false, XContentType.JSON, - indexRequest -> { assertFalse(indexRequest.isRequireAlias()); }, + (indexRequest, type) -> { assertFalse(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail() ); @@ -105,22 +105,34 @@ public void testIndexRequest() throws IOException { public void testDeleteRequest() throws IOException { BytesArray request = new BytesArray("{ \"delete\":{ \"_id\": \"bar\" } }\n"); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), deleteRequest -> { - assertFalse(parsed.get()); - assertEquals("foo", deleteRequest.index()); - assertEquals("bar", deleteRequest.id()); - parsed.set(true); - }); + parser.parse( + request, + "foo", + null, + null, + null, + null, + false, + XContentType.JSON, + (req, type) -> fail(), + req -> fail(), + deleteRequest -> { + assertFalse(parsed.get()); + assertEquals("foo", deleteRequest.index()); + assertEquals("bar", deleteRequest.id()); + parsed.set(true); + } + ); assertTrue(parsed.get()); } public void testUpdateRequest() throws IOException { BytesArray request = new BytesArray("{ \"update\":{ \"_id\": \"bar\" } }\n{}\n"); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), updateRequest -> { + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> { assertFalse(parsed.get()); assertEquals("foo", updateRequest.index()); assertEquals("bar", updateRequest.id()); @@ -138,7 +150,7 @@ public void testUpdateRequest() throws IOException { true, false, XContentType.JSON, - req -> fail(), + (req, type) -> fail(), updateRequest -> { assertTrue(updateRequest.isRequireAlias()); }, req -> fail() ); @@ -153,7 +165,7 @@ public void testUpdateRequest() throws IOException { null, false, XContentType.JSON, - req -> fail(), + (req, type) -> fail(), updateRequest -> { assertTrue(updateRequest.isRequireAlias()); }, req -> fail() ); @@ -168,7 +180,7 @@ public void testUpdateRequest() throws IOException { true, false, XContentType.JSON, - req -> fail(), + (req, type) -> fail(), updateRequest -> { assertFalse(updateRequest.isRequireAlias()); }, req -> fail() ); @@ -176,7 +188,7 @@ public void testUpdateRequest() throws IOException { public void testBarfOnLackOfTrailingNewline() { BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}"); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> parser.parse( @@ -188,7 +200,7 @@ public void testBarfOnLackOfTrailingNewline() { null, false, XContentType.JSON, - indexRequest -> fail(), + (indexRequest, type) -> fail(), req -> fail(), req -> fail() ) @@ -198,21 +210,46 @@ public void testBarfOnLackOfTrailingNewline() { public void testFailOnExplicitIndex() { BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n"); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), req -> fail()) + () -> parser.parse( + request, + null, + null, + null, + null, + null, + false, + XContentType.JSON, + (req, type) -> fail(), + req -> fail(), + req -> fail() + ) ); assertEquals("explicit index in bulk is not allowed", ex.getMessage()); } + public void testTypesStillParsedForBulkMonitoring() throws IOException { + BytesArray request = new BytesArray("{ \"index\":{ \"_type\": \"quux\", \"_id\": \"bar\" } }\n{}\n"); + BulkRequestParser parser = new BulkRequestParser(false); + final AtomicBoolean parsed = new AtomicBoolean(); + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> { + assertFalse(parsed.get()); + assertEquals("foo", indexRequest.index()); + assertEquals("bar", indexRequest.id()); + parsed.set(true); + }, req -> fail(), req -> fail()); + assertTrue(parsed.get()); + } + public void testParseDeduplicatesParameterStrings() throws IOException { BytesArray request = new BytesArray( "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\"} }\n{}\n" + "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\" } }\n{}\n" ); - BulkRequestParser parser = new BulkRequestParser(); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); final List indexRequests = new ArrayList<>(); parser.parse( request, @@ -223,7 +260,7 @@ public void testParseDeduplicatesParameterStrings() throws IOException { null, true, XContentType.JSON, - indexRequest -> indexRequests.add(indexRequest), + (indexRequest, type) -> indexRequests.add(indexRequest), req -> fail(), req -> fail() );