Skip to content

Commit

Permalink
fix remote shards balancer
Browse files Browse the repository at this point in the history
Signed-off-by: panguixin <[email protected]>
  • Loading branch information
bugmakerrrrrr committed Jan 23, 2024
1 parent b0eab75 commit c9ecc72
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public final class RemoteShardsBalancer extends ShardsBalancer {
private final Logger logger;
private final RoutingAllocation allocation;
private final RoutingNodes routingNodes;
// indicates if there are any nodes being throttled for allocating any unassigned shards
private boolean anyNodesThrottled = false;

public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) {
this.logger = logger;
Expand Down Expand Up @@ -358,12 +360,16 @@ private void allocateUnassignedReplicas(Queue<RoutingNode> nodeQueue, Map<String
}

private void ignoreRemainingShards(Map<String, UnassignedIndexShards> unassignedShardMap) {
// If any nodes are throttled during allocation, mark all remaining unassigned shards as THROTTLED
final UnassignedInfo.AllocationStatus status = anyNodesThrottled
? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
: UnassignedInfo.AllocationStatus.DECIDERS_NO;
for (UnassignedIndexShards indexShards : unassignedShardMap.values()) {
for (ShardRouting shard : indexShards.getPrimaries()) {
routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
}
for (ShardRouting shard : indexShards.getReplicas()) {
routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
}
}
}
Expand Down Expand Up @@ -424,11 +430,11 @@ private void allocateUnassignedShards(
private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouting shard) {
boolean allocated = false;
boolean throttled = false;
Set<String> nodesCheckedForShard = new HashSet<>();
int numNodesToCheck = nodeQueue.size();
while (nodeQueue.isEmpty() == false) {
RoutingNode node = nodeQueue.poll();
--numNodesToCheck;
Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation);
nodesCheckedForShard.add(node.nodeId());
if (allocateDecision.type() == Decision.Type.YES) {
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId());
Expand Down Expand Up @@ -467,6 +473,10 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
}
nodeQueue.offer(node);
} else {
if (nodeLevelDecision.type() == Decision.Type.THROTTLE) {
anyNodesThrottled = true;
}

if (logger.isTraceEnabled()) {
logger.trace(
"Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]",
Expand All @@ -478,14 +488,14 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
}

// Break out if all nodes in the queue have been checked for this shard
if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) {
if (numNodesToCheck == 0) {
break;
}
}
}

if (allocated == false) {
UnassignedInfo.AllocationStatus status = throttled
UnassignedInfo.AllocationStatus status = (throttled || anyNodesThrottled)
? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
: UnassignedInfo.AllocationStatus.DECIDERS_NO;
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return Decision.ALWAYS;
}
}

@Override
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
return throttle ? Decision.THROTTLE : Decision.YES;
}
});
Collections.shuffle(deciders, random());
return new AllocationDeciders(deciders);
Expand Down

0 comments on commit c9ecc72

Please sign in to comment.