Skip to content

Commit

Permalink
Revert [Type removal] Remove type from BulkRequestParser
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Jun 3, 2022
1 parent 253891e commit 06d9129
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public BulkRequest add(
String routing = valueOrDefault(defaultRouting, globalRouting);
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
new BulkRequestParser().parse(
new BulkRequestParser(true).parse(
data,
defaultIndex,
routing,
Expand All @@ -296,7 +296,7 @@ public BulkRequest add(
requireAlias,
allowExplicitIndex,
xContentType,
this::internalAdd,
(indexRequest, type) -> internalAdd(indexRequest),
this::internalAdd,
this::add
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -66,6 +67,7 @@
public final class BulkRequestParser {

private static final ParseField INDEX = new ParseField("_index");
private static final ParseField TYPE = new ParseField("_type");
private static final ParseField ID = new ParseField("_id");
private static final ParseField ROUTING = new ParseField("routing");
private static final ParseField OP_TYPE = new ParseField("op_type");
Expand All @@ -78,6 +80,17 @@ public final class BulkRequestParser {
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
private final boolean errorOnType;

/**
* Create a new parser.
* @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring
*/
public BulkRequestParser(boolean errorOnType) {
this.errorOnType = errorOnType;
}

private static int findNextMarker(byte marker, int from, BytesReference data) {
final int res = data.indexOf(marker, from);
if (res != -1) {
Expand Down Expand Up @@ -123,7 +136,7 @@ public void parse(
@Nullable Boolean defaultRequireAlias,
boolean allowExplicitIndex,
XContentType xContentType,
Consumer<IndexRequest> indexRequestConsumer,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
) throws IOException {
Expand Down Expand Up @@ -179,6 +192,7 @@ public void parse(
String action = parser.currentName();

String index = defaultIndex;
String type = null;
String id = null;
String routing = defaultRouting;
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
Expand All @@ -191,7 +205,7 @@ public void parse(
String pipeline = defaultPipeline;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;

// at this stage, next token can either be END_OBJECT (and use default index with auto generated id)
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
token = parser.nextToken();

Expand All @@ -206,6 +220,13 @@ public void parse(
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (errorOnType) {
throw new IllegalArgumentException(
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
);
}
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
id = parser.text();
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -301,7 +322,8 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias)
.setRequireAlias(requireAlias),
type
);
} else {
indexRequestConsumer.accept(
Expand All @@ -314,7 +336,8 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias)
.setRequireAlias(requireAlias),
type
);
}
} else if ("create".equals(action)) {
Expand All @@ -328,7 +351,8 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias)
.setRequireAlias(requireAlias),
type
);
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public class BulkRequestParserTests extends OpenSearchTestCase {

public void testIndexRequest() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser();
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, indexRequest -> {
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
Expand All @@ -67,7 +67,7 @@ public void testIndexRequest() throws IOException {
true,
false,
XContentType.JSON,
indexRequest -> { assertTrue(indexRequest.isRequireAlias()); },
(indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); },
req -> fail(),
req -> fail()
);
Expand All @@ -82,7 +82,7 @@ public void testIndexRequest() throws IOException {
null,
false,
XContentType.JSON,
indexRequest -> { assertTrue(indexRequest.isRequireAlias()); },
(indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); },
req -> fail(),
req -> fail()
);
Expand All @@ -97,30 +97,42 @@ public void testIndexRequest() throws IOException {
true,
false,
XContentType.JSON,
indexRequest -> { assertFalse(indexRequest.isRequireAlias()); },
(indexRequest, type) -> { assertFalse(indexRequest.isRequireAlias()); },
req -> fail(),
req -> fail()
);
}

public void testDeleteRequest() throws IOException {
BytesArray request = new BytesArray("{ \"delete\":{ \"_id\": \"bar\" } }\n");
BulkRequestParser parser = new BulkRequestParser();
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), deleteRequest -> {
assertFalse(parsed.get());
assertEquals("foo", deleteRequest.index());
assertEquals("bar", deleteRequest.id());
parsed.set(true);
});
parser.parse(
request,
"foo",
null,
null,
null,
null,
false,
XContentType.JSON,
(req, type) -> fail(),
req -> fail(),
deleteRequest -> {
assertFalse(parsed.get());
assertEquals("foo", deleteRequest.index());
assertEquals("bar", deleteRequest.id());
parsed.set(true);
}
);
assertTrue(parsed.get());
}

public void testUpdateRequest() throws IOException {
BytesArray request = new BytesArray("{ \"update\":{ \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser();
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), updateRequest -> {
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
assertFalse(parsed.get());
assertEquals("foo", updateRequest.index());
assertEquals("bar", updateRequest.id());
Expand All @@ -138,7 +150,7 @@ public void testUpdateRequest() throws IOException {
true,
false,
XContentType.JSON,
req -> fail(),
(req, type) -> fail(),
updateRequest -> { assertTrue(updateRequest.isRequireAlias()); },
req -> fail()
);
Expand All @@ -153,7 +165,7 @@ public void testUpdateRequest() throws IOException {
null,
false,
XContentType.JSON,
req -> fail(),
(req, type) -> fail(),
updateRequest -> { assertTrue(updateRequest.isRequireAlias()); },
req -> fail()
);
Expand All @@ -168,15 +180,15 @@ public void testUpdateRequest() throws IOException {
true,
false,
XContentType.JSON,
req -> fail(),
(req, type) -> fail(),
updateRequest -> { assertFalse(updateRequest.isRequireAlias()); },
req -> fail()
);
}

public void testBarfOnLackOfTrailingNewline() {
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}");
BulkRequestParser parser = new BulkRequestParser();
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> parser.parse(
Expand All @@ -188,7 +200,7 @@ public void testBarfOnLackOfTrailingNewline() {
null,
false,
XContentType.JSON,
indexRequest -> fail(),
(indexRequest, type) -> fail(),
req -> fail(),
req -> fail()
)
Expand All @@ -198,21 +210,46 @@ public void testBarfOnLackOfTrailingNewline() {

public void testFailOnExplicitIndex() {
BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser();
BulkRequestParser parser = new BulkRequestParser(randomBoolean());

IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), req -> fail())
() -> parser.parse(
request,
null,
null,
null,
null,
null,
false,
XContentType.JSON,
(req, type) -> fail(),
req -> fail(),
req -> fail()
)
);
assertEquals("explicit index in bulk is not allowed", ex.getMessage());
}

public void testTypesStillParsedForBulkMonitoring() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_type\": \"quux\", \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(false);
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
parsed.set(true);
}, req -> fail(), req -> fail());
assertTrue(parsed.get());
}

public void testParseDeduplicatesParameterStrings() throws IOException {
BytesArray request = new BytesArray(
"{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\"} }\n{}\n"
+ "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\" } }\n{}\n"
);
BulkRequestParser parser = new BulkRequestParser();
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final List<IndexRequest> indexRequests = new ArrayList<>();
parser.parse(
request,
Expand All @@ -223,7 +260,7 @@ public void testParseDeduplicatesParameterStrings() throws IOException {
null,
true,
XContentType.JSON,
indexRequest -> indexRequests.add(indexRequest),
(indexRequest, type) -> indexRequests.add(indexRequest),
req -> fail(),
req -> fail()
);
Expand Down

0 comments on commit 06d9129

Please sign in to comment.