Skip to content

Commit

Permalink
[WIP] Revert beat types changes 2x (#3560)
Browse files Browse the repository at this point in the history
* Revert "Revert removal of typed end-points for bulk, search, index APIs (#3524)"

This reverts commit f48043b.

* Revert "[Type removal] Ignore _type field in bulk request (#3505)"

This reverts commit a93ff13.

Signed-off-by: Suraj Singh <[email protected]>
Signed-off-by: Nicholas Walter Knize <[email protected]>
  • Loading branch information
dreamer-89 authored Jun 10, 2022
1 parent 5e30383 commit 55d50dd
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 308 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,6 @@ public void testBulkWithGlobalDefaults() throws Exception {
}
}

// Todo: This test is added to verify type support in bulk action. This should be removed once all clients
// avoid sending this param.
// https://github.com/opensearch-project/OpenSearch/issues/3484
public void testBulkWithTypes() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/opensearch/action/bulk/bulk-with-deprecated-types.json");
{
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, XContentType.JSON);
assertThat(bulkRequest.numberOfActions(), equalTo(5));
}
}

private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException {
XContentBuilder pipeline = jsonBuilder().startObject()
.startArray("processors")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,7 @@ public BulkRequest add(
String routing = valueOrDefault(defaultRouting, globalRouting);
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
// https://github.com/opensearch-project/OpenSearch/issues/3484
// Undo error on types which breaks compatibility with some external clients
new BulkRequestParser(false).parse(
new BulkRequestParser().parse(
data,
defaultIndex,
routing,
Expand All @@ -298,7 +296,7 @@ public BulkRequest add(
requireAlias,
allowExplicitIndex,
xContentType,
(indexRequest, type) -> internalAdd(indexRequest),
this::internalAdd,
this::internalAdd,
this::add
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -67,7 +66,6 @@
public final class BulkRequestParser {

private static final ParseField INDEX = new ParseField("_index");
private static final ParseField TYPE = new ParseField("_type");
private static final ParseField ID = new ParseField("_id");
private static final ParseField ROUTING = new ParseField("routing");
private static final ParseField OP_TYPE = new ParseField("op_type");
Expand All @@ -80,17 +78,6 @@ public final class BulkRequestParser {
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
private final boolean errorOnType;

/**
* Create a new parser.
* @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring
*/
public BulkRequestParser(boolean errorOnType) {
this.errorOnType = errorOnType;
}

private static int findNextMarker(byte marker, int from, BytesReference data) {
final int res = data.indexOf(marker, from);
if (res != -1) {
Expand Down Expand Up @@ -136,7 +123,7 @@ public void parse(
@Nullable Boolean defaultRequireAlias,
boolean allowExplicitIndex,
XContentType xContentType,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<IndexRequest> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
) throws IOException {
Expand Down Expand Up @@ -192,7 +179,6 @@ public void parse(
String action = parser.currentName();

String index = defaultIndex;
String type = null;
String id = null;
String routing = defaultRouting;
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
Expand All @@ -205,7 +191,7 @@ public void parse(
String pipeline = defaultPipeline;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;

// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// at this stage, next token can either be END_OBJECT (and use default index with auto generated id)
// or START_OBJECT which will have another set of parameters
token = parser.nextToken();

Expand All @@ -220,13 +206,6 @@ public void parse(
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (errorOnType) {
throw new IllegalArgumentException(
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
);
}
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
id = parser.text();
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -322,8 +301,7 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias),
type
.setRequireAlias(requireAlias)
);
} else {
indexRequestConsumer.accept(
Expand All @@ -336,8 +314,7 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias),
type
.setRequireAlias(requireAlias)
);
}
} else if ("create".equals(action)) {
Expand All @@ -351,8 +328,7 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias),
type
.setRequireAlias(requireAlias)
);
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,7 @@ public RestBulkAction(Settings settings) {
@Override
public List<Route> routes() {
return unmodifiableList(
asList(
new Route(POST, "/_bulk"),
new Route(PUT, "/_bulk"),
new Route(POST, "/{index}/_bulk"),
new Route(PUT, "/{index}/_bulk"),
// Deprecated typed endpoints.
new Route(POST, "/{index}/{type}/_bulk"),
new Route(PUT, "/{index}/{type}/_bulk")
)
asList(new Route(POST, "/_bulk"), new Route(PUT, "/_bulk"), new Route(POST, "/{index}/_bulk"), new Route(PUT, "/{index}/_bulk"))
);
}

Expand All @@ -95,9 +87,6 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
BulkRequest bulkRequest = Requests.bulkRequest();
String defaultIndex = request.param("index");
if (request.hasParam("type")) {
request.param("type");
}
String defaultRouting = request.param("routing");
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
String defaultPipeline = request.param("pipeline");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,7 @@ public class RestDeleteAction extends BaseRestHandler {

@Override
public List<Route> routes() {
return unmodifiableList(
asList(
new Route(DELETE, "/{index}/_doc/{id}"),
// Deprecated typed endpoint.
new Route(DELETE, "/{index}/{type}/{id}")
)
);
return unmodifiableList(asList(new Route(DELETE, "/{index}/_doc/{id}")));
}

@Override
Expand All @@ -73,9 +67,6 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
if (request.hasParam("type")) {
request.param("type");
}
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("id"));
deleteRequest.routing(request.param("routing"));
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,7 @@ public class RestIndexAction extends BaseRestHandler {

@Override
public List<Route> routes() {
return unmodifiableList(
asList(
new Route(POST, "/{index}/_doc/{id}"),
new Route(PUT, "/{index}/_doc/{id}"),
new Route(POST, "/{index}/{type}/{id}"),
new Route(PUT, "/{index}/{type}/{id}")
)
);
return unmodifiableList(asList(new Route(POST, "/{index}/_doc/{id}"), new Route(PUT, "/{index}/_doc/{id}")));
}

@Override
Expand All @@ -92,14 +85,7 @@ public String getName() {

@Override
public List<Route> routes() {
return unmodifiableList(
asList(
new Route(POST, "/{index}/_create/{id}"),
new Route(PUT, "/{index}/_create/{id}"),
new Route(POST, "/{index}/{type}/{id}/_create"),
new Route(PUT, "/{index}/{type}/{id}/_create")
)
);
return unmodifiableList(asList(new Route(POST, "/{index}/_create/{id}"), new Route(PUT, "/{index}/_create/{id}")));
}

@Override
Expand Down Expand Up @@ -136,7 +122,7 @@ public String getName() {

@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(POST, "/{index}/_doc"), new Route(POST, "/{index}/{type}")));
return unmodifiableList(asList(new Route(POST, "/{index}/_doc")));
}

@Override
Expand All @@ -153,9 +139,6 @@ public RestChannelConsumer prepareRequest(RestRequest request, final NodeClient
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
IndexRequest indexRequest = new IndexRequest(request.param("index"));
if (request.hasParam("type")) {
request.param("type");
}
indexRequest.id(request.param("id"));
indexRequest.routing(request.param("routing"));
indexRequest.setPipeline(request.param("pipeline"));
Expand Down
Loading

0 comments on commit 55d50dd

Please sign in to comment.