From f376d721bf8a1503dd358993b170bf1255f667e6 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Tue, 26 Mar 2024 19:53:48 +0530 Subject: [PATCH] address comments Signed-off-by: Arpit Bandejiya --- .../SegmentReplicationAllocationIT.java | 87 +++++++++++++++++- .../allocation/AllocationConstraints.java | 7 +- .../allocation/AllocationParameter.java | 24 ----- .../routing/allocation/ConstraintTypes.java | 2 +- .../allocator/BalancedShardsAllocator.java | 4 +- .../allocator/LocalShardsBalancer.java | 92 ++++--------------- .../allocator/MinWeightedNodeDecision.java | 73 +++++++++++++++ .../allocation/BalanceConfigurationTests.java | 88 +++++++++++++++--- 8 files changed, 254 insertions(+), 123 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/MinWeightedNodeDecision.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java index 30edea6551067..669e24f9fb555 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java @@ -31,6 +31,9 @@ import java.util.stream.Collectors; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE; +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE; +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -58,6 +61,20 @@ public void enablePreferPrimaryBalance() { ); } + public void setAllocationRelocationStrategy(boolean preferPrimaryBalance, boolean preferPrimaryRebalance, float buffer) { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder() + .put(PREFER_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance) + .put(PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance) + .put(PRIMARY_SHARD_REBALANCE_BUFFER.getKey(), buffer) + ) + ); + } + /** * This test verifies that the overall primary balance is attained during allocation. This test verifies primary * balance per index and across all indices is maintained. @@ -87,7 +104,7 @@ public void testGlobalPrimaryAllocation() throws Exception { state = client().admin().cluster().prepareState().execute().actionGet().getState(); logger.info(ShardAllocations.printShardDistribution(state)); verifyPerIndexPrimaryBalance(); - verifyPrimaryBalance(); + verifyPrimaryBalance(0.0f); } /** @@ -224,6 +241,70 @@ public void testAllocationWithDisruption() throws Exception { verifyPerIndexPrimaryBalance(); } + /** + * Similar to testSingleIndexShardAllocation test but creates multiple indices, multiple nodes adding in and getting + * removed. The test asserts post each such event that primary shard distribution is balanced for each index as well as across the nodes + * when the PREFER_PRIMARY_SHARD_REBALANCE is set to true + */ + public void testAllocationAndRebalanceWithDisruption() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int maxReplicaCount = 2; + final int maxShardCount = 2; + // Create higher number of nodes than number of shards to reduce chances of SameShardAllocationDecider kicking-in + // and preventing primary relocations + final int nodeCount = randomIntBetween(5, 10); + final int numberOfIndices = randomIntBetween(1, 10); + final float buffer = randomIntBetween(1, 4) * 0.10f; + + logger.info("--> Creating {} nodes", nodeCount); + final List nodeNames = new ArrayList<>(); + for (int i = 0; i < nodeCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + setAllocationRelocationStrategy(true, true, buffer); + + int shardCount, replicaCount; + ClusterState state; + for (int i = 0; i < numberOfIndices; i++) { + shardCount = randomIntBetween(1, maxShardCount); + replicaCount = randomIntBetween(1, maxReplicaCount); + logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount); + createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); + ensureGreen(TimeValue.timeValueSeconds(60)); + if (logger.isTraceEnabled()) { + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + } + } + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + verifyPrimaryBalance(buffer); + + final int additionalNodeCount = randomIntBetween(1, 5); + logger.info("--> Adding {} nodes", additionalNodeCount); + + internalCluster().startNodes(additionalNodeCount); + ensureGreen(TimeValue.timeValueSeconds(60)); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + verifyPrimaryBalance(buffer); + + int nodeCountToStop = additionalNodeCount; + while (nodeCountToStop > 0) { + internalCluster().stopRandomDataNode(); + // give replica a chance to promote as primary before terminating node containing the replica + ensureGreen(TimeValue.timeValueSeconds(60)); + nodeCountToStop--; + } + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info("--> Cluster state post nodes stop {}", state); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + verifyPrimaryBalance(buffer); + } + /** * Utility method which ensures cluster has balanced primary shard distribution across a single index. * @throws Exception exception @@ -263,7 +344,7 @@ private void verifyPerIndexPrimaryBalance() throws Exception { }, 60, TimeUnit.SECONDS); } - private void verifyPrimaryBalance() throws Exception { + private void verifyPrimaryBalance(float buffer) throws Exception { assertBusy(() -> { final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState(); RoutingNodes nodes = currentState.getRoutingNodes(); @@ -278,7 +359,7 @@ private void verifyPrimaryBalance() throws Exception { .filter(ShardRouting::primary) .collect(Collectors.toList()) .size(); - assertTrue(primaryCount <= avgPrimaryShardsPerNode); + assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer))); } }, 60, TimeUnit.SECONDS); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index a7732cde0d7f0..fb39ba0053486 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -28,14 +28,11 @@ public class AllocationConstraints { private Map constraints; - public AllocationConstraints(AllocationParameter allocationParameter) { + public AllocationConstraints() { this.constraints = new HashMap<>(); this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached())); this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); - this.constraints.putIfAbsent( - CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, - new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceBuffer())) - ); + this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f))); } public void updateAllocationConstraint(String constraint, boolean enable) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java deleted file mode 100644 index 2444f6405278e..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.routing.allocation; - -/** - * RebalanceConstraint Params - */ -public class AllocationParameter { - private final float preferPrimaryBalanceBuffer; - - public AllocationParameter(float preferPrimaryBalanceBuffer) { - this.preferPrimaryBalanceBuffer = preferPrimaryBalanceBuffer; - } - - public float getPreferPrimaryBalanceBuffer() { - return preferPrimaryBalanceBuffer; - } -} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index fa2bff875a6d5..08fe8f92d1f80 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -83,7 +83,7 @@ public static Predicate isPrimaryShardsPerNodeBreac return (params) -> { int primaryShardCount = params.getNode().numPrimaryShards(); int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode() * (1 + buffer)); - return primaryShardCount > allowedPrimaryShardCount; + return primaryShardCount >= allowedPrimaryShardCount; }; } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index ef971d9cd5650..19b5347df911a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -43,7 +43,6 @@ import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationConstraints; -import org.opensearch.cluster.routing.allocation.AllocationParameter; import org.opensearch.cluster.routing.allocation.ConstraintTypes; import org.opensearch.cluster.routing.allocation.MoveDecision; import org.opensearch.cluster.routing.allocation.RebalanceConstraints; @@ -439,9 +438,8 @@ static class WeightFunction { theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; - AllocationParameter allocationParameter = new AllocationParameter(0.0f); RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); - this.constraints = new AllocationConstraints(allocationParameter); + this.constraints = new AllocationConstraints(); this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter); // Enable index shard per node breach constraint updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index bac057759fb66..adc86971e5f2f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -28,7 +28,6 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; -import org.opensearch.common.Randomness; import org.opensearch.common.collect.Tuple; import org.opensearch.gateway.PriorityComparator; @@ -71,7 +70,7 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; private final Set inEligibleTargetNode; - private final boolean preferRandomShardAllocation; + private final boolean allowRandomAllocation; public LocalShardsBalancer( Logger logger, @@ -81,7 +80,7 @@ public LocalShardsBalancer( float threshold, boolean preferPrimaryBalance, boolean preferPrimaryRebalance, - boolean preferRandomShardAllocation + boolean allowRandomAllocation ) { this.logger = logger; this.allocation = allocation; @@ -98,7 +97,7 @@ public LocalShardsBalancer( this.preferPrimaryBalance = preferPrimaryBalance; this.preferPrimaryRebalance = preferPrimaryRebalance; this.shardMovementStrategy = shardMovementStrategy; - this.preferRandomShardAllocation = preferRandomShardAllocation; + this.allowRandomAllocation = allowRandomAllocation; } /** @@ -872,60 +871,7 @@ void allocateUnassigned() { // clear everything we have either added it or moved to ignoreUnassigned } - private class MinWeightedNodeDecision { - public float minWeight; - public Decision decision; - public List nodes; - - public MinWeightedNodeDecision(List nodes, float minWeight, Decision decision) { - this.minWeight = minWeight; - this.decision = decision; - this.nodes = nodes; - } - - public void updateMinNode(BalancedShardsAllocator.ModelNode node, boolean allowRandomAllocation) { - if (allowRandomAllocation) { - nodes.add(node); - } else { - if (nodes.isEmpty()) { - nodes.add(node); - } else { - nodes.set(0, node); - } - } - } - - public void updateMinNode(BalancedShardsAllocator.ModelNode node, float weight, Decision decision, boolean allowRandomAllocation) { - if (allowRandomAllocation) { - nodes.add(node); - } else { - if (nodes.isEmpty()) { - nodes.add(node); - } else { - nodes.set(0, node); - } - } - minWeight = weight; - this.decision = decision; - } - - public void clearAndUpdateMinNode(BalancedShardsAllocator.ModelNode node, float weight, Decision decision) { - nodes.clear(); - nodes.add(node); - minWeight = weight; - this.decision = decision; - } - - public BalancedShardsAllocator.ModelNode getMinNode(boolean allowRandomAllocation) { - if (allowRandomAllocation) { - return nodes.isEmpty() ? null : nodes.get(Randomness.get().nextInt(nodes.size())); - } else { - return nodes.isEmpty() ? null : nodes.get(0); - } - } - } - - MinWeightedNodeDecision updateEligibleNodes( + MinWeightedNodeDecision findMinWeightNode( MinWeightedNodeDecision minWeightedNodeDecision, Decision currentDecision, float currentWeight, @@ -946,8 +892,8 @@ MinWeightedNodeDecision updateEligibleNodes( * 2.b.1 Update the list if new decision is YES * 2.b.2 Append to list if new decision is THROTTLE */ - if (currentWeight == minWeightedNodeDecision.minWeight) { - if (currentDecision.type() == minWeightedNodeDecision.decision.type()) { + if (currentWeight == minWeightedNodeDecision.getMinWeight()) { + if (currentDecision.type() == minWeightedNodeDecision.getDecision().type()) { minWeightedNodeDecision.updateMinNode(node, allowRandomAllocation); } else { if (currentDecision.type() == Decision.Type.YES) { @@ -956,17 +902,15 @@ MinWeightedNodeDecision updateEligibleNodes( } } } else { - if (currentWeight < minWeightedNodeDecision.minWeight) { + if (currentWeight < minWeightedNodeDecision.getMinWeight()) { minWeightedNodeDecision.clearAndUpdateMinNode(node, currentWeight, currentDecision); } } } else { final boolean updateMinNode; BalancedShardsAllocator.ModelNode minNode = null; - if (!minWeightedNodeDecision.nodes.isEmpty()) { - minNode = minWeightedNodeDecision.nodes.get(0); - } - if (currentWeight == minWeightedNodeDecision.minWeight) { + minNode = minWeightedNodeDecision.getMinNode(allowRandomAllocation); + if (currentWeight == minWeightedNodeDecision.getMinWeight()) { /* we have an equal weight tie breaking: * 1. if one decision is YES prefer it * 2. prefer the node that holds the primary for this index with the next id in the ring ie. @@ -978,7 +922,7 @@ MinWeightedNodeDecision updateEligibleNodes( * than the id of the shard we need to assign. This works find when new indices are created since * primaries are added first and we only add one shard set a time in this algorithm. */ - if (currentDecision.type() == minWeightedNodeDecision.decision.type()) { + if (currentDecision.type() == minWeightedNodeDecision.getDecision().type()) { final int repId = shard.id(); final int nodeHigh = node.highestPrimary(shard.index().getName()); final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); @@ -988,7 +932,7 @@ MinWeightedNodeDecision updateEligibleNodes( updateMinNode = currentDecision.type() == Decision.Type.YES; } } else { - updateMinNode = currentWeight < minWeightedNodeDecision.minWeight; + updateMinNode = currentWeight < minWeightedNodeDecision.getMinWeight(); } if (updateMinNode) { minWeightedNodeDecision.updateMinNode(node, currentWeight, currentDecision, allowRandomAllocation); @@ -1036,7 +980,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { // weight of this index currently on the node float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName()); // moving the shard would not improve the balance, and we are not in explain mode, so short circuit - if (currentWeight > minWeightedNodeDecision.minWeight && explain == false) { + if (currentWeight > minWeightedNodeDecision.getMinWeight() && explain == false) { continue; } @@ -1045,19 +989,19 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); } - minWeightedNodeDecision = updateEligibleNodes( + minWeightedNodeDecision = findMinWeightNode( minWeightedNodeDecision, currentDecision, currentWeight, - preferRandomShardAllocation, + allowRandomAllocation, shard, node ); } - if (minWeightedNodeDecision.decision == null) { + if (minWeightedNodeDecision.getDecision() == null) { // decision was not set and a node was not assigned, so treat it as a NO decision - minWeightedNodeDecision.decision = Decision.NO; + minWeightedNodeDecision.setDecision(Decision.NO); } List nodeDecisions = null; if (explain) { @@ -1071,9 +1015,9 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { } } - BalancedShardsAllocator.ModelNode minNode = minWeightedNodeDecision.getMinNode(preferRandomShardAllocation); + BalancedShardsAllocator.ModelNode minNode = minWeightedNodeDecision.getMinNode(allowRandomAllocation); return AllocateUnassignedDecision.fromDecision( - minWeightedNodeDecision.decision, + minWeightedNodeDecision.getDecision(), minNode != null ? minNode.getRoutingNode().node() : null, nodeDecisions ); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/MinWeightedNodeDecision.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/MinWeightedNodeDecision.java new file mode 100644 index 0000000000000..b174f058b8f9d --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/MinWeightedNodeDecision.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.allocator; + +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.common.Randomness; + +import java.util.List; + +/** + * The MinWeightedNodeDecision is used to store the minWeighted nodes for taking allocation decision in + * {@link LocalShardsBalancer} + * + * @opensearch.internal + */ +public class MinWeightedNodeDecision { + private float minWeight; + private Decision decision; + private List nodes; + + public Decision getDecision() { + return decision; + } + + public void setDecision(Decision decision) { + this.decision = decision; + } + + public float getMinWeight() { + return minWeight; + } + + public MinWeightedNodeDecision(List nodes, float minWeight, Decision decision) { + this.minWeight = minWeight; + this.decision = decision; + this.nodes = nodes; + } + + public void updateMinNode(BalancedShardsAllocator.ModelNode node, boolean allowRandomAllocation) { + // Only for randomAllocation, we need to maintain all the possible nodes + if (!allowRandomAllocation) { + nodes.clear(); + } + nodes.add(node); + } + + public void updateMinNode(BalancedShardsAllocator.ModelNode node, float weight, Decision decision, boolean allowRandomAllocation) { + updateMinNode(node, allowRandomAllocation); + minWeight = weight; + this.decision = decision; + } + + public void clearAndUpdateMinNode(BalancedShardsAllocator.ModelNode node, float weight, Decision decision) { + nodes.clear(); + nodes.add(node); + minWeight = weight; + this.decision = decision; + } + + public BalancedShardsAllocator.ModelNode getMinNode(boolean allowRandomAllocation) { + if (allowRandomAllocation) { + return nodes.isEmpty() ? null : nodes.get(Randomness.get().nextInt(nodes.size())); + } else { + return nodes.isEmpty() ? null : nodes.get(0); + } + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index fc5721a5380ed..6e0be9ce9a9e9 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -163,7 +163,6 @@ private Settings.Builder getSettingsBuilderForPrimaryBalance(boolean preferPrima settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); - settings.put(BalancedShardsAllocator.ALLOW_RANDOM_ALLOCATION.getKey(), randomBoolean()); return settings; } @@ -251,16 +250,43 @@ public void testPrimaryBalanceWithPreferPrimaryBalanceSetting() { assertTrue(balanceFailed <= 1); } + /** + * This test verifies primary shard balance is attained setting. + */ + public void testPrimaryBalanceNotSolvedForNodeDropWithPreferPrimaryBalanceSetting() { + final int numberOfNodes = 4; + final int numberOfIndices = 4; + final int numberOfShards = 4; + final int numberOfReplicas = 1; + final int numberOfRuns = 5; + final float buffer = 0.10f; + int balanceFailed = 0; + + AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); + for (int i = 0; i < numberOfRuns; i++) { + ClusterState clusterState = initCluster(strategy, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); + clusterState = removeOneNode(clusterState, strategy); + logger.info(ShardAllocations.printShardDistribution(clusterState)); + try { + verifyPrimaryBalance(clusterState, buffer); + } catch (AssertionError | Exception e) { + balanceFailed++; + logger.info("Unexpected assertion failure"); + } + } + assertTrue(balanceFailed >= 4); + } + /** * This test verifies primary shard balance is attained with PREFER_PRIMARY_SHARD_BALANCE setting. */ - public void testPrimaryBalanceWithPreferPrimaryReBalanceSetting() { + public void testPrimaryBalanceSolvedWithPreferPrimaryRebalanceSetting() { final int numberOfNodes = 4; final int numberOfIndices = 4; final int numberOfShards = 4; final int numberOfReplicas = 1; final int numberOfRuns = 5; - final float buffer = 0.05f; + final float buffer = 0.10f; int balanceFailed = 0; AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryReBalance().build(), new TestGatewayAllocator()); @@ -418,18 +444,15 @@ public void testGlobalPrimaryBalance() throws Exception { * @throws Exception generic exception */ public void testGlobalPrimaryBalanceWithNodeDrops() throws Exception { - final float buffer = 0.05f; + final float buffer = 0.10f; AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryReBalance().build(), new TestGatewayAllocator()); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); + clusterState = addNodes(clusterState, strategy, 5); - clusterState = addIndex(clusterState, strategy, "test-index1", 5, 1); - clusterState = addIndex(clusterState, strategy, "test-index2", 5, 1); - clusterState = addIndex(clusterState, strategy, "test-index3", 5, 1); + clusterState = addIndices(clusterState, strategy, 5, 1, 8); + + logger.info(ShardAllocations.printShardDistribution(clusterState)); + verifyPrimaryBalance(clusterState, buffer); clusterState = removeOneNode(clusterState, strategy); @@ -610,7 +633,7 @@ private void verifyPrimaryBalance(ClusterState clusterState, float buffer) throw .filter(ShardRouting::primary) .collect(Collectors.toList()) .size(); - assertTrue(primaryCount < (avgPrimaryShardsPerNode * (1 + buffer))); + assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer))); } }, 60, TimeUnit.SECONDS); } @@ -712,6 +735,34 @@ private ClusterState addIndex( return applyAllocationUntilNoChange(clusterState, strategy); } + private ClusterState addIndices( + ClusterState clusterState, + AllocationService strategy, + int numberOfShards, + int numberOfReplicas, + int numberOfIndices + ) { + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterState.routingTable()); + + for (int i = 0; i < numberOfIndices; i++) { + IndexMetadata.Builder index = IndexMetadata.builder("test" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas); + + metadataBuilder = metadataBuilder.put(index); + routingTableBuilder.addAsNew(index.build()); + } + + clusterState = ClusterState.builder(clusterState) + .metadata(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) + .build(); + clusterState = strategy.reroute(clusterState, "indices-created"); + return applyAllocationUntilNoChange(clusterState, strategy); + } + private ClusterState initCluster( AllocationService strategy, int numberOfIndices, @@ -751,6 +802,17 @@ private ClusterState initCluster( return applyAllocationUntilNoChange(clusterState, strategy); } + private ClusterState addNodes(ClusterState clusterState, AllocationService strategy, int numberOfNodes) { + logger.info("now, start [{}] more node, check that rebalancing will happen because we set it to always", numberOfNodes); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + for (int i = 0; i < numberOfNodes; i++) { + nodes.add(newNode("node" + (clusterState.nodes().getSize() + i))); + } + clusterState = ClusterState.builder(clusterState).nodes(nodes.build()).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + return applyStartedShardsUntilNoChange(clusterState, strategy); + } + private ClusterState addNode(ClusterState clusterState, AllocationService strategy) { logger.info("now, start 1 more node, check that rebalancing will happen because we set it to always"); clusterState = ClusterState.builder(clusterState)