Skip to content

Commit

Permalink
Small improvement in TransportBulkAction (elastic#70752)
Browse files Browse the repository at this point in the history
Avoid looping again over the DocWriteRequests in order to group them
by ShardId, but instead do this in one step in the existing
preprocessing loop.
  • Loading branch information
matriv authored Mar 24, 2021
1 parent 5bb440c commit ac6b45b
Showing 1 changed file with 6 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ protected void doRun() {
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
Metadata metadata = clusterState.metadata();
// Group the requests by ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
//the request can only be null because we set it to null in the previous step, so it gets ignored
Expand Down Expand Up @@ -462,6 +464,10 @@ protected void doRun() {
break;
default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex.getName(),
docWriteRequest.id(), docWriteRequest.routing()).shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, docWriteRequest));
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(),
docWriteRequest.id(), e);
Expand All @@ -472,20 +478,6 @@ protected void doRun() {
}
}

// first, go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> request = bulkRequest.requests.get(i);
if (request == null) {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
request.routing()).shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}

if (requestsByShard.isEmpty()) {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)));
Expand Down

0 comments on commit ac6b45b

Please sign in to comment.