From e7c7be84c6095b6c449c304b0c258a7d65e59d7b Mon Sep 17 00:00:00 2001 From: panguixin Date: Fri, 26 Jan 2024 06:50:37 +0800 Subject: [PATCH] [BUG] Fix remote shards balancer when filtering throttled nodes (#11724) * fix remote shards balancer Signed-off-by: panguixin * add change log Signed-off-by: panguixin --------- Signed-off-by: panguixin Signed-off-by: Andrew Ross Co-authored-by: Andrew Ross Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + .../allocator/RemoteShardsBalancer.java | 22 ++++++++++++++----- .../RemoteShardsBalancerBaseTestCase.java | 5 +++++ 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f847cf3a70ef..aa31819ffae97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -243,6 +243,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix parsing of single line comments in `lang-painless` ([#11815](https://github.com/opensearch-project/OpenSearch/issues/11815)) - Fix memory leak issue in ReorganizingLongHash ([#11953](https://github.com/opensearch-project/OpenSearch/issues/11953)) - Prevent setting remote_snapshot store type on index creation ([#11867](https://github.com/opensearch-project/OpenSearch/pull/11867)) +- [BUG] Fix remote shards balancer when filtering throttled nodes ([#11724](https://github.com/opensearch-project/OpenSearch/pull/11724)) ### Security diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java index e2f24e5f503df..a05938c176678 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -43,6 +43,8 @@ public final class RemoteShardsBalancer extends ShardsBalancer { private final Logger logger; private final RoutingAllocation allocation; private final RoutingNodes routingNodes; + // indicates if there are any nodes being throttled for allocating any unassigned shards + private boolean anyNodesThrottled = false; public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) { this.logger = logger; @@ -358,12 +360,16 @@ private void allocateUnassignedReplicas(Queue nodeQueue, Map unassignedShardMap) { + // If any nodes are throttled during allocation, mark all remaining unassigned shards as THROTTLED + final UnassignedInfo.AllocationStatus status = anyNodesThrottled + ? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED + : UnassignedInfo.AllocationStatus.DECIDERS_NO; for (UnassignedIndexShards indexShards : unassignedShardMap.values()) { for (ShardRouting shard : indexShards.getPrimaries()) { - routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); } for (ShardRouting shard : indexShards.getReplicas()) { - routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); } } } @@ -424,11 +430,11 @@ private void allocateUnassignedShards( private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouting shard) { boolean allocated = false; boolean throttled = false; - Set nodesCheckedForShard = new HashSet<>(); + int numNodesToCheck = nodeQueue.size(); while (nodeQueue.isEmpty() == false) { RoutingNode node = nodeQueue.poll(); + --numNodesToCheck; Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation); - nodesCheckedForShard.add(node.nodeId()); if (allocateDecision.type() == Decision.Type.YES) { if (logger.isTraceEnabled()) { logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId()); @@ -467,6 +473,10 @@ private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouti } nodeQueue.offer(node); } else { + if (nodeLevelDecision.type() == Decision.Type.THROTTLE) { + anyNodesThrottled = true; + } + if (logger.isTraceEnabled()) { logger.trace( "Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]", @@ -478,14 +488,14 @@ private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouti } // Break out if all nodes in the queue have been checked for this shard - if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) { + if (numNodesToCheck == 0) { break; } } } if (allocated == false) { - UnassignedInfo.AllocationStatus status = throttled + UnassignedInfo.AllocationStatus status = (throttled || anyNodesThrottled) ? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED : UnassignedInfo.AllocationStatus.DECIDERS_NO; routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java index a1db6cd83ab6c..6a03a1f79bcde 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -229,6 +229,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return Decision.ALWAYS; } } + + @Override + public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) { + return throttle ? Decision.THROTTLE : Decision.YES; + } }); Collections.shuffle(deciders, random()); return new AllocationDeciders(deciders);