Skip to content

Commit

Permalink
Optmising AwarenessAllocationDecider for hashmap.get call
Browse files Browse the repository at this point in the history
Signed-off-by: RS146BIJAY <[email protected]>
  • Loading branch information
RS146BIJAY committed Jul 23, 2024
1 parent b9d5804 commit 4c551ca
Showing 1 changed file with 58 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public class AwarenessAllocationDecider extends AllocationDecider {
);

private volatile List<String> awarenessAttributes;

private volatile Map<String, List<String>> forcedAwarenessAttributes;

public AwarenessAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -163,8 +162,8 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (node.node().getAttributes().containsKey(awarenessAttribute) == false) {
// the node the shard exists on must be associated with an awareness attribute.
if (isAwarenessAttributeAssociatedWithNode(node, awarenessAttribute) == false) {
return allocation.decision(
Decision.NO,
NAME,
Expand All @@ -175,36 +174,10 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
);
}

int currentNodeCount = getCurrentNodeCountForAttribute(shardRouting, node, allocation, moveToNode, awarenessAttribute);

// build attr_value -> nodes map
Set<String> nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute);

// build the count of shards per attribute value
Map<String, Integer> shardPerAttribute = new HashMap<>();
for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) {
if (assignedShard.started() || assignedShard.initializing()) {
// Note: this also counts relocation targets as that will be the new location of the shard.
// Relocation sources should not be counted as the shard is moving away
RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId());
shardPerAttribute.merge(routingNode.node().getAttributes().get(awarenessAttribute), 1, Integer::sum);
}
}

if (moveToNode) {
if (shardRouting.assignedToNode()) {
String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId();
if (node.nodeId().equals(nodeId) == false) {
// we work on different nodes, move counts around
shardPerAttribute.compute(
allocation.routingNodes().node(nodeId).node().getAttributes().get(awarenessAttribute),
(k, v) -> (v == null) ? 0 : v - 1
);
shardPerAttribute.merge(node.node().getAttributes().get(awarenessAttribute), 1, Integer::sum);
}
} else {
shardPerAttribute.merge(node.node().getAttributes().get(awarenessAttribute), 1, Integer::sum);
}
}

int numberOfAttributes = nodesPerAttribute.size();
List<String> fullValues = forcedAwarenessAttributes.get(awarenessAttribute);

Expand All @@ -216,9 +189,8 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
}
numberOfAttributes = attributesSet.size();
}
// TODO should we remove ones that are not part of full list?

final int currentNodeCount = shardPerAttribute.get(node.node().getAttributes().get(awarenessAttribute));
// TODO should we remove ones that are not part of full list?
final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes)
if (currentNodeCount > maximumNodeCount) {
return allocation.decision(
Expand All @@ -238,4 +210,57 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout

return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements");
}

private int getCurrentNodeCountForAttribute(
ShardRouting shardRouting,
RoutingNode node,
RoutingAllocation allocation,
boolean moveToNode,
String awarenessAttribute
) {
// build the count of shards per attribute value
final String shardAttributeForNode = getAttributeValueForNode(node, awarenessAttribute);
int currentNodeCount = 0;
final List<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId());

for (ShardRouting assignedShard : assignedShards) {
if (assignedShard.started() || assignedShard.initializing()) {
// Note: this also counts relocation targets as that will be the new location of the shard.
// Relocation sources should not be counted as the shard is moving away
RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId());
// Increase node count when
if (getAttributeValueForNode(routingNode, awarenessAttribute).equals(shardAttributeForNode)) {
++currentNodeCount;
}
}
}

if (moveToNode) {
if (shardRouting.assignedToNode()) {
String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId();
if (node.nodeId().equals(nodeId) == false) {
// we work on different nodes, move counts around
if (getAttributeValueForNode(allocation.routingNodes().node(nodeId), awarenessAttribute).equals(shardAttributeForNode)
&& currentNodeCount > 0) {
--currentNodeCount;
}

++currentNodeCount;
}
} else {
++currentNodeCount;
}
}

return currentNodeCount;
}

private boolean isAwarenessAttributeAssociatedWithNode(RoutingNode node, String awarenessAttribute) {
return node.node().getAttributes().containsKey(awarenessAttribute);
}

private String getAttributeValueForNode(final RoutingNode node, final String awarenessAttribute) {
return node.node().getAttributes().get(awarenessAttribute);
}

}

0 comments on commit 4c551ca

Please sign in to comment.