From ac6b45beb2e036daaa1a9287950c2401a9072b3e Mon Sep 17 00:00:00 2001 From: Marios Trivyzas Date: Wed, 24 Mar 2021 20:03:49 +0100 Subject: [PATCH] Small improvement in TransportBulkAction (#70752) Avoid looping again over the DocWriteRequests in order to group them by ShardId, but instead do this in one step in the existing preprocessing loop. --- .../action/bulk/TransportBulkAction.java | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 3cf9306cd806e..fed22fbc67559 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -411,6 +411,8 @@ protected void doRun() { } final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver); Metadata metadata = clusterState.metadata(); + // Group the requests by ShardId -> Operations mapping + Map> requestsByShard = new HashMap<>(); for (int i = 0; i < bulkRequest.requests.size(); i++) { DocWriteRequest docWriteRequest = bulkRequest.requests.get(i); //the request can only be null because we set it to null in the previous step, so it gets ignored @@ -462,6 +464,10 @@ protected void doRun() { break; default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]"); } + ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex.getName(), + docWriteRequest.id(), docWriteRequest.routing()).shardId(); + List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()); + shardRequests.add(new BulkItemRequest(i, docWriteRequest)); } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) { BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e); @@ -472,20 +478,6 @@ protected void doRun() { } } - // first, go over all the requests and create a ShardId -> Operations mapping - Map> requestsByShard = new HashMap<>(); - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest request = bulkRequest.requests.get(i); - if (request == null) { - continue; - } - String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName(); - ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), - request.routing()).shardId(); - List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()); - shardRequests.add(new BulkItemRequest(i, request)); - } - if (requestsByShard.isEmpty()) { listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));