Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Type removal] Remove type from BulkRequestParser #3431

Merged
merged 1 commit into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public BulkRequest add(
String routing = valueOrDefault(defaultRouting, globalRouting);
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
new BulkRequestParser(true).parse(
new BulkRequestParser().parse(
data,
defaultIndex,
routing,
Expand All @@ -296,7 +296,7 @@ public BulkRequest add(
requireAlias,
allowExplicitIndex,
xContentType,
(indexRequest, type) -> internalAdd(indexRequest),
this::internalAdd,
this::internalAdd,
this::add
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -67,7 +66,6 @@
public final class BulkRequestParser {

private static final ParseField INDEX = new ParseField("_index");
private static final ParseField TYPE = new ParseField("_type");
private static final ParseField ID = new ParseField("_id");
private static final ParseField ROUTING = new ParseField("routing");
private static final ParseField OP_TYPE = new ParseField("op_type");
Expand All @@ -80,17 +78,6 @@ public final class BulkRequestParser {
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
private final boolean errorOnType;

/**
* Create a new parser.
* @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring
*/
public BulkRequestParser(boolean errorOnType) {
this.errorOnType = errorOnType;
}

private static int findNextMarker(byte marker, int from, BytesReference data) {
final int res = data.indexOf(marker, from);
if (res != -1) {
Expand Down Expand Up @@ -136,7 +123,7 @@ public void parse(
@Nullable Boolean defaultRequireAlias,
boolean allowExplicitIndex,
XContentType xContentType,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<IndexRequest> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
) throws IOException {
Expand Down Expand Up @@ -192,7 +179,6 @@ public void parse(
String action = parser.currentName();

String index = defaultIndex;
String type = null;
String id = null;
String routing = defaultRouting;
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
Expand All @@ -205,7 +191,7 @@ public void parse(
String pipeline = defaultPipeline;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;

// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// at this stage, next token can either be END_OBJECT (and use default index with auto generated id)
// or START_OBJECT which will have another set of parameters
token = parser.nextToken();

Expand All @@ -220,13 +206,6 @@ public void parse(
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (errorOnType) {
throw new IllegalArgumentException(
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
);
}
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
id = parser.text();
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -322,8 +301,7 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias),
type
.setRequireAlias(requireAlias)
);
} else {
indexRequestConsumer.accept(
Expand All @@ -336,8 +314,7 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias),
type
.setRequireAlias(requireAlias)
);
}
} else if ("create".equals(action)) {
Expand All @@ -351,8 +328,7 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias),
type
.setRequireAlias(requireAlias)
);
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public class BulkRequestParserTests extends OpenSearchTestCase {

public void testIndexRequest() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
BulkRequestParser parser = new BulkRequestParser();
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, indexRequest -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
Expand All @@ -67,7 +67,7 @@ public void testIndexRequest() throws IOException {
true,
false,
XContentType.JSON,
(indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); },
indexRequest -> { assertTrue(indexRequest.isRequireAlias()); },
req -> fail(),
req -> fail()
);
Expand All @@ -82,7 +82,7 @@ public void testIndexRequest() throws IOException {
null,
false,
XContentType.JSON,
(indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); },
indexRequest -> { assertTrue(indexRequest.isRequireAlias()); },
req -> fail(),
req -> fail()
);
Expand All @@ -97,42 +97,30 @@ public void testIndexRequest() throws IOException {
true,
false,
XContentType.JSON,
(indexRequest, type) -> { assertFalse(indexRequest.isRequireAlias()); },
indexRequest -> { assertFalse(indexRequest.isRequireAlias()); },
req -> fail(),
req -> fail()
);
}

public void testDeleteRequest() throws IOException {
BytesArray request = new BytesArray("{ \"delete\":{ \"_id\": \"bar\" } }\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
BulkRequestParser parser = new BulkRequestParser();
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(
request,
"foo",
null,
null,
null,
null,
false,
XContentType.JSON,
(req, type) -> fail(),
req -> fail(),
deleteRequest -> {
assertFalse(parsed.get());
assertEquals("foo", deleteRequest.index());
assertEquals("bar", deleteRequest.id());
parsed.set(true);
}
);
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), deleteRequest -> {
assertFalse(parsed.get());
assertEquals("foo", deleteRequest.index());
assertEquals("bar", deleteRequest.id());
parsed.set(true);
});
assertTrue(parsed.get());
}

public void testUpdateRequest() throws IOException {
BytesArray request = new BytesArray("{ \"update\":{ \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
BulkRequestParser parser = new BulkRequestParser();
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), updateRequest -> {
assertFalse(parsed.get());
assertEquals("foo", updateRequest.index());
assertEquals("bar", updateRequest.id());
Expand All @@ -150,7 +138,7 @@ public void testUpdateRequest() throws IOException {
true,
false,
XContentType.JSON,
(req, type) -> fail(),
req -> fail(),
updateRequest -> { assertTrue(updateRequest.isRequireAlias()); },
req -> fail()
);
Expand All @@ -165,7 +153,7 @@ public void testUpdateRequest() throws IOException {
null,
false,
XContentType.JSON,
(req, type) -> fail(),
req -> fail(),
updateRequest -> { assertTrue(updateRequest.isRequireAlias()); },
req -> fail()
);
Expand All @@ -180,15 +168,15 @@ public void testUpdateRequest() throws IOException {
true,
false,
XContentType.JSON,
(req, type) -> fail(),
req -> fail(),
updateRequest -> { assertFalse(updateRequest.isRequireAlias()); },
req -> fail()
);
}

public void testBarfOnLackOfTrailingNewline() {
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
BulkRequestParser parser = new BulkRequestParser();
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> parser.parse(
Expand All @@ -200,7 +188,7 @@ public void testBarfOnLackOfTrailingNewline() {
null,
false,
XContentType.JSON,
(indexRequest, type) -> fail(),
indexRequest -> fail(),
req -> fail(),
req -> fail()
)
Expand All @@ -210,46 +198,21 @@ public void testBarfOnLackOfTrailingNewline() {

public void testFailOnExplicitIndex() {
BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
BulkRequestParser parser = new BulkRequestParser();

IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> parser.parse(
request,
null,
null,
null,
null,
null,
false,
XContentType.JSON,
(req, type) -> fail(),
req -> fail(),
req -> fail()
)
() -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), req -> fail())
);
assertEquals("explicit index in bulk is not allowed", ex.getMessage());
}

public void testTypesStillParsedForBulkMonitoring() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_type\": \"quux\", \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(false);
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
parsed.set(true);
}, req -> fail(), req -> fail());
assertTrue(parsed.get());
}

public void testParseDeduplicatesParameterStrings() throws IOException {
BytesArray request = new BytesArray(
"{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\"} }\n{}\n"
+ "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\" } }\n{}\n"
);
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
BulkRequestParser parser = new BulkRequestParser();
final List<IndexRequest> indexRequests = new ArrayList<>();
parser.parse(
request,
Expand All @@ -260,7 +223,7 @@ public void testParseDeduplicatesParameterStrings() throws IOException {
null,
true,
XContentType.JSON,
(indexRequest, type) -> indexRequests.add(indexRequest),
indexRequest -> indexRequests.add(indexRequest),
req -> fail(),
req -> fail()
);
Expand Down