Skip to content

Commit

Permalink
Add timeout setting for allocate unassigned
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Jul 7, 2024
1 parent 7a760e1 commit 14f29ea
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -828,11 +828,6 @@ public void allocateUnassigned(
unassignedAllocationHandler.removeAndIgnore(AllocationStatus.NO_VALID_SHARD_COPY, allocation.changes());
}

@Override
public TimeValue getAllocatorTimeout() {
return null;
}

@Override
public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation allocation) {
assert unassignedShard.unassigned();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -162,6 +163,13 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

public static final Setting<TimeValue> ALLOCATE_UNASSIGNED_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.routing.allocation.allocate_unassigned_timeout",
TimeValue.timeValueSeconds(60),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

Expand All @@ -172,6 +180,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile float shardBalanceFactor;
private volatile WeightFunction weightFunction;
private volatile float threshold;
private volatile TimeValue allocateUnassignedTimeout;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand All @@ -187,6 +196,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setAllocateUnassignedTimeout(ALLOCATE_UNASSIGNED_TIMEOUT_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
Expand All @@ -195,6 +205,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(ALLOCATE_UNASSIGNED_TIMEOUT_SETTING, this::setAllocateUnassignedTimeout);
}

/**
Expand Down Expand Up @@ -260,6 +271,10 @@ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) {
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
}

private void setAllocateUnassignedTimeout(TimeValue allocateUnassignedTimeout) {
this.allocateUnassignedTimeout = allocateUnassignedTimeout;
}

private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) {
this.preferPrimaryShardRebalance = preferPrimaryShardRebalance;
this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance);
Expand All @@ -282,7 +297,8 @@ public void allocate(RoutingAllocation allocation) {
weightFunction,
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance
preferPrimaryShardRebalance,
allocateUnassignedTimeout
);
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
Expand All @@ -305,7 +321,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
weightFunction,
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance
preferPrimaryShardRebalance,
allocateUnassignedTimeout
);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
Expand Down Expand Up @@ -559,7 +576,7 @@ public Balancer(
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false);
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, TimeValue.MAX_VALUE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class LocalShardsBalancer extends ShardsBalancer {

private final boolean preferPrimaryBalance;
private final boolean preferPrimaryRebalance;
private final TimeValue allocateUnassignedTimeout;
private final BalancedShardsAllocator.WeightFunction weight;

private final float threshold;
Expand All @@ -72,7 +73,8 @@ public LocalShardsBalancer(
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance,
boolean preferPrimaryRebalance
boolean preferPrimaryRebalance,
TimeValue allocateUnassignedTimeout
) {
this.logger = logger;
this.allocation = allocation;
Expand All @@ -89,6 +91,7 @@ public LocalShardsBalancer(
this.preferPrimaryBalance = preferPrimaryBalance;
this.preferPrimaryRebalance = preferPrimaryRebalance;
this.shardMovementStrategy = shardMovementStrategy;
this.allocateUnassignedTimeout = allocateUnassignedTimeout;
}

/**
Expand Down Expand Up @@ -879,7 +882,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard, lo
return AllocateUnassignedDecision.NOT_TAKEN;
}

if (System.nanoTime() - startTime > TimeValue.timeValueSeconds(30).nanos()) {
if (System.nanoTime() - startTime > allocateUnassignedTimeout.nanos()) {
logger.info("Timed out while running Local shard balancer allocate unassigned - outer loop");
return AllocateUnassignedDecision.throttle(null);
}
Expand Down

0 comments on commit 14f29ea

Please sign in to comment.