From 9b1d5536ee643162aa0c2cb450424e10236076e4 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Fri, 12 Apr 2024 16:21:44 +0530 Subject: [PATCH] Delete batch if empty when removing shards from batch Signed-off-by: Aman Khare --- .../gateway/ShardsBatchGatewayAllocator.java | 6 ++++-- ...ansportNodesListShardStoreMetadataBatch.java | 17 +++++++++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 0972f7b1398eb..204bb140c4031 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -251,7 +251,7 @@ else if (shardRouting.primary() == primary) { } }); - refreshShardBatches(currentBatches, batchedShardsToAssign); + refreshShardBatches(currentBatches, batchedShardsToAssign, primary); Iterator iterator = newShardsToBatch.iterator(); assert maxBatchSize > 0 : "Shards batch size must be greater than 0"; @@ -283,7 +283,7 @@ else if (shardRouting.primary() == primary) { return batchesToBeAssigned; } - private void refreshShardBatches(ConcurrentMap currentBatches, Set batchedShardsToAssign) { + private void refreshShardBatches(ConcurrentMap currentBatches, Set batchedShardsToAssign, boolean primary) { // cleanup shard from batches if they are not present in unassigned list from allocation object. This is // needed as AllocationService.reroute can also be called directly by API flows for example DeleteIndices. // So, as part of calling reroute, those shards will be removed from allocation object. It'll handle the @@ -297,6 +297,8 @@ private void refreshShardBatches(ConcurrentMap currentBatch batchEntry.getValue().clearShardFromCache(shardId); } } + ConcurrentMap batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; + deleteBatchIfEmpty(batches, batchEntry.getValue().getBatchId()); } } diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java index 68dffa62f7b2c..22b03539cca74 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java @@ -329,7 +329,13 @@ public static class NodeStoreFilesMetadataBatch extends BaseNodeResponse { protected NodeStoreFilesMetadataBatch(StreamInput in) throws IOException { super(in); - this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, NodeStoreFilesMetadata::new); + this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, i -> { + if (i.readBoolean()) { + return new NodeStoreFilesMetadata(i); + } else { + return null; + } + }); } public NodeStoreFilesMetadataBatch(DiscoveryNode node, Map nodeStoreFilesMetadataBatch) { @@ -344,7 +350,14 @@ public Map getNodeStoreFilesMetadataBatch() { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> { + if (v != null) { + o.writeBoolean(true); + v.writeTo(o); + } else { + o.writeBoolean(false); + } + }); } }