From e83f114d68d3ee14ef38b64d223b12977eee00ab Mon Sep 17 00:00:00 2001 From: Swetha Guptha Date: Mon, 20 May 2024 10:24:27 +0530 Subject: [PATCH] Evaluate and execute decisions for shards in a batch together (#13702) Signed-off-by: Swetha Guptha --- .../decider/ThrottlingAllocationDecider.java | 9 + .../gateway/BaseGatewayShardAllocator.java | 53 +---- .../gateway/PrimaryShardBatchAllocator.java | 63 +++-- .../gateway/ReplicaShardBatchAllocator.java | 90 ++++--- .../PrimaryShardBatchAllocatorTests.java | 116 +++++---- .../ReplicaShardBatchAllocatorTests.java | 222 ++++++++++++++++-- 6 files changed, 364 insertions(+), 189 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 26a04de31ce39..bf7ca4aa1c159 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -177,6 +177,15 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing int primariesInRecovery = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node.nodeId()); + logger.debug( + "ThrottlingAllocationDecider decision, throttle: [{}] primary recovery limit [{}]," + + " primaries in recovery [{}] invoked for [{}] on node [{}]", + primariesInRecovery >= primariesInitialRecoveries, + primariesInitialRecoveries, + primariesInRecovery, + shardRouting, + node.node() + ); if (primariesInRecovery >= primariesInitialRecoveries) { // TODO: Should index creation not be throttled for primary shards? return allocation.decision( diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index eed5de65258fc..58982e869794f 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -36,7 +36,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; -import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationDecision; @@ -46,9 +45,7 @@ import org.opensearch.cluster.routing.allocation.decider.Decision; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.stream.Collectors; /** * An abstract class that implements basic functionality for allocating @@ -81,38 +78,7 @@ public void allocateUnassigned( executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler); } - /** - * Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists - * @param shardRoutings the shards to allocate - * @param allocation the allocation state container object - */ - public void allocateUnassignedBatch(List shardRoutings, RoutingAllocation allocation) { - // make Allocation Decisions for all shards - HashMap decisionMap = makeAllocationDecision(shardRoutings, allocation, logger); - assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for " - + "some shards"; - // get all unassigned shards iterator - RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); - - while (iterator.hasNext()) { - ShardRouting shard = iterator.next(); - try { - if (decisionMap.isEmpty() == false) { - if (decisionMap.containsKey(shard)) { - executeDecision(shard, decisionMap.remove(shard), allocation, iterator); - } - } else { - // no need to keep iterating the unassigned shards, if we don't have anything in decision map - break; - } - } catch (Exception e) { - logger.error("Failed to execute decision for shard {} while initializing {}", shard, e); - throw e; - } - } - } - - private void executeDecision( + protected void executeDecision( ShardRouting shardRouting, AllocateUnassignedDecision allocateUnassignedDecision, RoutingAllocation allocation, @@ -135,8 +101,6 @@ private void executeDecision( } } - public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {} - protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) { if (shardRouting.primary()) { if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) { @@ -165,21 +129,6 @@ public abstract AllocateUnassignedDecision makeAllocationDecision( Logger logger ); - public HashMap makeAllocationDecision( - List unassignedShardBatch, - RoutingAllocation allocation, - Logger logger - ) { - - return (HashMap) unassignedShardBatch.stream() - .collect( - Collectors.toMap( - unassignedShard -> unassignedShard, - unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger) - ) - ); - } - /** * Builds decisions for all nodes in the cluster, so that the explain API can provide information on * allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data). diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java index 1979f33484d49..7518d74ff17b8 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java @@ -14,6 +14,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard; @@ -24,6 +25,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** * PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is @@ -61,50 +64,58 @@ protected FetchResult shardsState = fetchData( + List.of(unassignedShard), + Collections.emptyList(), + allocation + ); + List nodeGatewayStartedShards = adaptToNodeShardStates(unassignedShard, shardsState); + return getAllocationDecision(unassignedShard, allocation, nodeGatewayStartedShards, logger); } /** - * Build allocation decisions for all the shards present in the batch identified by batchId. - * - * @param shards set of shards given for allocation - * @param allocation current allocation of all the shards - * @param logger logger used for logging - * @return shard to allocation decision map + * Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists + * @param shardRoutings the shards to allocate + * @param allocation the allocation state container object */ - @Override - public HashMap makeAllocationDecision( - List shards, - RoutingAllocation allocation, - Logger logger - ) { - HashMap shardAllocationDecisions = new HashMap<>(); + public void allocateUnassignedBatch(List shardRoutings, RoutingAllocation allocation) { + HashMap ineligibleShardAllocationDecisions = new HashMap<>(); List eligibleShards = new ArrayList<>(); List inEligibleShards = new ArrayList<>(); // identify ineligible shards - for (ShardRouting shard : shards) { + for (ShardRouting shard : shardRoutings) { AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation); if (decision != null) { + ineligibleShardAllocationDecisions.put(shard.shardId(), decision); inEligibleShards.add(shard); - shardAllocationDecisions.put(shard, decision); } else { eligibleShards.add(shard); } } - // Do not call fetchData if there are no eligible shards - if (eligibleShards.isEmpty()) { - return shardAllocationDecisions; - } + // only fetch data for eligible shards final FetchResult shardsState = fetchData(eligibleShards, inEligibleShards, allocation); - // process the received data - for (ShardRouting unassignedShard : eligibleShards) { - List nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState); - // get allocation decision for this shard - shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger)); + Set shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toSet()); + RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); + while (iterator.hasNext()) { + ShardRouting unassignedShard = iterator.next(); + AllocateUnassignedDecision allocationDecision; + + if (shardIdsFromBatch.contains(unassignedShard.shardId())) { + if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) { + allocationDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId()); + } else { + List nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState); + allocationDecision = getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger); + } + executeDecision(unassignedShard, allocationDecision, allocation, iterator); + } } - return shardAllocationDecisions; } /** diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index be7867b7823f6..f99dfa6fee14f 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; @@ -29,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Allocates replica shards in a batch mode @@ -98,25 +100,47 @@ protected FetchResult> result = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation); + Decision allocateDecision = result.v1(); + if (allocateDecision.type() != Decision.Type.YES + && (allocation.debugDecision() == false || hasInitiatedFetching(unassignedShard) == false)) { + // only return early if we are not in explain mode, or we are in explain mode but we have not + // yet attempted to fetch any shard data + logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard); + return AllocateUnassignedDecision.no( + UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()), + result.v2() != null ? new ArrayList<>(result.v2().values()) : null + ); + } + + final FetchResult shardsState = fetchData( + List.of(unassignedShard), + Collections.emptyList(), + allocation + ); + Map nodeShardStores = convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState); + return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger); } - @Override - public HashMap makeAllocationDecision( - List shards, - RoutingAllocation allocation, - Logger logger - ) { - HashMap shardAllocationDecisions = new HashMap<>(); + /** + * Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists + * @param shardRoutings the shards to allocate + * @param allocation the allocation state container object + */ + public void allocateUnassignedBatch(List shardRoutings, RoutingAllocation allocation) { + HashMap ineligibleShardAllocationDecisions = new HashMap<>(); final boolean explain = allocation.debugDecision(); List eligibleShards = new ArrayList<>(); List ineligibleShards = new ArrayList<>(); HashMap>> nodeAllocationDecisions = new HashMap<>(); - for (ShardRouting shard : shards) { + for (ShardRouting shard : shardRoutings) { if (!isResponsibleFor(shard)) { // this allocator n is not responsible for allocating this shard ineligibleShards.add(shard); - shardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN); + ineligibleShardAllocationDecisions.put(shard.shardId(), AllocateUnassignedDecision.NOT_TAKEN); continue; } @@ -126,8 +150,9 @@ public HashMap makeAllocationDecision( // only return early if we are not in explain mode, or we are in explain mode but we have not // yet attempted to fetch any shard data logger.trace("{}: ignoring allocation, can't be allocated on any node", shard); - shardAllocationDecisions.put( - shard, + ineligibleShards.add(shard); + ineligibleShardAllocationDecisions.put( + shard.shardId(), AllocateUnassignedDecision.no( UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()), result.v2() != null ? new ArrayList<>(result.v2().values()) : null @@ -142,27 +167,34 @@ public HashMap makeAllocationDecision( eligibleShards.add(shard); } - // Do not call fetchData if there are no eligible shards - if (eligibleShards.isEmpty()) { - return shardAllocationDecisions; - } // only fetch data for eligible shards final FetchResult shardsState = fetchData(eligibleShards, ineligibleShards, allocation); - for (ShardRouting unassignedShard : eligibleShards) { - Tuple> result = nodeAllocationDecisions.get(unassignedShard); - shardAllocationDecisions.put( - unassignedShard, - getAllocationDecision( - unassignedShard, - allocation, - convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState), - result, - logger - ) - ); + List shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toList()); + RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); + while (iterator.hasNext()) { + ShardRouting unassignedShard = iterator.next(); + // There will be only one entry for the shard in the unassigned shards batch + // for a shard with multiple unassigned replicas, hence we are comparing the shard ids + // instead of ShardRouting in-order to evaluate shard assignment for all unassigned replicas of a shard. + if (!unassignedShard.primary() && shardIdsFromBatch.contains(unassignedShard.shardId())) { + logger.info("Executing allocation decision for shard {}", shardIdsFromBatch); + AllocateUnassignedDecision allocateUnassignedDecision; + if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) { + allocateUnassignedDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId()); + } else { + Tuple> result = nodeAllocationDecisions.get(unassignedShard); + allocateUnassignedDecision = getAllocationDecision( + unassignedShard, + allocation, + convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState), + result, + logger + ); + } + executeDecision(unassignedShard, allocateUnassignedDecision, allocation, iterator); + } } - return shardAllocationDecisions; } private Map convertToNodeStoreFilesMetadataMap( diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java index 522ad2a64ea5d..729e484575bce 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java @@ -9,6 +9,7 @@ import org.apache.lucene.codecs.Codec; import org.opensearch.Version; +import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.OpenSearchAllocationTestCase; @@ -19,12 +20,13 @@ import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; -import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; -import org.opensearch.cluster.routing.allocation.AllocationDecision; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.Nullable; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.shard.ShardId; @@ -44,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import static org.opensearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED; @@ -69,13 +72,6 @@ public void buildTestAllocator() { this.batchAllocator = new TestBatchAllocator(); } - private void allocateAllUnassigned(final RoutingAllocation allocation) { - final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); - while (iterator.hasNext()) { - batchAllocator.allocateUnassigned(iterator.next(), allocation, iterator); - } - } - private void allocateAllUnassignedBatch(final RoutingAllocation allocation) { final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); List shardsToBatch = new ArrayList<>(); @@ -85,61 +81,60 @@ private void allocateAllUnassignedBatch(final RoutingAllocation allocation) { batchAllocator.allocateUnassignedBatch(shardsToBatch, allocation); } - public void testMakeAllocationDecisionDataFetching() { - final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1"); - - List shards = new ArrayList<>(); - allocateAllUnassignedBatch(allocation); - ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - shards.add(shard); - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(shards, new ArrayList<>(allDecisions.keySet())); - assertEquals(AllocationDecision.AWAITING_INFO, allDecisions.get(shard).getAllocationDecision()); - } - - public void testMakeAllocationDecisionForReplicaShard() { - final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1"); + public void testAllocateUnassignedBatch() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random()); + setUpShards(2); + final RoutingAllocation routingAllocation = routingAllocationWithMultiplePrimaries( + allocationDeciders, + CLUSTER_RECOVERED, + 2, + 0, + "allocId-0", + "allocId-1" + ); - List replicaShards = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).replicaShards(); - List shards = new ArrayList<>(replicaShards); - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(shards, new ArrayList<>(allDecisions.keySet())); - assertFalse(allDecisions.get(replicaShards.get(0)).isDecisionTaken()); - } + for (ShardId shardId : shardsInBatch) { + batchAllocator.addShardData( + node1, + "allocId-" + shardId.id(), + shardId, + true, + new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()), + null + ); + } - public void testMakeAllocationDecisionDataFetched() { - final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1"); + allocateAllUnassignedBatch(routingAllocation); - List shards = new ArrayList<>(); - ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - shards.add(shard); - batchAllocator.addData(node1, "allocId1", true, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName())); - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(shards, new ArrayList<>(allDecisions.keySet())); - assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision()); + assertEquals(0, routingAllocation.routingNodes().unassigned().size()); + List initializingShards = routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + assertEquals(2, initializingShards.size()); + assertTrue(shardsInBatch.contains(initializingShards.get(0).shardId())); + assertTrue(shardsInBatch.contains(initializingShards.get(1).shardId())); + assertEquals(2, routingAllocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId())); } - public void testMakeAllocationDecisionDataFetchedMultipleShards() { + public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + AllocationDeciders allocationDeciders = randomAllocationDeciders( + Settings.builder() + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1) + .build(), + clusterSettings, + random() + ); setUpShards(2); - final RoutingAllocation allocation = routingAllocationWithMultiplePrimaries( - noAllocationDeciders(), + final RoutingAllocation routingAllocation = routingAllocationWithMultiplePrimaries( + allocationDeciders, CLUSTER_RECOVERED, 2, 0, "allocId-0", "allocId-1" ); - List shards = new ArrayList<>(); + for (ShardId shardId : shardsInBatch) { - ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard().recoverySource(); - shards.add(shard); batchAllocator.addShardData( node1, "allocId-" + shardId.id(), @@ -149,13 +144,16 @@ public void testMakeAllocationDecisionDataFetchedMultipleShards() { null ); } - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(new HashSet<>(shards), allDecisions.keySet()); - for (ShardRouting shard : shards) { - assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision()); - } + + allocateAllUnassignedBatch(routingAllocation); + + // Verify the throttling decider was not throttled, recovering shards on node greater than initial concurrent recovery setting + assertEquals(1, routingAllocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId())); + List initializingShards = routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + assertEquals(1, initializingShards.size()); + Set nodesWithInitialisingShards = initializingShards.stream().map(ShardRouting::currentNodeId).collect(Collectors.toSet()); + assertEquals(1, nodesWithInitialisingShards.size()); + assertEquals(Collections.singleton(node1.getId()), nodesWithInitialisingShards); } private RoutingAllocation routingAllocationWithOnePrimary( @@ -235,7 +233,7 @@ private RoutingAllocation routingAllocationWithMultiplePrimaries( .routingTable(routingTableBuilder.build()) .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) .build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, null, System.nanoTime()); + return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, null, System.nanoTime()); } class TestBatchAllocator extends PrimaryShardBatchAllocator { diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java index 464038c93228b..892aaaad16ee3 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java @@ -33,11 +33,13 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.Nullable; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.set.Sets; +import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.Engine; import org.opensearch.index.seqno.ReplicationTracker; @@ -72,6 +74,7 @@ public class ReplicaShardBatchAllocatorTests extends OpenSearchAllocationTestCas private static final org.apache.lucene.util.Version MIN_SUPPORTED_LUCENE_VERSION = org.opensearch.Version.CURRENT .minimumIndexCompatibilityVersion().luceneVersion; private final ShardId shardId = new ShardId("test", "_na_", 0); + private static Set shardsInBatch; private final DiscoveryNode node1 = newNode("node1"); private final DiscoveryNode node2 = newNode("node2"); private final DiscoveryNode node3 = newNode("node3"); @@ -83,6 +86,14 @@ public void buildTestAllocator() { this.testBatchAllocator = new TestBatchAllocator(); } + public static void setUpShards(int numberOfShards) { + shardsInBatch = new HashSet<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + ShardId shardId = new ShardId("test", "_na_", shardNumber); + shardsInBatch.add(shardId); + } + } + private void allocateAllUnassignedBatch(final RoutingAllocation allocation) { final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); List shardToBatch = new ArrayList<>(); @@ -115,8 +126,6 @@ public void testAsyncFetchWithNoShardOnIndexCreation() { ); testBatchAllocator.clean(); allocateAllUnassignedBatch(allocation); - assertThat(testBatchAllocator.getFetchDataCalledAndClean(), equalTo(false)); - assertThat(testBatchAllocator.getShardEligibleFetchDataCountAndClean(), equalTo(0)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId)); } @@ -634,6 +643,50 @@ public void testDoNotCancelForBrokenNode() { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED), empty()); } + public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() throws InterruptedException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + AllocationDeciders allocationDeciders = randomAllocationDeciders( + Settings.builder() + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1) + .build(), + clusterSettings, + random() + ); + setUpShards(2); + final RoutingAllocation routingAllocation = routingAllocationWithMultipleShards(allocationDeciders, 2, 1); + for (ShardId shardIdFromBatch : shardsInBatch) { + testBatchAllocator.addShardData( + node1, + shardIdFromBatch, + "MATCH", + null, + new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION) + ) + .addShardData( + node2, + shardIdFromBatch, + "NO_MATCH", + null, + new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION) + ) + .addShardData( + node3, + shardIdFromBatch, + "MATCH", + null, + new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION) + ); + } + allocateAllUnassignedBatch(routingAllocation); + // Verify the throttling decider was throttled, incoming recoveries on a node should be + // lesser than or equal to allowed concurrent recoveries + assertEquals(0, routingAllocation.routingNodes().getIncomingRecoveries(node2.getId())); + assertEquals(1, routingAllocation.routingNodes().getIncomingRecoveries(node3.getId())); + List initializingShards = routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + assertEquals(1, initializingShards.size()); + } + private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) { return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.CLUSTER_RECOVERED); } @@ -692,6 +745,87 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide ); } + private RoutingAllocation routingAllocationWithMultipleShards(AllocationDeciders deciders, int primaries, int replicas) + throws InterruptedException { + return routingAllocationWithMultipleShards(deciders, Settings.EMPTY, primaries, replicas, UnassignedInfo.Reason.CLUSTER_RECOVERED); + } + + private RoutingAllocation routingAllocationWithMultipleShards( + AllocationDeciders deciders, + Settings settings, + int primaries, + int replicas, + UnassignedInfo.Reason reason + ) throws InterruptedException { + Map shardIdShardRoutingMap = new HashMap<>(); + Index index = null; + for (ShardId shardIdFromBatch : shardsInBatch) { + shardIdShardRoutingMap.put( + shardIdFromBatch, + TestShardRouting.newShardRouting(shardIdFromBatch, node1.getId(), true, ShardRoutingState.STARTED) + ); + if (index == null) { + index = shardIdFromBatch.getIndex(); + } + } + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(index.getName()) + .settings(settings(Version.CURRENT).put(settings)) + .numberOfShards(primaries) + .numberOfReplicas(replicas); + for (ShardId shardIdFromBatch : shardsInBatch) { + indexMetadata.putInSyncAllocationIds( + shardIdFromBatch.id(), + Sets.newHashSet(shardIdShardRoutingMap.get(shardIdFromBatch).allocationId().getId()) + ); + } + Metadata metadata = Metadata.builder().put(indexMetadata).build(); + // mark shard as delayed if reason is NODE_LEFT + boolean delayed = reason == UnassignedInfo.Reason.NODE_LEFT + && UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings).nanos() > 0; + int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 1 : 0; + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + for (ShardId shardIdFromBatch : shardsInBatch) { + IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardIdFromBatch); + indexShardRoutingTableBuilder.addShard(shardIdShardRoutingMap.get(shardIdFromBatch)); + for (int i = 0; i < replicas; i++) { + indexShardRoutingTableBuilder.addShard( + ShardRouting.newUnassigned( + shardIdFromBatch, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo( + reason, + null, + null, + failedAllocations, + System.nanoTime(), + System.currentTimeMillis(), + delayed, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet() + ) + ) + ); + } + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTableBuilder.build()); + } + + RoutingTable routingTable = RoutingTable.builder().add(indexRoutingTableBuilder.build()).build(); + ClusterState state = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + return new RoutingAllocation( + deciders, + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + SnapshotShardSizeInfo.EMPTY, + System.nanoTime() + ); + } + private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders, UnassignedInfo unassignedInfo) { ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED); Metadata metadata = Metadata.builder() @@ -755,7 +889,7 @@ static String randomSyncId() { } class TestBatchAllocator extends ReplicaShardBatchAllocator { - private Map data = null; + private Map data = null; private AtomicBoolean fetchDataCalled = new AtomicBoolean(false); private AtomicInteger eligibleShardFetchDataCount = new AtomicInteger(0); @@ -800,6 +934,55 @@ public TestBatchAllocator addData( } data.put( node, + new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch( + node, + Map.of( + shardId, + new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata( + new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( + shardId, + new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), + peerRecoveryRetentionLeases + ), + storeFileFetchException + ) + ) + ) + ); + return this; + } + + public TestBatchAllocator addShardData( + DiscoveryNode node, + ShardId shardId, + String syncId, + @Nullable Exception storeFileFetchException, + StoreFileMetadata... files + ) { + return addShardData(node, Collections.emptyList(), shardId, syncId, storeFileFetchException, files); + } + + public TestBatchAllocator addShardData( + DiscoveryNode node, + List peerRecoveryRetentionLeases, + ShardId shardId, + String syncId, + @Nullable Exception storeFileFetchException, + StoreFileMetadata... files + ) { + if (data == null) { + data = new HashMap<>(); + } + Map filesAsMap = new HashMap<>(); + for (StoreFileMetadata file : files) { + filesAsMap.put(file.name(), file); + } + Map commitData = new HashMap<>(); + if (syncId != null) { + commitData.put(Engine.SYNC_COMMIT_ID, syncId); + } + + TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata( new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( shardId, @@ -807,8 +990,19 @@ public TestBatchAllocator addData( peerRecoveryRetentionLeases ), storeFileFetchException - ) + ); + Map shardIdNodeStoreFilesMetadataHashMap = + new HashMap<>(); + if (data.containsKey(node)) { + NodeStoreFilesMetadataBatch nodeStoreFilesMetadataBatch = data.get(node); + shardIdNodeStoreFilesMetadataHashMap.putAll(nodeStoreFilesMetadataBatch.getNodeStoreFilesMetadataBatch()); + } + shardIdNodeStoreFilesMetadataHashMap.put(shardId, nodeStoreFilesMetadata); + data.put( + node, + new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch(node, shardIdNodeStoreFilesMetadataHashMap) ); + return this; } @@ -820,25 +1014,7 @@ protected AsyncShardFetch.FetchResult fetchData( ) { fetchDataCalled.set(true); eligibleShardFetchDataCount.set(eligibleShards.size()); - Map tData = null; - if (data != null) { - tData = new HashMap<>(); - for (Map.Entry entry : data.entrySet()) { - Map shardData = Map.of( - shardId, - entry.getValue() - ); - tData.put( - entry.getKey(), - new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch(entry.getKey(), shardData) - ); - } - } - return new AsyncShardFetch.FetchResult<>(tData, new HashMap<>() { - { - put(shardId, Collections.emptySet()); - } - }); + return new AsyncShardFetch.FetchResult<>(data, Collections.>emptyMap()); } @Override