diff --git a/CHANGELOG.md b/CHANGELOG.md index cbbfc18a7c9b2..e697d3263dcf3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix negative RequestStats metric issue ([#13553](https://github.com/opensearch-project/OpenSearch/pull/13553)) - Fix get field mapping API returns 404 error in mixed cluster with multiple versions ([#13624](https://github.com/opensearch-project/OpenSearch/pull/13624)) - Allow clearing `remote_store.compatibility_mode` setting ([#13646](https://github.com/opensearch-project/OpenSearch/pull/13646)) +- Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710)) ### Security diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index d2de78ffac965..3c0797cd450d2 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -117,14 +117,17 @@ public void cleanCaches() { // for tests protected ShardsBatchGatewayAllocator() { + this(DEFAULT_SHARD_BATCH_SIZE); + } + + protected ShardsBatchGatewayAllocator(long batchSize) { this.rerouteService = null; this.batchStartedAction = null; this.primaryShardBatchAllocator = null; this.batchStoreAction = null; this.replicaShardBatchAllocator = null; - this.maxBatchSize = DEFAULT_SHARD_BATCH_SIZE; + this.maxBatchSize = batchSize; } - // for tests @Override @@ -228,13 +231,13 @@ protected Set createAndUpdateBatches(RoutingAllocation allocation, boole batchEntry.getValue().getBatchedShards().forEach(shardId -> currentBatchedShards.put(shardId, batchEntry.getKey())); } - Set newShardsToBatch = Sets.newHashSet(); + Map newShardsToBatch = new HashMap<>(); Set batchedShardsToAssign = Sets.newHashSet(); // add all unassigned shards to the batch if they are not already in a batch unassigned.forEach(shardRouting -> { if ((currentBatchedShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) { assert shardRouting.unassigned(); - newShardsToBatch.add(shardRouting); + newShardsToBatch.put(shardRouting.shardId(), shardRouting); } // if shard is already batched update to latest shardRouting information in the batches // Replica shard assignment can be cancelled if we get a better match. These ShardRouting objects also @@ -262,7 +265,7 @@ else if (shardRouting.primary() == primary) { refreshShardBatches(currentBatches, batchedShardsToAssign, primary); - Iterator iterator = newShardsToBatch.iterator(); + Iterator iterator = newShardsToBatch.values().iterator(); assert maxBatchSize > 0 : "Shards batch size must be greater than 0"; long batchSize = maxBatchSize; diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index bb59a5792ec8c..aa31c710c1fbd 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -18,14 +18,21 @@ import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.shard.ShardId; import org.opensearch.snapshots.SnapshotShardSizeInfo; import org.opensearch.test.gateway.TestShardBatchGatewayAllocator; @@ -222,6 +229,21 @@ public void testSafelyRemoveShardFromBothBatch() { assertEquals(0, testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().size()); } + public void testDeDuplicationOfReplicaShardsAcrossBatch() { + final ShardId shardId = new ShardId("test", "_na_", 0); + final DiscoveryNode node = newNode("node1"); + // number of replicas is greater than batch size - to ensure shardRouting gets de-duped across batch + createRoutingWithDifferentUnAssignedInfo(shardId, node, 50); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(10); + + // only replica shard should be in the batch + Set replicaBatches = testShardsBatchGatewayAllocator.createAndUpdateBatches(testAllocation, false); + assertEquals(1, replicaBatches.size()); + ShardsBatchGatewayAllocator.ShardsBatch shardsBatch = testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch() + .get(replicaBatches.iterator().next()); + assertEquals(1, shardsBatch.getBatchedShards().size()); + } + public void testGetBatchIdExisting() { createIndexAndUpdateClusterState(2, 1020, 1); // get all shardsRoutings for test index @@ -345,6 +367,59 @@ private void createIndexAndUpdateClusterState(int count, int numberOfShards, int ); } + private void createRoutingWithDifferentUnAssignedInfo(ShardId primaryShardId, DiscoveryNode node, int numberOfReplicas) { + + ShardRouting primaryShard = TestShardRouting.newShardRouting(primaryShardId, node.getId(), true, ShardRoutingState.STARTED); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(primaryShardId.getIndexName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(numberOfReplicas) + .putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())) + ) + .build(); + + IndexRoutingTable.Builder isd = IndexRoutingTable.builder(primaryShardId.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(primaryShardId).addShard(primaryShard).build()); + + for (int i = 0; i < numberOfReplicas; i++) { + isd.addShard( + ShardRouting.newUnassigned( + primaryShardId, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo( + UnassignedInfo.Reason.REPLICA_ADDED, + "message for replica-copy " + i, + null, + 0, + System.nanoTime(), + System.currentTimeMillis(), + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet() + ) + ) + ); + } + + RoutingTable routingTable = RoutingTable.builder().add(isd).build(); + clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + testAllocation = new RoutingAllocation( + new AllocationDeciders(Collections.emptyList()), + new RoutingNodes(clusterState, false), + clusterState, + ClusterInfo.EMPTY, + SnapshotShardSizeInfo.EMPTY, + System.nanoTime() + ); + + } + // call this after index creation and update cluster state private Tuple, Set> createBatchesAndAssert(int expectedBatchSize) { Set primaryBatches = testShardsBatchGatewayAllocator.createAndUpdateBatches(testAllocation, true); diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java index 53a4e90adb976..fbb39c284f0ff 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java @@ -31,6 +31,14 @@ public class TestShardBatchGatewayAllocator extends ShardsBatchGatewayAllocator { + public TestShardBatchGatewayAllocator() { + + } + + public TestShardBatchGatewayAllocator(long maxBatchSize) { + super(maxBatchSize); + } + Map> knownAllocations = new HashMap<>(); DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES; Map shardIdNodeToReplicationCheckPointMap = new HashMap<>();