From c9ecc7260110ab619f7e3a9dfcc6af9d8d84690a Mon Sep 17 00:00:00 2001 From: panguixin Date: Wed, 3 Jan 2024 17:56:52 +0800 Subject: [PATCH 1/2] fix remote shards balancer Signed-off-by: panguixin --- .../allocator/RemoteShardsBalancer.java | 22 ++++++++++++++----- .../RemoteShardsBalancerBaseTestCase.java | 5 +++++ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java index e2f24e5f503df..a05938c176678 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -43,6 +43,8 @@ public final class RemoteShardsBalancer extends ShardsBalancer { private final Logger logger; private final RoutingAllocation allocation; private final RoutingNodes routingNodes; + // indicates if there are any nodes being throttled for allocating any unassigned shards + private boolean anyNodesThrottled = false; public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) { this.logger = logger; @@ -358,12 +360,16 @@ private void allocateUnassignedReplicas(Queue nodeQueue, Map unassignedShardMap) { + // If any nodes are throttled during allocation, mark all remaining unassigned shards as THROTTLED + final UnassignedInfo.AllocationStatus status = anyNodesThrottled + ? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED + : UnassignedInfo.AllocationStatus.DECIDERS_NO; for (UnassignedIndexShards indexShards : unassignedShardMap.values()) { for (ShardRouting shard : indexShards.getPrimaries()) { - routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); } for (ShardRouting shard : indexShards.getReplicas()) { - routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); } } } @@ -424,11 +430,11 @@ private void allocateUnassignedShards( private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouting shard) { boolean allocated = false; boolean throttled = false; - Set nodesCheckedForShard = new HashSet<>(); + int numNodesToCheck = nodeQueue.size(); while (nodeQueue.isEmpty() == false) { RoutingNode node = nodeQueue.poll(); + --numNodesToCheck; Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation); - nodesCheckedForShard.add(node.nodeId()); if (allocateDecision.type() == Decision.Type.YES) { if (logger.isTraceEnabled()) { logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId()); @@ -467,6 +473,10 @@ private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouti } nodeQueue.offer(node); } else { + if (nodeLevelDecision.type() == Decision.Type.THROTTLE) { + anyNodesThrottled = true; + } + if (logger.isTraceEnabled()) { logger.trace( "Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]", @@ -478,14 +488,14 @@ private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouti } // Break out if all nodes in the queue have been checked for this shard - if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) { + if (numNodesToCheck == 0) { break; } } } if (allocated == false) { - UnassignedInfo.AllocationStatus status = throttled + UnassignedInfo.AllocationStatus status = (throttled || anyNodesThrottled) ? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED : UnassignedInfo.AllocationStatus.DECIDERS_NO; routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java index a1db6cd83ab6c..6a03a1f79bcde 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -229,6 +229,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return Decision.ALWAYS; } } + + @Override + public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) { + return throttle ? Decision.THROTTLE : Decision.YES; + } }); Collections.shuffle(deciders, random()); return new AllocationDeciders(deciders); From ec30c5835e52b109a4f7589d96d1a949675e1fea Mon Sep 17 00:00:00 2001 From: panguixin Date: Tue, 9 Jan 2024 19:31:19 +0800 Subject: [PATCH 2/2] add change log Signed-off-by: panguixin --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ab7ef4f10620..0a1c730ff7d54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -240,6 +240,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix tracing context propagation for local transport instrumentation ([#11490](https://github.com/opensearch-project/OpenSearch/pull/11490)) - Fix parsing of single line comments in `lang-painless` ([#11815](https://github.com/opensearch-project/OpenSearch/issues/11815)) - Fix memory leak issue in ReorganizingLongHash ([#11953](https://github.com/opensearch-project/OpenSearch/issues/11953)) +- [BUG] Fix remote shards balancer when filtering throttled nodes ([#11724](https://github.com/opensearch-project/OpenSearch/pull/11724)) ### Security