diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 785636fa7ff2a..a973193c76dce 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -62,9 +62,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Locale; import java.util.Map; import java.util.Set; +import static org.opensearch.cluster.action.shard.ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; @@ -191,6 +193,32 @@ public class BalancedShardsAllocator implements ShardsAllocator { Setting.Property.Dynamic ); + /** + * Adjusts the priority of the followup reroute task when current round times out. NORMAL is right for reasonable clusters, + * but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher + * to allocate shards. 
+ */ + public static final Setting<Priority> FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>( + "cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority", + Priority.NORMAL.toString(), + BalancedShardsAllocator::parseReroutePriority, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private static Priority parseReroutePriority(String priorityString) { + final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT)); + switch (priority) { + case NORMAL: + case HIGH: + case URGENT: + return priority; + } + throw new IllegalArgumentException( + "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]" + ); + } + private volatile boolean movePrimaryFirst; private volatile ShardMovementStrategy shardMovementStrategy; @@ -204,6 +232,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile boolean ignoreThrottleInRestore; private volatile TimeValue allocatorTimeout; + private volatile Priority followUpRerouteTaskPriority; private long startTime; private RerouteService rerouteService; @@ -223,6 +252,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings)); + setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); @@ -233,6 +263,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore); clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout); + clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority); } @Override @@ -321,6 +352,10 @@ private void setAllocatorTimeout(TimeValue allocatorTimeout) { this.allocatorTimeout = allocatorTimeout; } + private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { + this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; + } + protected boolean allocatorTimedOut() { if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) { if (logger.isTraceEnabled()) { @@ -417,10 +452,13 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { private void scheduleRerouteIfAllocatorTimedOut() { if (allocatorTimedOut()) { - assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out"; + if (rerouteService == null) { + logger.info("RerouteService not set to schedule reroute after allocator time out"); + return; + } rerouteService.reroute( "reroute after balanced shards allocator timed out", - Priority.HIGH, + followUpRerouteTaskPriority, ActionListener.wrap( r -> logger.trace("reroute after balanced shards allocator timed out completed"), e -> logger.debug("reroute after balanced shards allocator timed out failed", e) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 9c38ea1df8a41..da00a0fb686d3 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -308,8 +308,8 @@ public void onComplete() { 
logger.trace("scheduling reroute after existing shards allocator timed out for primary shards"); assert rerouteService != null; rerouteService.reroute( - "reroute after existing shards allocator timed out", - Priority.HIGH, + "reroute after existing shards allocator [P] timed out", + Priority.NORMAL, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) @@ -343,8 +343,8 @@ public void onComplete() { logger.trace("scheduling reroute after existing shards allocator timed out for replica shards"); assert rerouteService != null; rerouteService.reroute( - "reroute after existing shards allocator timed out", - Priority.HIGH, + "reroute after existing shards allocator [R] timed out", + Priority.NORMAL, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e)