Skip to content

Commit

Permalink
Delete batch if empty when removing shards from batch
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Apr 12, 2024
1 parent ed9df68 commit 9b1d553
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ else if (shardRouting.primary() == primary) {
}
});

refreshShardBatches(currentBatches, batchedShardsToAssign);
refreshShardBatches(currentBatches, batchedShardsToAssign, primary);

Iterator<ShardRouting> iterator = newShardsToBatch.iterator();
assert maxBatchSize > 0 : "Shards batch size must be greater than 0";
Expand Down Expand Up @@ -283,7 +283,7 @@ else if (shardRouting.primary() == primary) {
return batchesToBeAssigned;
}

private void refreshShardBatches(ConcurrentMap<String, ShardsBatch> currentBatches, Set<ShardId> batchedShardsToAssign) {
private void refreshShardBatches(ConcurrentMap<String, ShardsBatch> currentBatches, Set<ShardId> batchedShardsToAssign, boolean primary) {
// cleanup shard from batches if they are not present in unassigned list from allocation object. This is
// needed as AllocationService.reroute can also be called directly by API flows for example DeleteIndices.
// So, as part of calling reroute, those shards will be removed from allocation object. It'll handle the
Expand All @@ -297,6 +297,8 @@ private void refreshShardBatches(ConcurrentMap<String, ShardsBatch> currentBatch
batchEntry.getValue().clearShardFromCache(shardId);
}
}
ConcurrentMap<String, ShardsBatch> batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch;
deleteBatchIfEmpty(batches, batchEntry.getValue().getBatchId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,13 @@ public static class NodeStoreFilesMetadataBatch extends BaseNodeResponse {

protected NodeStoreFilesMetadataBatch(StreamInput in) throws IOException {
super(in);
this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, NodeStoreFilesMetadata::new);
this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, i -> {
if (i.readBoolean()) {
return new NodeStoreFilesMetadata(i);
} else {
return null;
}
});
}

public NodeStoreFilesMetadataBatch(DiscoveryNode node, Map<ShardId, NodeStoreFilesMetadata> nodeStoreFilesMetadataBatch) {
Expand All @@ -344,7 +350,14 @@ public Map<ShardId, NodeStoreFilesMetadata> getNodeStoreFilesMetadataBatch() {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o));
out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> {
if (v != null) {
o.writeBoolean(true);
v.writeTo(o);
} else {
o.writeBoolean(false);
}
});
}
}

Expand Down

0 comments on commit 9b1d553

Please sign in to comment.