Skip to content

Commit

Permalink
Add random allocation strategy
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Mar 11, 2024
1 parent 2b17902 commit 8582152
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,23 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

/**
* This setting governs whether shards should be randomly allocated during assignment.
*/
public static final Setting<Boolean> PREFER_RANDOM_SHARD_ALLOCATION = Setting.boolSetting(
"cluster.routing.allocation.balance.prefer_random_allocation",
false,
Property.Dynamic,
Property.NodeScope
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

private volatile boolean preferPrimaryShardBalance;
private volatile WeightFunction weightFunction;
private volatile float threshold;
private volatile boolean preferRandomShardAllocation;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand All @@ -162,11 +173,13 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setPreferRandomShardAllocation(PREFER_RANDOM_SHARD_ALLOCATION.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(PREFER_RANDOM_SHARD_ALLOCATION, this::setPreferRandomShardAllocation);
}

/**
Expand Down Expand Up @@ -209,6 +222,9 @@ private void setThreshold(float threshold) {
this.threshold = threshold;
}

private void setPreferRandomShardAllocation(boolean preferRandomShardAllocation) {
this.preferRandomShardAllocation = preferRandomShardAllocation;
}
@Override
public void allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
Expand All @@ -221,7 +237,8 @@ public void allocate(RoutingAllocation allocation) {
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
preferPrimaryShardBalance,
preferRandomShardAllocation
);
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
Expand All @@ -242,7 +259,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
preferPrimaryShardBalance,
preferRandomShardAllocation
);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
Expand Down Expand Up @@ -495,7 +513,7 @@ public Balancer(
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance);
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.Randomness;
import org.opensearch.common.collect.Tuple;
import org.opensearch.gateway.PriorityComparator;

Expand Down Expand Up @@ -69,14 +70,16 @@ public class LocalShardsBalancer extends ShardsBalancer {
private final float avgPrimaryShardsPerNode;
private final BalancedShardsAllocator.NodeSorter sorter;
private final Set<RoutingNode> inEligibleTargetNode;
private final boolean preferRandomShardAllocation;

public LocalShardsBalancer(
Logger logger,
RoutingAllocation allocation,
ShardMovementStrategy shardMovementStrategy,
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance
boolean preferPrimaryBalance,
boolean preferRandomShardAllocation
) {
this.logger = logger;
this.allocation = allocation;
Expand All @@ -92,6 +95,7 @@ public LocalShardsBalancer(
inEligibleTargetNode = new HashSet<>();
this.preferPrimaryBalance = preferPrimaryBalance;
this.shardMovementStrategy = shardMovementStrategy;
this.preferRandomShardAllocation = preferRandomShardAllocation;
}

/**
Expand Down Expand Up @@ -888,6 +892,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
/* find an node with minimal weight we can allocate on*/
float minWeight = Float.POSITIVE_INFINITY;
BalancedShardsAllocator.ModelNode minNode = null;
List<BalancedShardsAllocator.ModelNode> minNodes = new ArrayList<>();
Decision decision = null;
/* Don't iterate over an identity hashset here the
* iteration order is different for each run and makes testing hard */
Expand Down Expand Up @@ -931,11 +936,26 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
final int minNodeHigh = minNode.highestPrimary(shard.getIndexName());
updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId))
&& (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId));
minNodes.add(node);
} else {
updateMinNode = currentDecision.type() == Decision.Type.YES;
/* If updateMinNode is true, it means the earlier nodes had decision type THROTTLE. We will need to clear the list,
* and add new nodes to the list.
*/
if(updateMinNode) {
minNodes.clear();
minNodes.add(node);
}
}
} else {
updateMinNode = currentWeight < minWeight;
/* Since we have found nodes with less weight. We will need to clear the earlier minNodes list
* and add the new nodes to the list.
*/
if (updateMinNode) {
minNodes.clear();
minNodes.add(node);
}
}
if (updateMinNode) {
minNode = node;
Expand All @@ -959,6 +979,11 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
nodeDecisions.add(new NodeAllocationResult(current.getNode(), current.getCanAllocateDecision(), ++weightRanking));
}
}

if(preferRandomShardAllocation && !minNodes.isEmpty()){
minNode = minNodes.get(Randomness.get().nextInt(minNodes.size()));
}

return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.getRoutingNode().node() : null, nodeDecisions);
}

Expand Down

0 comments on commit 8582152

Please sign in to comment.