From 950b9ab1a8751493b756ee6fb4418c4dc2c52b9b Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Mon, 24 Jun 2024 14:04:39 +0530 Subject: [PATCH 1/4] Add buffer for allocation constraints and a switch to disable index balance constraint Signed-off-by: Gaurav Bafna --- .../allocation/AllocationConstraints.java | 21 +++- .../allocation/AllocationParameter.java | 36 ++++++ .../routing/allocation/Constraint.java | 4 + .../routing/allocation/ConstraintTypes.java | 13 +- .../allocation/RebalanceConstraints.java | 2 +- .../allocator/BalancedShardsAllocator.java | 111 ++++++++++++++++-- .../common/settings/ClusterSettings.java | 4 + .../AllocationConstraintsTests.java | 51 ++++++-- 8 files changed, 215 insertions(+), 27 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 6702db4b43e91..92b2aecfecbe6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -28,11 +28,20 @@ public class AllocationConstraints { private Map constraints; - public AllocationConstraints() { + public AllocationConstraints(AllocationParameter allocationParameter) { this.constraints = new HashMap<>(); - this.constraints.put(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached())); - this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); - this.constraints.put(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f))); + this.constraints.put( + INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, + new Constraint(isIndexShardsPerNodeBreached(allocationParameter.getShardBalanceBuffer())) + ); + this.constraints.put( + INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, + new Constraint(isPerIndexPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceIndexBuffer())) + ); + this.constraints.put( + CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, + new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceShardBuffer())) + ); } public void updateAllocationConstraint(String constraint, boolean enable) { @@ -43,4 +52,8 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index); return params.weight(constraints); } + + public Constraint getAllocationConstraint(String name) { + return constraints.get(name); + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java new file mode 100644 index 0000000000000..30b829df072c5 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +public class AllocationParameter { + private float preferPrimaryBalanceShardBuffer; + + private float preferPrimaryBalanceIndexBuffer; + + private float shardBalanceBuffer; + + public AllocationParameter(float preferPrimaryBalanceShardBuffer, float preferPrimaryBalanceIndexBuffer, float shardBalanceBuffer) { + this.preferPrimaryBalanceShardBuffer = preferPrimaryBalanceShardBuffer; + this.preferPrimaryBalanceIndexBuffer = preferPrimaryBalanceIndexBuffer; + this.shardBalanceBuffer = shardBalanceBuffer; + } + + public float getPreferPrimaryBalanceShardBuffer() { + return preferPrimaryBalanceShardBuffer; + } + + public float getPreferPrimaryBalanceIndexBuffer() { + return preferPrimaryBalanceIndexBuffer; + } + + public float getShardBalanceBuffer() { + return shardBalanceBuffer; + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java index e9c3c0afcbe88..8ac2fdd779f7c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java @@ -40,6 +40,10 @@ public void setEnable(boolean enable) { this.enable = enable; } + public boolean isEnable() { + return enable; + } + static class ConstraintParams { private ShardsBalancer balancer; private BalancedShardsAllocator.ModelNode node; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index 08fe8f92d1f80..2a6299e5c043f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -52,10 +52,10 @@ public class ConstraintTypes { * This constraint is breached when balancer attempts to allocate more than * average shards per index per node. */ - public static Predicate isIndexShardsPerNodeBreached() { + public static Predicate isIndexShardsPerNodeBreached(float buffer) { return (params) -> { int currIndexShardsOnNode = params.getNode().numShards(params.getIndex()); - int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex())); + int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()) * (1 + buffer)); return (currIndexShardsOnNode >= allowedIndexShardsPerNode); }; } @@ -66,11 +66,14 @@ public static Predicate isIndexShardsPerNodeBreache * {@link ConstraintTypes#CONSTRAINT_WEIGHT} is assigned to node resulting in lesser chances of node being selected * as allocation or rebalancing target */ - public static Predicate isPerIndexPrimaryShardsPerNodeBreached() { + public static Predicate isPerIndexPrimaryShardsPerNodeBreached(float buffer) { return (params) -> { int perIndexPrimaryShardCount = params.getNode().numPrimaryShards(params.getIndex()); - int perIndexAllowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex())); - return perIndexPrimaryShardCount > perIndexAllowedPrimaryShardCount; + int perIndexAllowedPrimaryShardCount = (int) Math.ceil( + params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()) * (1 + buffer) + ); + + return perIndexPrimaryShardCount >= perIndexAllowedPrimaryShardCount; }; } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java index 2c2138af18abc..6d703cca2a663 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -31,7 +31,7 @@ public class RebalanceConstraints { public RebalanceConstraints(RebalanceParameter rebalanceParameter) { this.constraints = new HashMap<>(); - this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); + this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached(0.0f))); this.constraints.put( CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(rebalanceParameter.getPreferPrimaryBalanceBuffer())) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index b2443490dd973..4103fd05ab398 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -43,6 +43,7 @@ import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationConstraints; +import org.opensearch.cluster.routing.allocation.AllocationParameter; import org.opensearch.cluster.routing.allocation.ConstraintTypes; import org.opensearch.cluster.routing.allocation.MoveDecision; import org.opensearch.cluster.routing.allocation.RebalanceConstraints; @@ -135,7 +136,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { ); /** - * This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()} + * This setting governs whether primary shards balance is desired during allocation. This is used by * and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation * {@link LocalShardsBalancer#allocateUnassigned()} and shard re-balance/relocation to a different node via {@link LocalShardsBalancer#balance()} . */ @@ -147,6 +148,13 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting SHARDS_PER_INDEX_BALANCE = Setting.boolSetting( + "cluster.routing.allocation.balance.constraint.shard.enable", + true, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting PREFER_PRIMARY_SHARD_REBALANCE = Setting.boolSetting( "cluster.routing.allocation.rebalance.primary.enable", false, @@ -154,6 +162,30 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting PREFER_PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting( + "cluster.routing.allocation.balance.primary.shard.buffer", + 0.0f, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); + + public static final Setting PREFER_PRIMARY_INDEX_BALANCE_BUFFER = Setting.floatSetting( + "cluster.routing.allocation.balance.primary.index.buffer", + 0.0f, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); + + public static final Setting SHARDS_PER_INDEX_BALANCE_BUFFER = Setting.floatSetting( + "cluster.routing.allocation.balance.constraint.shard.buffer", + 0.0f, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting PRIMARY_SHARD_REBALANCE_BUFFER = Setting.floatSetting( "cluster.routing.allocation.rebalance.primary.buffer", 0.10f, @@ -166,7 +198,15 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile ShardMovementStrategy shardMovementStrategy; private volatile boolean preferPrimaryShardBalance; + + private volatile boolean shardsPerIndexBalance; private volatile boolean preferPrimaryShardRebalance; + + private volatile float preferPrimaryBalanceShardBuffer; + + private volatile float preferPrimaryBalanceIndexBuffer; + + private volatile float shardsPerIndexBalanceBuffer; private volatile float preferPrimaryShardRebalanceBuffer; private volatile float indexBalanceFactor; private volatile float shardBalanceFactor; @@ -181,17 +221,25 @@ public BalancedShardsAllocator(Settings settings) { public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings)); setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings)); + setPreferPrimaryBalanceShardBuffer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings)); + setPreferPrimaryBalanceIndexBuffer(PREFER_PRIMARY_INDEX_BALANCE_BUFFER.get(settings)); + setShardsPerIndexBalanceBuffer(SHARDS_PER_INDEX_BALANCE_BUFFER.get(settings)); setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings)); updateWeightFunction(); setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); + setShardsPerIndexBalance(SHARDS_PER_INDEX_BALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); + clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE, this::setShardsPerIndexBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::updateIndexBalanceFactor); clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::updateShardBalanceFactor); + clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER, this::setPreferPrimaryBalanceShardBuffer); + clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_INDEX_BALANCE_BUFFER, this::setPreferPrimaryBalanceIndexBuffer); + clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE_BUFFER, this::setShardsPerIndexBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); @@ -246,7 +294,14 @@ private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalan } private void updateWeightFunction() { - weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer); + weightFunction = new WeightFunction( + this.indexBalanceFactor, + this.shardBalanceFactor, + this.preferPrimaryBalanceShardBuffer, + this.preferPrimaryBalanceIndexBuffer, + this.shardsPerIndexBalanceBuffer, + this.preferPrimaryShardRebalanceBuffer + ); } /** @@ -260,6 +315,26 @@ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); } + private void setShardsPerIndexBalance(boolean shardsPerIndexBalance) { + this.shardsPerIndexBalance = shardsPerIndexBalance; + this.weightFunction.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, shardsPerIndexBalance); + } + + private void setPreferPrimaryBalanceShardBuffer(float preferPrimaryBalanceShardBuffer) { + this.preferPrimaryBalanceShardBuffer = preferPrimaryBalanceShardBuffer; + updateWeightFunction(); + } + + private void setPreferPrimaryBalanceIndexBuffer(float preferPrimaryBalanceIndexBuffer) { + this.preferPrimaryBalanceIndexBuffer = preferPrimaryBalanceIndexBuffer; + updateWeightFunction(); + } + + private void setShardsPerIndexBalanceBuffer(float shardsPerIndexBalanceBuffer) { + this.shardsPerIndexBalanceBuffer = shardsPerIndexBalanceBuffer; + updateWeightFunction(); + } + private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) { this.preferPrimaryShardRebalance = preferPrimaryShardRebalance; this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance); @@ -375,6 +450,10 @@ public boolean getPreferPrimaryBalance() { return preferPrimaryShardBalance; } + public AllocationConstraints getAllocationParam() { + return weightFunction.constraints; + } + /** * This class is the primary weight function used to create balanced over nodes and shards in the cluster. * Currently this function has 3 properties: @@ -410,20 +489,34 @@ static class WeightFunction { private AllocationConstraints constraints; private RebalanceConstraints rebalanceConstraints; - WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) { + WeightFunction( + float indexBalance, + float shardBalance, + float preferPrimaryBalanceShardBuffer, + float preferPrimaryBalanceIndexBuffer, + float shardsPerIndexBalanceBuffer, + float preferPrimaryBalanceBuffer + ) { float sum = indexBalance + shardBalance; - if (sum <= 0.0f) { + if (sum > 0.0f) { + theta0 = shardBalance / sum; + theta1 = indexBalance / sum; + } else if (sum == 0.0f) { + theta0 = 0; + theta1 = 0; + } else { throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); } - theta0 = shardBalance / sum; - theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; + AllocationParameter allocationParameter = new AllocationParameter( + preferPrimaryBalanceShardBuffer, + preferPrimaryBalanceIndexBuffer, + shardsPerIndexBalanceBuffer + ); RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); - this.constraints = new AllocationConstraints(); + this.constraints = new AllocationConstraints(allocationParameter); this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter); - // Enable index shard per node breach constraint - updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); } public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 233a8d732d178..3e26e69c0c020 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -263,6 +263,10 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE, + BalancedShardsAllocator.PREFER_PRIMARY_INDEX_BALANCE_BUFFER, + BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE_BUFFER, + BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE, + BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE_BUFFER, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java index 90546620e9e3e..d05b47424bd98 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java @@ -34,10 +34,12 @@ public void testSettings() { float indexBalanceFactor = randomFloat(); float shardBalance = randomFloat(); float threshold = randomFloat(); + boolean shardsPerIndexBalance = randomBoolean(); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalanceFactor); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), threshold); settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), true); + settings.put(BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE.getKey(), shardsPerIndexBalance); service.applySettings(settings.build()); @@ -45,6 +47,10 @@ public void testSettings() { assertEquals(shardBalance, allocator.getShardBalance(), 0.01); assertEquals(threshold, allocator.getThreshold(), 0.01); assertEquals(true, allocator.getPreferPrimaryBalance()); + assertEquals( + shardsPerIndexBalance, + allocator.getAllocationParam().getAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID).isEnable() + ); settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), false); service.applySettings(settings.build()); @@ -58,7 +64,9 @@ public void testSettings() { public void testIndexShardsPerNodeConstraint() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); - AllocationConstraints constraints = new AllocationConstraints(); + float buffer = randomFloat(); + AllocationParameter allocationParameter = new AllocationParameter(0, 0, buffer); + AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); int shardCount = randomIntBetween(1, 500); @@ -68,9 +76,8 @@ public void testIndexShardsPerNodeConstraint() { when(node.numShards(anyString())).thenReturn(shardCount); when(node.getNodeId()).thenReturn("test-node"); - long expectedWeight = (shardCount >= avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0; + long expectedWeight = (shardCount >= (1 + buffer) * avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0; assertEquals(expectedWeight, constraints.weight(balancer, node, "index")); - } /** @@ -80,8 +87,11 @@ public void testIndexShardsPerNodeConstraint() { public void testPerIndexPrimaryShardsConstraint() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); - AllocationConstraints constraints = new AllocationConstraints(); + float buffer = .6f; + AllocationParameter allocationParameter = new AllocationParameter(0, buffer, 0); + AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); + constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, false); final String indexName = "test-index"; int perIndexPrimaryShardCount = 1; @@ -95,6 +105,19 @@ public void testPerIndexPrimaryShardsConstraint() { perIndexPrimaryShardCount = 3; when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); + assertEquals(0, constraints.weight(balancer, node, indexName)); + + perIndexPrimaryShardCount = 4; + when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); + assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + + perIndexPrimaryShardCount = 3; + when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); + buffer = .0f; + allocationParameter = new AllocationParameter(0, buffer, 0); + constraints = new AllocationConstraints(allocationParameter); + constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); + constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, false); assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); @@ -107,7 +130,8 @@ public void testPerIndexPrimaryShardsConstraint() { public void testGlobalPrimaryShardsConstraint() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); - AllocationConstraints constraints = new AllocationConstraints(); + AllocationParameter allocationParameter = new AllocationParameter(0, 0, 0); + AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); final String indexName = "test-index"; @@ -124,8 +148,17 @@ public void testGlobalPrimaryShardsConstraint() { when(node.numPrimaryShards()).thenReturn(primaryShardCount); assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); - constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); + // With buffer - Weight of 0 expected + float buffer = .6f; + allocationParameter = new AllocationParameter(buffer, 0, 0); + constraints = new AllocationConstraints(allocationParameter); + constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); assertEquals(0, constraints.weight(balancer, node, indexName)); + + // With buffer - Weight of CONSTRAINT_WEIGHT + primaryShardCount = 5; + when(node.numPrimaryShards()).thenReturn(primaryShardCount); + assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); } /** @@ -134,7 +167,8 @@ public void testGlobalPrimaryShardsConstraint() { public void testPrimaryShardsConstraints() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); - AllocationConstraints constraints = new AllocationConstraints(); + AllocationParameter allocationParameter = new AllocationParameter(0, 0, 0); + AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); @@ -175,7 +209,8 @@ public void testPrimaryShardsConstraints() { public void testAllConstraints() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); - AllocationConstraints constraints = new AllocationConstraints(); + AllocationParameter allocationParameter = new AllocationParameter(0, 0, 0); + AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); From cf053d204c78d6a725a5b4ec082278a541b427a0 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Mon, 24 Jun 2024 16:24:28 +0530 Subject: [PATCH 2/4] Doc changes and refactor Signed-off-by: Gaurav Bafna --- .../routing/allocation/AllocationParameter.java | 5 ++++- .../allocator/BalancedShardsAllocator.java | 14 +++++++------- .../allocation/AllocationConstraintsTests.java | 4 ++-- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java index 30b829df072c5..b31dcd766162a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java @@ -8,6 +8,9 @@ package org.opensearch.cluster.routing.allocation; +/** + * Allocation Constraint Parameters + */ public class AllocationParameter { private float preferPrimaryBalanceShardBuffer; @@ -15,7 +18,7 @@ public class AllocationParameter { private float shardBalanceBuffer; - public AllocationParameter(float preferPrimaryBalanceShardBuffer, float preferPrimaryBalanceIndexBuffer, float shardBalanceBuffer) { + public AllocationParameter(float shardBalanceBuffer, float preferPrimaryBalanceIndexBuffer, float preferPrimaryBalanceShardBuffer) { this.preferPrimaryBalanceShardBuffer = preferPrimaryBalanceShardBuffer; this.preferPrimaryBalanceIndexBuffer = preferPrimaryBalanceIndexBuffer; this.shardBalanceBuffer = shardBalanceBuffer; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 4103fd05ab398..998c2ded0baad 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -136,7 +136,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { ); /** - * This setting governs whether primary shards balance is desired during allocation. This is used by + * This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached(float)} * and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation * {@link LocalShardsBalancer#allocateUnassigned()} and shard re-balance/relocation to a different node via {@link LocalShardsBalancer#balance()} . */ @@ -297,9 +297,9 @@ private void updateWeightFunction() { weightFunction = new WeightFunction( this.indexBalanceFactor, this.shardBalanceFactor, - this.preferPrimaryBalanceShardBuffer, - this.preferPrimaryBalanceIndexBuffer, this.shardsPerIndexBalanceBuffer, + this.preferPrimaryBalanceIndexBuffer, + this.preferPrimaryBalanceShardBuffer, this.preferPrimaryShardRebalanceBuffer ); } @@ -492,9 +492,9 @@ static class WeightFunction { WeightFunction( float indexBalance, float shardBalance, - float preferPrimaryBalanceShardBuffer, - float preferPrimaryBalanceIndexBuffer, float shardsPerIndexBalanceBuffer, + float preferPrimaryBalanceIndexBuffer, + float preferPrimaryBalanceShardBuffer, float preferPrimaryBalanceBuffer ) { float sum = indexBalance + shardBalance; @@ -510,9 +510,9 @@ static class WeightFunction { this.indexBalance = indexBalance; this.shardBalance = shardBalance; AllocationParameter allocationParameter = new AllocationParameter( - preferPrimaryBalanceShardBuffer, + shardsPerIndexBalanceBuffer, preferPrimaryBalanceIndexBuffer, - shardsPerIndexBalanceBuffer + preferPrimaryBalanceShardBuffer ); RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); this.constraints = new AllocationConstraints(allocationParameter); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java index d05b47424bd98..7bafaf6d0b2d5 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java @@ -65,7 +65,7 @@ public void testIndexShardsPerNodeConstraint() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); float buffer = randomFloat(); - AllocationParameter allocationParameter = new AllocationParameter(0, 0, buffer); + AllocationParameter allocationParameter = new AllocationParameter(buffer, 0, 0); AllocationConstraints constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); @@ -150,7 +150,7 @@ public void testGlobalPrimaryShardsConstraint() { // With buffer - Weight of 0 expected float buffer = .6f; - allocationParameter = new AllocationParameter(buffer, 0, 0); + allocationParameter = new AllocationParameter(0, 0, buffer); constraints = new AllocationConstraints(allocationParameter); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); assertEquals(0, constraints.weight(balancer, node, indexName)); From a82e4f50b2a3ff99511a92391bf5b1ef9514e4c2 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Mon, 24 Jun 2024 19:04:26 +0530 Subject: [PATCH 3/4] Address PR comments Signed-off-by: Gaurav Bafna --- CHANGELOG.md | 1 + .../allocation/AllocationConstraints.java | 4 +- .../allocation/AllocationParameter.java | 20 +++--- .../allocator/BalancedShardsAllocator.java | 67 +++++++++---------- .../common/settings/ClusterSettings.java | 4 +- 5 files changed, 48 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c3e63e36bc82..05de058fe8319 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724)) - [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/)) +- Add allocation constraints buffer and setting to enable index balance constraint ([#14515](https://github.com/opensearch-project/OpenSearch/pull/14515)) - Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865)) ### Dependencies diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 92b2aecfecbe6..7c7b7146661eb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -36,11 +36,11 @@ public AllocationConstraints(AllocationParameter allocationParameter) { ); this.constraints.put( INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, - new Constraint(isPerIndexPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceIndexBuffer())) + new Constraint(isPerIndexPrimaryShardsPerNodeBreached(allocationParameter.getPrimaryBalanceIndexBuffer())) ); this.constraints.put( CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, - new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceShardBuffer())) + new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPrimaryBalanceShardBuffer())) ); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java index b31dcd766162a..7de8c361df73e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java @@ -12,24 +12,24 @@ * Allocation Constraint Parameters */ public class AllocationParameter { - private float preferPrimaryBalanceShardBuffer; + private final float primaryBalanceShardBuffer; - private float preferPrimaryBalanceIndexBuffer; + private final float primaryBalanceIndexBuffer; - private float shardBalanceBuffer; + private final float shardBalanceBuffer; - public AllocationParameter(float shardBalanceBuffer, float preferPrimaryBalanceIndexBuffer, float preferPrimaryBalanceShardBuffer) { - this.preferPrimaryBalanceShardBuffer = preferPrimaryBalanceShardBuffer; - this.preferPrimaryBalanceIndexBuffer = preferPrimaryBalanceIndexBuffer; + public AllocationParameter(float shardBalanceBuffer, float primaryBalanceIndexBuffer, float primaryBalanceShardBuffer) { this.shardBalanceBuffer = shardBalanceBuffer; + this.primaryBalanceShardBuffer = primaryBalanceShardBuffer; + this.primaryBalanceIndexBuffer = primaryBalanceIndexBuffer; } - public float getPreferPrimaryBalanceShardBuffer() { - return preferPrimaryBalanceShardBuffer; + public float getPrimaryBalanceShardBuffer() { + return primaryBalanceShardBuffer; } - public float getPreferPrimaryBalanceIndexBuffer() { - return preferPrimaryBalanceIndexBuffer; + public float getPrimaryBalanceIndexBuffer() { + return primaryBalanceIndexBuffer; } public float getShardBalanceBuffer() { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 998c2ded0baad..de650c736c479 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -63,7 +63,6 @@ import java.util.Set; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; -import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID; @@ -149,7 +148,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { ); public static final Setting SHARDS_PER_INDEX_BALANCE = Setting.boolSetting( - "cluster.routing.allocation.balance.constraint.shard.enable", + "cluster.routing.allocation.balance.constraint.index.enable", true, Property.Dynamic, Property.NodeScope @@ -162,7 +161,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); - public static final Setting PREFER_PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting( + public static final Setting PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting( "cluster.routing.allocation.balance.primary.shard.buffer", 0.0f, 0.0f, @@ -170,7 +169,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); - public static final Setting PREFER_PRIMARY_INDEX_BALANCE_BUFFER = Setting.floatSetting( + public static final Setting PRIMARY_INDEX_BALANCE_BUFFER = Setting.floatSetting( "cluster.routing.allocation.balance.primary.index.buffer", 0.0f, 0.0f, @@ -179,7 +178,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { ); public static final Setting SHARDS_PER_INDEX_BALANCE_BUFFER = Setting.floatSetting( - "cluster.routing.allocation.balance.constraint.shard.buffer", + "cluster.routing.allocation.balance.constraint.index.buffer", 0.0f, 0.0f, Property.Dynamic, @@ -198,14 +197,10 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile ShardMovementStrategy shardMovementStrategy; private volatile boolean preferPrimaryShardBalance; - private volatile boolean shardsPerIndexBalance; private volatile boolean preferPrimaryShardRebalance; - - private volatile float preferPrimaryBalanceShardBuffer; - - private volatile float preferPrimaryBalanceIndexBuffer; - + private volatile float primaryBalanceShardBuffer; + private volatile float primaryBalanceIndexBuffer; private volatile float shardsPerIndexBalanceBuffer; private volatile float preferPrimaryShardRebalanceBuffer; private volatile float indexBalanceFactor; @@ -219,26 +214,27 @@ public BalancedShardsAllocator(Settings settings) { @Inject public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { + setThreshold(THRESHOLD_SETTING.get(settings)); + setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings)); setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings)); - setPreferPrimaryBalanceShardBuffer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings)); - setPreferPrimaryBalanceIndexBuffer(PREFER_PRIMARY_INDEX_BALANCE_BUFFER.get(settings)); + setPrimaryBalanceShardBuffer(PRIMARY_SHARD_BALANCE_BUFFER.get(settings)); + setPrimaryBalanceIndexBuffer(PRIMARY_INDEX_BALANCE_BUFFER.get(settings)); setShardsPerIndexBalanceBuffer(SHARDS_PER_INDEX_BALANCE_BUFFER.get(settings)); setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings)); - updateWeightFunction(); - setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardsPerIndexBalance(SHARDS_PER_INDEX_BALANCE.get(settings)); - setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); + updateWeightFunction(); + clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE, this::setShardsPerIndexBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::updateIndexBalanceFactor); clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::updateShardBalanceFactor); - clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER, this::setPreferPrimaryBalanceShardBuffer); - clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_INDEX_BALANCE_BUFFER, this::setPreferPrimaryBalanceIndexBuffer); + clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_BALANCE_BUFFER, this::setPrimaryBalanceShardBuffer); + clusterSettings.addSettingsUpdateConsumer(PRIMARY_INDEX_BALANCE_BUFFER, this::setPrimaryBalanceIndexBuffer); clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE_BUFFER, this::setShardsPerIndexBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); @@ -276,6 +272,7 @@ private void setShardBalanceFactor(float shardBalanceFactor) { private void setPreferPrimaryShardRebalanceBuffer(float preferPrimaryShardRebalanceBuffer) { this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardRebalanceBuffer; + updateWeightFunction(); } private void updateIndexBalanceFactor(float indexBalanceFactor) { @@ -298,10 +295,14 @@ private void updateWeightFunction() { this.indexBalanceFactor, this.shardBalanceFactor, this.shardsPerIndexBalanceBuffer, - this.preferPrimaryBalanceIndexBuffer, - this.preferPrimaryBalanceShardBuffer, + this.primaryBalanceIndexBuffer, + this.primaryBalanceShardBuffer, this.preferPrimaryShardRebalanceBuffer ); + this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); + this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); + this.weightFunction.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, shardsPerIndexBalance); + this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); } /** @@ -310,23 +311,21 @@ private void updateWeightFunction() { */ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { this.preferPrimaryShardBalance = preferPrimaryShardBalance; - this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); - this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); - this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); + updateWeightFunction(); } private void setShardsPerIndexBalance(boolean shardsPerIndexBalance) { this.shardsPerIndexBalance = shardsPerIndexBalance; - this.weightFunction.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, shardsPerIndexBalance); + updateWeightFunction(); } - private void setPreferPrimaryBalanceShardBuffer(float preferPrimaryBalanceShardBuffer) { - this.preferPrimaryBalanceShardBuffer = preferPrimaryBalanceShardBuffer; + private void setPrimaryBalanceShardBuffer(float primaryBalanceShardBuffer) { + this.primaryBalanceShardBuffer = primaryBalanceShardBuffer; updateWeightFunction(); } - private void setPreferPrimaryBalanceIndexBuffer(float preferPrimaryBalanceIndexBuffer) { - this.preferPrimaryBalanceIndexBuffer = preferPrimaryBalanceIndexBuffer; + private void setPrimaryBalanceIndexBuffer(float primaryBalanceIndexBuffer) { + this.primaryBalanceIndexBuffer = primaryBalanceIndexBuffer; updateWeightFunction(); } @@ -337,7 +336,7 @@ private void setShardsPerIndexBalanceBuffer(float shardsPerIndexBalanceBuffer) { private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) { this.preferPrimaryShardRebalance = preferPrimaryShardRebalance; - this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance); + updateWeightFunction(); } private void setThreshold(float threshold) { @@ -493,8 +492,8 @@ static class WeightFunction { float indexBalance, float shardBalance, float shardsPerIndexBalanceBuffer, - float preferPrimaryBalanceIndexBuffer, - float preferPrimaryBalanceShardBuffer, + float primaryBalanceIndexBuffer, + float primaryBalanceShardBuffer, float preferPrimaryBalanceBuffer ) { float sum = indexBalance + shardBalance; @@ -505,14 +504,14 @@ static class WeightFunction { theta0 = 0; theta1 = 0; } else { - throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); + throw new IllegalArgumentException("Balance factors must sum to a value >= 0 but was: " + sum); } this.indexBalance = indexBalance; this.shardBalance = shardBalance; AllocationParameter allocationParameter = new AllocationParameter( shardsPerIndexBalanceBuffer, - preferPrimaryBalanceIndexBuffer, - preferPrimaryBalanceShardBuffer + primaryBalanceIndexBuffer, + primaryBalanceShardBuffer ); RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); this.constraints = new AllocationConstraints(allocationParameter); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 3e26e69c0c020..6b3d0b5fe3375 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -263,8 +263,8 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE, - BalancedShardsAllocator.PREFER_PRIMARY_INDEX_BALANCE_BUFFER, - BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE_BUFFER, + BalancedShardsAllocator.PRIMARY_INDEX_BALANCE_BUFFER, + BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_BUFFER, BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE, BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE_BUFFER, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, From b259bfbc5ad6c569beec7fce736fd34226eec5aa Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Wed, 26 Jun 2024 09:06:12 +0530 Subject: [PATCH 4/4] Address PR comments Signed-off-by: Gaurav Bafna --- .../routing/allocation/AllocationConstraints.java | 2 +- .../cluster/routing/allocation/Constraint.java | 2 +- .../cluster/routing/allocation/ConstraintTypes.java | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 7c7b7146661eb..e282e88d8bda0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -53,7 +53,7 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no return params.weight(constraints); } - public Constraint getAllocationConstraint(String name) { + Constraint getAllocationConstraint(String name) { return constraints.get(name); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java index 8ac2fdd779f7c..378a4eebc49c6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java @@ -40,7 +40,7 @@ public void setEnable(boolean enable) { this.enable = enable; } - public boolean isEnable() { + boolean isEnable() { return enable; } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index 2a6299e5c043f..6aac694fc19ab 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -19,22 +19,22 @@ public class ConstraintTypes { public final static long CONSTRAINT_WEIGHT = 1000000L; /** - * Defines per index constraint which is breached when a node contains more than avg number of primary shards for an index + * Defines per index constraint which is breached when a node contains more than avg number of primary shards for an index with added buffer */ public final static String INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "index.primary.shard.balance.constraint"; /** - * Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices + * Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices with added buffer */ public final static String CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint"; /** - * Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices + * Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices with added buffer */ public final static String CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID = "cluster.primary.shard.rebalance.constraint"; /** - * Defines an index constraint which is breached when a node contains more than avg number of shards for an index + * Defines an index constraint which is breached when a node contains more than avg number of shards for an index with added buffer */ public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.count.constraint"; @@ -50,7 +50,7 @@ public class ConstraintTypes { * on one node, often resulting in a hotspot on that node. *

* This constraint is breached when balancer attempts to allocate more than - * average shards per index per node. + * average shards per index per node with added buffer. */ public static Predicate isIndexShardsPerNodeBreached(float buffer) { return (params) -> { @@ -62,7 +62,7 @@ public static Predicate isIndexShardsPerNodeBreache /** * Defines a predicate which returns true when specific to an index, a node contains more than average number of primary - * shards. This constraint is used in weight calculation during allocation and rebalancing. When breached a high weight + * shards with added buffer. This constraint is used in weight calculation during allocation and rebalancing. When breached a high weight * {@link ConstraintTypes#CONSTRAINT_WEIGHT} is assigned to node resulting in lesser chances of node being selected * as allocation or rebalancing target */