[BUG] Fix remote shards balancer when filtering throttled nodes #11724

Merged
Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -241,6 +241,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Fix tracing context propagation for local transport instrumentation ([#11490](https://github.com/opensearch-project/OpenSearch/pull/11490))
 - Fix parsing of single line comments in `lang-painless` ([#11815](https://github.com/opensearch-project/OpenSearch/issues/11815))
 - Fix memory leak issue in ReorganizingLongHash ([#11953](https://github.com/opensearch-project/OpenSearch/issues/11953))
+- [BUG] Fix remote shards balancer when filtering throttled nodes ([#11724](https://github.com/opensearch-project/OpenSearch/pull/11724))

 ### Security

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java
@@ -43,6 +43,8 @@
     private final Logger logger;
     private final RoutingAllocation allocation;
     private final RoutingNodes routingNodes;
+    // indicates if there are any nodes being throttled for allocating any unassigned shards
+    private boolean anyNodesThrottled = false;

     public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) {
         this.logger = logger;
@@ -358,12 +360,16 @@
     }

     private void ignoreRemainingShards(Map<String, UnassignedIndexShards> unassignedShardMap) {
+        // If any nodes are throttled during allocation, mark all remaining unassigned shards as THROTTLED
+        final UnassignedInfo.AllocationStatus status = anyNodesThrottled
+            ? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
+            : UnassignedInfo.AllocationStatus.DECIDERS_NO;
         for (UnassignedIndexShards indexShards : unassignedShardMap.values()) {
             for (ShardRouting shard : indexShards.getPrimaries()) {
-                routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
+                routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
             }
             for (ShardRouting shard : indexShards.getReplicas()) {
-                routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
+                routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
             }
         }
     }
Codecov / codecov/patch: added lines #L365, #L369, and #L372 of RemoteShardsBalancer.java were not covered by tests.
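
The status selection in `ignoreRemainingShards` can be illustrated outside OpenSearch with a minimal sketch. Everything below is a hypothetical stand-in (the class `IgnoreRemainingSketch`, its `Status` enum, and shards modelled as plain strings); it only mirrors the idea that the status is chosen once from the throttle flag and then applied to every remaining shard.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-alone model; these types are not OpenSearch classes. */
final class IgnoreRemainingSketch {
    enum Status { DECIDERS_NO, DECIDERS_THROTTLED }

    /**
     * Returns the status recorded for every shard that is still unassigned.
     * Shards are modelled as plain strings grouped by index name (an assumption).
     */
    static Map<String, Status> ignoreRemaining(Map<String, List<String>> unassignedByIndex, boolean anyNodesThrottled) {
        // Decide once, up front, the way the patched method does.
        final Status status = anyNodesThrottled ? Status.DECIDERS_THROTTLED : Status.DECIDERS_NO;
        Map<String, Status> ignored = new LinkedHashMap<>();
        for (List<String> shards : unassignedByIndex.values()) {
            for (String shard : shards) {
                ignored.put(shard, status);
            }
        }
        return ignored;
    }
}
```

With the flag set, every remaining shard in this model is reported as `DECIDERS_THROTTLED`; with it unset, as `DECIDERS_NO`.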
@@ -424,11 +430,11 @@
     private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouting shard) {
         boolean allocated = false;
         boolean throttled = false;
-        Set<String> nodesCheckedForShard = new HashSet<>();
+        int numNodesToCheck = nodeQueue.size();
         while (nodeQueue.isEmpty() == false) {
             RoutingNode node = nodeQueue.poll();
+            --numNodesToCheck;
             Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation);
-            nodesCheckedForShard.add(node.nodeId());
             if (allocateDecision.type() == Decision.Type.YES) {
                 if (logger.isTraceEnabled()) {
                     logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId());
@@ -467,6 +473,10 @@
                     }
                     nodeQueue.offer(node);
                 } else {
+                    if (nodeLevelDecision.type() == Decision.Type.THROTTLE) {
+                        anyNodesThrottled = true;
+                    }
+
                     if (logger.isTraceEnabled()) {
                         logger.trace(
                             "Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]",
Codecov / codecov/patch: added line #L477 of RemoteShardsBalancer.java was not covered by tests.
@@ -478,14 +488,14 @@
                 }

                 // Break out if all nodes in the queue have been checked for this shard
-                if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) {
+                if (numNodesToCheck == 0) {
                     break;
                 }
             }
         }

         if (allocated == false) {
-            UnassignedInfo.AllocationStatus status = throttled
+            UnassignedInfo.AllocationStatus status = (throttled || anyNodesThrottled)
                 ? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
                 : UnassignedInfo.AllocationStatus.DECIDERS_NO;
             routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
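
To see how the counter-based termination check and the sticky `anyNodesThrottled` flag interact, here is a simplified, self-contained sketch of the loop. It is not the OpenSearch implementation: `ThrottleAwareQueueSketch`, its enums, and the two decision functions are hypothetical stand-ins for `RoutingNode`, `Decision`, and the allocation deciders, and nodes are modelled as plain strings.

```java
import java.util.Queue;
import java.util.function.Function;

/** Hypothetical stand-alone model; none of these names are OpenSearch APIs. */
final class ThrottleAwareQueueSketch {
    enum Decision { YES, NO, THROTTLE }
    enum Status { ASSIGNED, DECIDERS_NO, DECIDERS_THROTTLED }

    // Plays the role of anyNodesThrottled in the patch: it stays set across shards.
    private boolean anyNodesThrottled = false;

    Status tryAllocate(Queue<String> nodeQueue,
                       Function<String, Decision> shardDecision,
                       Function<String, Decision> nodeDecision) {
        boolean throttled = false;
        int numNodesToCheck = nodeQueue.size(); // snapshot taken before polling
        while (!nodeQueue.isEmpty()) {
            String node = nodeQueue.poll();
            --numNodesToCheck;
            Decision decision = shardDecision.apply(node);
            if (decision == Decision.YES) {
                nodeQueue.offer(node); // node stays available for later shards
                return Status.ASSIGNED;
            }
            throttled |= decision == Decision.THROTTLE;
            Decision nodeLevel = nodeDecision.apply(node);
            if (nodeLevel == Decision.YES) {
                nodeQueue.offer(node); // node may still accept other shards
            } else if (nodeLevel == Decision.THROTTLE) {
                anyNodesThrottled = true; // node leaves the queue, but the throttle is remembered
            }
            if (numNodesToCheck == 0) {
                break; // every node present at the start has been tried once
            }
        }
        return (throttled || anyNodesThrottled) ? Status.DECIDERS_THROTTLED : Status.DECIDERS_NO;
    }
}
```

In this model, once a throttled node has been dropped from the queue, a later shard that finds the queue empty is still reported as `DECIDERS_THROTTLED` rather than `DECIDERS_NO`, which is the behaviour the `(throttled || anyNodesThrottled)` condition is aiming for.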
[file name not captured in this extract]
@@ -229,6 +229,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
                     return Decision.ALWAYS;
                 }
             }
+
+            @Override
+            public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
+                return throttle ? Decision.THROTTLE : Decision.YES;
+            }
         });
         Collections.shuffle(deciders, random());
         return new AllocationDeciders(deciders);