Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add buffer for allocation constraints and a switch to disable index b… #14515

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@
public class AllocationConstraints {
private Map<String, Constraint> constraints;

public AllocationConstraints() {
public AllocationConstraints(AllocationParameter allocationParameter) {
this.constraints = new HashMap<>();
this.constraints.put(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f)));
this.constraints.put(
INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID,
new Constraint(isIndexShardsPerNodeBreached(allocationParameter.getShardBalanceBuffer()))
);
this.constraints.put(
INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID,
new Constraint(isPerIndexPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceIndexBuffer()))
);
this.constraints.put(
CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID,
new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceShardBuffer()))
);
}

public void updateAllocationConstraint(String constraint, boolean enable) {
Expand All @@ -43,4 +52,8 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
return params.weight(constraints);
}

public Constraint getAllocationConstraint(String name) {
return constraints.get(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

public class AllocationParameter {
private float preferPrimaryBalanceShardBuffer;

private float preferPrimaryBalanceIndexBuffer;

private float shardBalanceBuffer;

public AllocationParameter(float preferPrimaryBalanceShardBuffer, float preferPrimaryBalanceIndexBuffer, float shardBalanceBuffer) {
this.preferPrimaryBalanceShardBuffer = preferPrimaryBalanceShardBuffer;
this.preferPrimaryBalanceIndexBuffer = preferPrimaryBalanceIndexBuffer;
this.shardBalanceBuffer = shardBalanceBuffer;
}

public float getPreferPrimaryBalanceShardBuffer() {
return preferPrimaryBalanceShardBuffer;
}

public float getPreferPrimaryBalanceIndexBuffer() {
return preferPrimaryBalanceIndexBuffer;
}

public float getShardBalanceBuffer() {
return shardBalanceBuffer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public void setEnable(boolean enable) {
this.enable = enable;
}

public boolean isEnable() {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
return enable;
}

static class ConstraintParams {
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public class ConstraintTypes {
* This constraint is breached when balancer attempts to allocate more than
* average shards per index per node.
*/
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached() {
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached(float buffer) {
return (params) -> {
int currIndexShardsOnNode = params.getNode().numShards(params.getIndex());
int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()));
int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()) * (1 + buffer));
return (currIndexShardsOnNode >= allowedIndexShardsPerNode);
};
}
Expand All @@ -66,11 +66,14 @@ public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreache
* {@link ConstraintTypes#CONSTRAINT_WEIGHT} is assigned to node resulting in lesser chances of node being selected
* as allocation or rebalancing target
*/
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerNodeBreached() {
public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerNodeBreached(float buffer) {
return (params) -> {
int perIndexPrimaryShardCount = params.getNode().numPrimaryShards(params.getIndex());
int perIndexAllowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()));
return perIndexPrimaryShardCount > perIndexAllowedPrimaryShardCount;
int perIndexAllowedPrimaryShardCount = (int) Math.ceil(
params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()) * (1 + buffer)
);

return perIndexPrimaryShardCount >= perIndexAllowedPrimaryShardCount;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class RebalanceConstraints {

public RebalanceConstraints(RebalanceParameter rebalanceParameter) {
this.constraints = new HashMap<>();
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached(0.0f)));
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
this.constraints.put(
CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID,
new Constraint(isPrimaryShardsPerNodeBreached(rebalanceParameter.getPreferPrimaryBalanceBuffer()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationConstraints;
import org.opensearch.cluster.routing.allocation.AllocationParameter;
import org.opensearch.cluster.routing.allocation.ConstraintTypes;
import org.opensearch.cluster.routing.allocation.MoveDecision;
import org.opensearch.cluster.routing.allocation.RebalanceConstraints;
Expand Down Expand Up @@ -135,7 +136,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
);

/**
* This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()}
* This setting governs whether primary shards balance is desired during allocation. This is used by
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
* and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation
* {@link LocalShardsBalancer#allocateUnassigned()} and shard re-balance/relocation to a different node via {@link LocalShardsBalancer#balance()} .
*/
Expand All @@ -147,13 +148,44 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

public static final Setting<Boolean> SHARDS_PER_INDEX_BALANCE = Setting.boolSetting(
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
"cluster.routing.allocation.balance.constraint.shard.enable",
true,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Boolean> PREFER_PRIMARY_SHARD_REBALANCE = Setting.boolSetting(
"cluster.routing.allocation.rebalance.primary.enable",
false,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> PREFER_PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.balance.primary.shard.buffer",
0.0f,
0.0f,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> PREFER_PRIMARY_INDEX_BALANCE_BUFFER = Setting.floatSetting(
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
"cluster.routing.allocation.balance.primary.index.buffer",
0.0f,
0.0f,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> SHARDS_PER_INDEX_BALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.balance.constraint.shard.buffer",
0.0f,
0.0f,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> PRIMARY_SHARD_REBALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.rebalance.primary.buffer",
0.10f,
Expand All @@ -166,7 +198,15 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile ShardMovementStrategy shardMovementStrategy;

private volatile boolean preferPrimaryShardBalance;

private volatile boolean shardsPerIndexBalance;
private volatile boolean preferPrimaryShardRebalance;

private volatile float preferPrimaryBalanceShardBuffer;

private volatile float preferPrimaryBalanceIndexBuffer;

private volatile float shardsPerIndexBalanceBuffer;
private volatile float preferPrimaryShardRebalanceBuffer;
private volatile float indexBalanceFactor;
private volatile float shardBalanceFactor;
Expand All @@ -181,17 +221,25 @@ public BalancedShardsAllocator(Settings settings) {
public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings));
setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings));
setPreferPrimaryBalanceShardBuffer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings));
setPreferPrimaryBalanceIndexBuffer(PREFER_PRIMARY_INDEX_BALANCE_BUFFER.get(settings));
setShardsPerIndexBalanceBuffer(SHARDS_PER_INDEX_BALANCE_BUFFER.get(settings));
setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings));
updateWeightFunction();
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardsPerIndexBalance(SHARDS_PER_INDEX_BALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE, this::setShardsPerIndexBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::updateIndexBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::updateShardBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER, this::setPreferPrimaryBalanceShardBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_INDEX_BALANCE_BUFFER, this::setPreferPrimaryBalanceIndexBuffer);
clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE_BUFFER, this::setShardsPerIndexBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
Expand Down Expand Up @@ -246,7 +294,14 @@ private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalan
}

private void updateWeightFunction() {
weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer);
weightFunction = new WeightFunction(
this.indexBalanceFactor,
this.shardBalanceFactor,
this.preferPrimaryBalanceShardBuffer,
this.preferPrimaryBalanceIndexBuffer,
this.shardsPerIndexBalanceBuffer,
this.preferPrimaryShardRebalanceBuffer
);
}

/**
Expand All @@ -260,6 +315,26 @@ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) {
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
}

private void setShardsPerIndexBalance(boolean shardsPerIndexBalance) {
this.shardsPerIndexBalance = shardsPerIndexBalance;
this.weightFunction.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, shardsPerIndexBalance);
}

private void setPreferPrimaryBalanceShardBuffer(float preferPrimaryBalanceShardBuffer) {
this.preferPrimaryBalanceShardBuffer = preferPrimaryBalanceShardBuffer;
updateWeightFunction();
}

private void setPreferPrimaryBalanceIndexBuffer(float preferPrimaryBalanceIndexBuffer) {
this.preferPrimaryBalanceIndexBuffer = preferPrimaryBalanceIndexBuffer;
updateWeightFunction();
}

private void setShardsPerIndexBalanceBuffer(float shardsPerIndexBalanceBuffer) {
this.shardsPerIndexBalanceBuffer = shardsPerIndexBalanceBuffer;
updateWeightFunction();
}

private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) {
this.preferPrimaryShardRebalance = preferPrimaryShardRebalance;
this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance);
Expand Down Expand Up @@ -375,6 +450,10 @@ public boolean getPreferPrimaryBalance() {
return preferPrimaryShardBalance;
}

public AllocationConstraints getAllocationParam() {
return weightFunction.constraints;
}

/**
* This class is the primary weight function used to create balanced over nodes and shards in the cluster.
* Currently this function has 3 properties:
Expand Down Expand Up @@ -410,20 +489,34 @@ static class WeightFunction {
private AllocationConstraints constraints;
private RebalanceConstraints rebalanceConstraints;

WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) {
WeightFunction(
float indexBalance,
float shardBalance,
float preferPrimaryBalanceShardBuffer,
float preferPrimaryBalanceIndexBuffer,
float shardsPerIndexBalanceBuffer,
float preferPrimaryBalanceBuffer
) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
if (sum > 0.0f) {
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
} else if (sum == 0.0f) {
theta0 = 0;
theta1 = 0;
} else {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
}
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
AllocationParameter allocationParameter = new AllocationParameter(
preferPrimaryBalanceShardBuffer,
preferPrimaryBalanceIndexBuffer,
shardsPerIndexBalanceBuffer
);
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
this.constraints = new AllocationConstraints();
this.constraints = new AllocationConstraints(allocationParameter);
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter);
// Enable index shard per node breach constraint
updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true);
}

public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER,
BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE,
BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE,
BalancedShardsAllocator.PREFER_PRIMARY_INDEX_BALANCE_BUFFER,
BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE_BUFFER,
BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE,
BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE_BUFFER,
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
Expand Down
Loading
Loading