diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c3e63e36bc82..05de058fe8319 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724)) - [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/)) +- Add allocation constraints buffer and setting to enable index balance constraint ([#14515](https://github.com/opensearch-project/OpenSearch/pull/14515)) - Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865)) ### Dependencies diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 6702db4b43e91..e282e88d8bda0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -28,11 +28,20 @@ public class AllocationConstraints { private Map constraints; - public AllocationConstraints() { + public AllocationConstraints(AllocationParameter allocationParameter) { this.constraints = new HashMap<>(); - this.constraints.put(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached())); - this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); - this.constraints.put(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f))); + this.constraints.put( + INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, + new Constraint(isIndexShardsPerNodeBreached(allocationParameter.getShardBalanceBuffer())) + ); + this.constraints.put( + INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, + new Constraint(isPerIndexPrimaryShardsPerNodeBreached(allocationParameter.getPrimaryBalanceIndexBuffer())) + ); + this.constraints.put( + CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, + new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPrimaryBalanceShardBuffer())) + ); } public void updateAllocationConstraint(String constraint, boolean enable) { @@ -43,4 +52,8 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index); return params.weight(constraints); } + + Constraint getAllocationConstraint(String name) { + return constraints.get(name); + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java new file mode 100644 index 0000000000000..7de8c361df73e --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +/** + * Allocation Constraint Parameters + */ +public class AllocationParameter { + private final float primaryBalanceShardBuffer; + + private final float primaryBalanceIndexBuffer; + + private final float shardBalanceBuffer; + + public AllocationParameter(float shardBalanceBuffer, float primaryBalanceIndexBuffer, float primaryBalanceShardBuffer) { + this.shardBalanceBuffer = shardBalanceBuffer; + this.primaryBalanceShardBuffer = primaryBalanceShardBuffer; + this.primaryBalanceIndexBuffer = primaryBalanceIndexBuffer; + } + + public float getPrimaryBalanceShardBuffer() { + return primaryBalanceShardBuffer; + } + + public float getPrimaryBalanceIndexBuffer() { + return primaryBalanceIndexBuffer; + } + + public float getShardBalanceBuffer() { + return shardBalanceBuffer; + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java index e9c3c0afcbe88..378a4eebc49c6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java @@ -40,6 +40,10 @@ public void setEnable(boolean enable) { this.enable = enable; } + boolean isEnable() { + return enable; + } + static class ConstraintParams { private ShardsBalancer balancer; private BalancedShardsAllocator.ModelNode node; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index 08fe8f92d1f80..6aac694fc19ab 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -19,22 +19,22 @@ public class ConstraintTypes { public final static long CONSTRAINT_WEIGHT = 1000000L; /** - * Defines per index constraint which is breached when a node contains more than avg number of primary shards for an index + * Defines per index constraint which is breached when a node contains more than avg number of primary shards for an index with added buffer */ public final static String INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "index.primary.shard.balance.constraint"; /** - * Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices + * Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices with added buffer */ public final static String CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint"; /** - * Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices + * Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices with added buffer */ public final static String CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID = "cluster.primary.shard.rebalance.constraint"; /** - * Defines an index constraint which is breached when a node contains more than avg number of shards for an index + * Defines an index constraint which is breached when a node contains more than avg number of shards for an index with added buffer */ public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.count.constraint"; @@ -50,27 +50,30 @@ public class ConstraintTypes { * on one node, often resulting in a hotspot on that node. *

* This constraint is breached when balancer attempts to allocate more than - * average shards per index per node. + * average shards per index per node with added buffer. */ - public static Predicate isIndexShardsPerNodeBreached() { + public static Predicate isIndexShardsPerNodeBreached(float buffer) { return (params) -> { int currIndexShardsOnNode = params.getNode().numShards(params.getIndex()); - int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex())); + int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()) * (1 + buffer)); return (currIndexShardsOnNode >= allowedIndexShardsPerNode); }; } /** * Defines a predicate which returns true when specific to an index, a node contains more than average number of primary - * shards. This constraint is used in weight calculation during allocation and rebalancing. When breached a high weight + * shards with added buffer. This constraint is used in weight calculation during allocation and rebalancing. When breached a high weight * {@link ConstraintTypes#CONSTRAINT_WEIGHT} is assigned to node resulting in lesser chances of node being selected * as allocation or rebalancing target */ - public static Predicate isPerIndexPrimaryShardsPerNodeBreached() { + public static Predicate isPerIndexPrimaryShardsPerNodeBreached(float buffer) { return (params) -> { int perIndexPrimaryShardCount = params.getNode().numPrimaryShards(params.getIndex()); - int perIndexAllowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex())); - return perIndexPrimaryShardCount > perIndexAllowedPrimaryShardCount; + int perIndexAllowedPrimaryShardCount = (int) Math.ceil( + params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()) * (1 + buffer) + ); + + return perIndexPrimaryShardCount >= perIndexAllowedPrimaryShardCount; }; } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java index 2c2138af18abc..6d703cca2a663 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -31,7 +31,7 @@ public class RebalanceConstraints { public RebalanceConstraints(RebalanceParameter rebalanceParameter) { this.constraints = new HashMap<>(); - this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); + this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached(0.0f))); this.constraints.put( CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(rebalanceParameter.getPreferPrimaryBalanceBuffer())) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index b2443490dd973..de650c736c479 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -43,6 +43,7 @@ import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationConstraints; +import org.opensearch.cluster.routing.allocation.AllocationParameter; import org.opensearch.cluster.routing.allocation.ConstraintTypes; import org.opensearch.cluster.routing.allocation.MoveDecision; import org.opensearch.cluster.routing.allocation.RebalanceConstraints; @@ -62,7 +63,6 @@ import java.util.Set; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; -import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID; @@ -135,7 +135,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { ); /** - * This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()} + * This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached(float)} * and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation * {@link LocalShardsBalancer#allocateUnassigned()} and shard re-balance/relocation to a different node via {@link LocalShardsBalancer#balance()} . */ @@ -147,6 +147,13 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting SHARDS_PER_INDEX_BALANCE = Setting.boolSetting( + "cluster.routing.allocation.balance.constraint.index.enable", + true, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting PREFER_PRIMARY_SHARD_REBALANCE = Setting.boolSetting( "cluster.routing.allocation.rebalance.primary.enable", false, @@ -154,6 +161,30 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting( + "cluster.routing.allocation.balance.primary.shard.buffer", + 0.0f, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); + + public static final Setting PRIMARY_INDEX_BALANCE_BUFFER = Setting.floatSetting( + "cluster.routing.allocation.balance.primary.index.buffer", + 0.0f, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); + + public static final Setting SHARDS_PER_INDEX_BALANCE_BUFFER = Setting.floatSetting( + "cluster.routing.allocation.balance.constraint.index.buffer", + 0.0f, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting PRIMARY_SHARD_REBALANCE_BUFFER = Setting.floatSetting( "cluster.routing.allocation.rebalance.primary.buffer", 0.10f, @@ -166,7 +197,11 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile ShardMovementStrategy shardMovementStrategy; private volatile boolean preferPrimaryShardBalance; + private volatile boolean shardsPerIndexBalance; private volatile boolean preferPrimaryShardRebalance; + private volatile float primaryBalanceShardBuffer; + private volatile float primaryBalanceIndexBuffer; + private volatile float shardsPerIndexBalanceBuffer; private volatile float preferPrimaryShardRebalanceBuffer; private volatile float indexBalanceFactor; private volatile float shardBalanceFactor; @@ -179,19 +214,28 @@ public BalancedShardsAllocator(Settings settings) { @Inject public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { + setThreshold(THRESHOLD_SETTING.get(settings)); + setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings)); setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings)); + setPrimaryBalanceShardBuffer(PRIMARY_SHARD_BALANCE_BUFFER.get(settings)); + setPrimaryBalanceIndexBuffer(PRIMARY_INDEX_BALANCE_BUFFER.get(settings)); + setShardsPerIndexBalanceBuffer(SHARDS_PER_INDEX_BALANCE_BUFFER.get(settings)); setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings)); - updateWeightFunction(); - setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); - setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); + setShardsPerIndexBalance(SHARDS_PER_INDEX_BALANCE.get(settings)); + updateWeightFunction(); + clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); + clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE, this::setShardsPerIndexBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::updateIndexBalanceFactor); clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::updateShardBalanceFactor); + clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_BALANCE_BUFFER, this::setPrimaryBalanceShardBuffer); + clusterSettings.addSettingsUpdateConsumer(PRIMARY_INDEX_BALANCE_BUFFER, this::setPrimaryBalanceIndexBuffer); + clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE_BUFFER, this::setShardsPerIndexBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); @@ -228,6 +272,7 @@ private void setShardBalanceFactor(float shardBalanceFactor) { private void setPreferPrimaryShardRebalanceBuffer(float preferPrimaryShardRebalanceBuffer) { this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardRebalanceBuffer; + updateWeightFunction(); } private void updateIndexBalanceFactor(float indexBalanceFactor) { @@ -246,7 +291,18 @@ private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalan } private void updateWeightFunction() { - weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer); + weightFunction = new WeightFunction( + this.indexBalanceFactor, + this.shardBalanceFactor, + this.shardsPerIndexBalanceBuffer, + this.primaryBalanceIndexBuffer, + this.primaryBalanceShardBuffer, + this.preferPrimaryShardRebalanceBuffer + ); + this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); + this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); + this.weightFunction.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, shardsPerIndexBalance); + this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); } /** @@ -255,14 +311,32 @@ private void updateWeightFunction() { */ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { this.preferPrimaryShardBalance = preferPrimaryShardBalance; - this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); - this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); - this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); + updateWeightFunction(); + } + + private void setShardsPerIndexBalance(boolean shardsPerIndexBalance) { + this.shardsPerIndexBalance = shardsPerIndexBalance; + updateWeightFunction(); + } + + private void setPrimaryBalanceShardBuffer(float primaryBalanceShardBuffer) { + this.primaryBalanceShardBuffer = primaryBalanceShardBuffer; + updateWeightFunction(); + } + + private void setPrimaryBalanceIndexBuffer(float primaryBalanceIndexBuffer) { + this.primaryBalanceIndexBuffer = primaryBalanceIndexBuffer; + updateWeightFunction(); + } + + private void setShardsPerIndexBalanceBuffer(float shardsPerIndexBalanceBuffer) { + this.shardsPerIndexBalanceBuffer = shardsPerIndexBalanceBuffer; + updateWeightFunction(); } private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) { this.preferPrimaryShardRebalance = preferPrimaryShardRebalance; - this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance); + updateWeightFunction(); } private void setThreshold(float threshold) { @@ -375,6 +449,10 @@ public boolean getPreferPrimaryBalance() { return preferPrimaryShardBalance; } + public AllocationConstraints getAllocationParam() { + return weightFunction.constraints; + } + /** * This class is the primary weight function used to create balanced over nodes and shards in the cluster. * Currently this function has 3 properties: @@ -410,20 +488,34 @@ static class WeightFunction { private AllocationConstraints constraints; private RebalanceConstraints rebalanceConstraints; - WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) { + WeightFunction( + float indexBalance, + float shardBalance, + float shardsPerIndexBalanceBuffer, + float primaryBalanceIndexBuffer, + float primaryBalanceShardBuffer, + float preferPrimaryBalanceBuffer + ) { float sum = indexBalance + shardBalance; - if (sum <= 0.0f) { - throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); + if (sum > 0.0f) { + theta0 = shardBalance / sum; + theta1 = indexBalance / sum; + } else if (sum == 0.0f) { + theta0 = 0; + theta1 = 0; + } else { + throw new IllegalArgumentException("Balance factors must sum to a value >= 0 but was: " + sum); } - theta0 = shardBalance / sum; - theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; + AllocationParameter allocationParameter = new AllocationParameter( + shardsPerIndexBalanceBuffer, + primaryBalanceIndexBuffer, + primaryBalanceShardBuffer + ); RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); - this.constraints = new AllocationConstraints(); + this.constraints = new AllocationConstraints(allocationParameter); this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter); - // Enable index shard per node breach constraint - updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); } public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 233a8d732d178..6b3d0b5fe3375 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -263,6 +263,10 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE, + BalancedShardsAllocator.PRIMARY_INDEX_BALANCE_BUFFER, + BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_BUFFER, + BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE, + BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE_BUFFER, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java index 90546620e9e3e..7bafaf6d0b2d5 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java @@ -34,10 +34,12 @@ public void testSettings() { float indexBalanceFactor = randomFloat(); float shardBalance = randomFloat(); float threshold = randomFloat(); + boolean shardsPerIndexBalance = randomBoolean(); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalanceFactor); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), threshold); settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), true); + settings.put(BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE.getKey(), shardsPerIndexBalance); service.applySettings(settings.build()); @@ -45,6 +47,10 @@ public void testSettings() { assertEquals(shardBalance, allocator.getShardBalance(), 0.01); assertEquals(threshold, allocator.getThreshold(), 0.01); assertEquals(true, allocator.getPreferPrimaryBalance()); + assertEquals( + shardsPerIndexBalance, + allocator.getAllocationParam().getAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID).isEnable() + ); settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), false); service.applySettings(settings.build()); @@ -58,7 +64,9 @@ public void testSettings() { public void testIndexShardsPerNodeConstraint() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); - AllocationConstraints constraints = new AllocationConstraints(); + float buffer = randomFloat(); + AllocationParameter allocationParameter = new AllocationParameter(buffer, 0, 0); + AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); int shardCount = randomIntBetween(1, 500); @@ -68,9 +76,8 @@ public void testIndexShardsPerNodeConstraint() { when(node.numShards(anyString())).thenReturn(shardCount); when(node.getNodeId()).thenReturn("test-node"); - long expectedWeight = (shardCount >= avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0; + long expectedWeight = (shardCount >= (1 + buffer) * avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0; assertEquals(expectedWeight, constraints.weight(balancer, node, "index")); - } /** @@ -80,8 +87,11 @@ public void testIndexShardsPerNodeConstraint() { public void testPerIndexPrimaryShardsConstraint() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); - AllocationConstraints constraints = new AllocationConstraints(); + float buffer = .6f; + AllocationParameter allocationParameter = new AllocationParameter(0, buffer, 0); + AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); + constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, false); final String indexName = "test-index"; int perIndexPrimaryShardCount = 1; @@ -95,6 +105,19 @@ public void testPerIndexPrimaryShardsConstraint() { perIndexPrimaryShardCount = 3; when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); + assertEquals(0, constraints.weight(balancer, node, indexName)); + + perIndexPrimaryShardCount = 4; + when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); + assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + + perIndexPrimaryShardCount = 3; + when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); + buffer = .0f; + allocationParameter = new AllocationParameter(0, buffer, 0); + constraints = new AllocationConstraints(allocationParameter); + constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); + constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, false); assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); @@ -107,7 +130,8 @@ public void testPerIndexPrimaryShardsConstraint() { public void testGlobalPrimaryShardsConstraint() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); - AllocationConstraints constraints = new AllocationConstraints(); + AllocationParameter allocationParameter = new AllocationParameter(0, 0, 0); + AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); final String indexName = "test-index"; @@ -124,8 +148,17 @@ public void testGlobalPrimaryShardsConstraint() { when(node.numPrimaryShards()).thenReturn(primaryShardCount); assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); - constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); + // With buffer - Weight of 0 expected + float buffer = .6f; + allocationParameter = new AllocationParameter(0, 0, buffer); + constraints = new AllocationConstraints(allocationParameter); + constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); assertEquals(0, constraints.weight(balancer, node, indexName)); + + // With buffer - Weight of CONSTRAINT_WEIGHT + primaryShardCount = 5; + when(node.numPrimaryShards()).thenReturn(primaryShardCount); + assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); } /** @@ -134,7 +167,8 @@ public void testGlobalPrimaryShardsConstraint() { public void testPrimaryShardsConstraints() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); - AllocationConstraints constraints = new AllocationConstraints(); + AllocationParameter allocationParameter = new AllocationParameter(0, 0, 0); + AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); @@ -175,7 +209,8 @@ public void testPrimaryShardsConstraints() { public void testAllConstraints() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); - AllocationConstraints constraints = new AllocationConstraints(); + AllocationParameter allocationParameter = new AllocationParameter(0, 0, 0); + AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true);