Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Mar 27, 2024
1 parent 60ccd7b commit f376d72
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.stream.Collectors;

import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -58,6 +61,20 @@ public void enablePreferPrimaryBalance() {
);
}

/**
 * Writes the three primary-shard balancing settings to the cluster as persistent settings and asserts
 * that the settings update was acknowledged.
 *
 * @param preferPrimaryBalance   value stored under the PREFER_PRIMARY_SHARD_BALANCE setting key
 * @param preferPrimaryRebalance value stored under the PREFER_PRIMARY_SHARD_REBALANCE setting key
 * @param buffer                 value stored under the PRIMARY_SHARD_REBALANCE_BUFFER setting key
 */
public void setAllocationRelocationStrategy(boolean preferPrimaryBalance, boolean preferPrimaryRebalance, float buffer) {
    // Assemble the settings first so the request itself reads as a single line.
    final Settings.Builder balanceSettings = Settings.builder();
    balanceSettings.put(PREFER_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance);
    balanceSettings.put(PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance);
    balanceSettings.put(PRIMARY_SHARD_REBALANCE_BUFFER.getKey(), buffer);

    assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(balanceSettings));
}

/**
* This test verifies that the overall primary balance is attained during allocation. This test verifies primary
* balance per index and across all indices is maintained.
Expand Down Expand Up @@ -87,7 +104,7 @@ public void testGlobalPrimaryAllocation() throws Exception {
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance();
verifyPrimaryBalance(0.0f);
}

/**
Expand Down Expand Up @@ -224,6 +241,70 @@ public void testAllocationWithDisruption() throws Exception {
verifyPerIndexPrimaryBalance();
}

/**
 * Similar to the testSingleIndexShardAllocation test, but creates multiple indices and has data nodes being added
 * and then stopped again. After index creation, after adding nodes and after stopping nodes, the test asserts
 * that primary shards are balanced for each index and, within the configured rebalance buffer, across the nodes.
 * Both PREFER_PRIMARY_SHARD_BALANCE and PREFER_PRIMARY_SHARD_REBALANCE are set to true for the whole test.
 *
 * @throws Exception if any cluster operation or balance verification fails
 */
public void testAllocationAndRebalanceWithDisruption() throws Exception {
    internalCluster().startClusterManagerOnlyNode();
    final int maxReplicaCount = 2;
    final int maxShardCount = 2;
    // Create higher number of nodes than number of shards to reduce chances of SameShardAllocationDecider kicking-in
    // and preventing primary relocations
    final int nodeCount = randomIntBetween(5, 10);
    final int numberOfIndices = randomIntBetween(1, 10);
    // Random multiple of 0.10 between 0.10 and 0.40; used both as the cluster setting and as the
    // tolerance passed to verifyPrimaryBalance.
    final float buffer = randomIntBetween(1, 4) * 0.10f;

    logger.info("--> Creating {} nodes", nodeCount);
    // The names returned by startNode() are not needed by this test, so they are not collected.
    for (int i = 0; i < nodeCount; i++) {
        internalCluster().startNode();
    }
    setAllocationRelocationStrategy(true, true, buffer);

    ClusterState state;
    for (int i = 0; i < numberOfIndices; i++) {
        final int shardCount = randomIntBetween(1, maxShardCount);
        final int replicaCount = randomIntBetween(1, maxReplicaCount);
        logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount);
        createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
        ensureGreen(TimeValue.timeValueSeconds(60));
        // Only dump the per-index shard distribution when trace logging is enabled.
        if (logger.isTraceEnabled()) {
            state = client().admin().cluster().prepareState().execute().actionGet().getState();
            logger.info(ShardAllocations.printShardDistribution(state));
        }
    }

    // Checkpoint 1: all indices created.
    state = client().admin().cluster().prepareState().execute().actionGet().getState();
    logger.info(ShardAllocations.printShardDistribution(state));
    verifyPerIndexPrimaryBalance();
    verifyPrimaryBalance(buffer);

    final int additionalNodeCount = randomIntBetween(1, 5);
    logger.info("--> Adding {} nodes", additionalNodeCount);

    // Checkpoint 2: extra data nodes joined the cluster.
    internalCluster().startNodes(additionalNodeCount);
    ensureGreen(TimeValue.timeValueSeconds(60));
    state = client().admin().cluster().prepareState().execute().actionGet().getState();
    logger.info(ShardAllocations.printShardDistribution(state));
    verifyPerIndexPrimaryBalance();
    verifyPrimaryBalance(buffer);

    // Stop as many random data nodes as were just added, one at a time.
    for (int nodeCountToStop = additionalNodeCount; nodeCountToStop > 0; nodeCountToStop--) {
        internalCluster().stopRandomDataNode();
        // give replica a chance to promote as primary before terminating node containing the replica
        ensureGreen(TimeValue.timeValueSeconds(60));
    }

    // Checkpoint 3: cluster is back to the original number of data nodes.
    state = client().admin().cluster().prepareState().execute().actionGet().getState();
    logger.info("--> Cluster state post nodes stop {}", state);
    logger.info(ShardAllocations.printShardDistribution(state));
    verifyPerIndexPrimaryBalance();
    verifyPrimaryBalance(buffer);
}

/**
* Utility method which ensures cluster has balanced primary shard distribution across a single index.
* @throws Exception exception
Expand Down Expand Up @@ -263,7 +344,7 @@ private void verifyPerIndexPrimaryBalance() throws Exception {
}, 60, TimeUnit.SECONDS);
}

private void verifyPrimaryBalance() throws Exception {
private void verifyPrimaryBalance(float buffer) throws Exception {
assertBusy(() -> {
final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState();
RoutingNodes nodes = currentState.getRoutingNodes();
Expand All @@ -278,7 +359,7 @@ private void verifyPrimaryBalance() throws Exception {
.filter(ShardRouting::primary)
.collect(Collectors.toList())
.size();
assertTrue(primaryCount <= avgPrimaryShardsPerNode);
assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer)));
}
}, 60, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,11 @@
public class AllocationConstraints {
private Map<String, Constraint> constraints;

public AllocationConstraints(AllocationParameter allocationParameter) {
public AllocationConstraints() {
this.constraints = new HashMap<>();
this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.putIfAbsent(
CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID,
new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceBuffer()))
);
this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f)));
}

public void updateAllocationConstraint(String constraint, boolean enable) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreac
return (params) -> {
int primaryShardCount = params.getNode().numPrimaryShards();
int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode() * (1 + buffer));
return primaryShardCount > allowedPrimaryShardCount;
return primaryShardCount >= allowedPrimaryShardCount;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationConstraints;
import org.opensearch.cluster.routing.allocation.AllocationParameter;
import org.opensearch.cluster.routing.allocation.ConstraintTypes;
import org.opensearch.cluster.routing.allocation.MoveDecision;
import org.opensearch.cluster.routing.allocation.RebalanceConstraints;
Expand Down Expand Up @@ -439,9 +438,8 @@ static class WeightFunction {
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
AllocationParameter allocationParameter = new AllocationParameter(0.0f);
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
this.constraints = new AllocationConstraints(allocationParameter);
this.constraints = new AllocationConstraints();
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter);
// Enable index shard per node breach constraint
updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.Randomness;
import org.opensearch.common.collect.Tuple;
import org.opensearch.gateway.PriorityComparator;

Expand Down Expand Up @@ -71,7 +70,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
private final float avgPrimaryShardsPerNode;
private final BalancedShardsAllocator.NodeSorter sorter;
private final Set<RoutingNode> inEligibleTargetNode;
private final boolean preferRandomShardAllocation;
private final boolean allowRandomAllocation;

public LocalShardsBalancer(
Logger logger,
Expand All @@ -81,7 +80,7 @@ public LocalShardsBalancer(
float threshold,
boolean preferPrimaryBalance,
boolean preferPrimaryRebalance,
boolean preferRandomShardAllocation
boolean allowRandomAllocation
) {
this.logger = logger;
this.allocation = allocation;
Expand All @@ -98,7 +97,7 @@ public LocalShardsBalancer(
this.preferPrimaryBalance = preferPrimaryBalance;
this.preferPrimaryRebalance = preferPrimaryRebalance;
this.shardMovementStrategy = shardMovementStrategy;
this.preferRandomShardAllocation = preferRandomShardAllocation;
this.allowRandomAllocation = allowRandomAllocation;
}

/**
Expand Down Expand Up @@ -872,60 +871,7 @@ void allocateUnassigned() {
// clear everything we have either added it or moved to ignoreUnassigned
}

private class MinWeightedNodeDecision {
public float minWeight;
public Decision decision;
public List<BalancedShardsAllocator.ModelNode> nodes;

public MinWeightedNodeDecision(List<BalancedShardsAllocator.ModelNode> nodes, float minWeight, Decision decision) {
this.minWeight = minWeight;
this.decision = decision;
this.nodes = nodes;
}

public void updateMinNode(BalancedShardsAllocator.ModelNode node, boolean allowRandomAllocation) {
if (allowRandomAllocation) {
nodes.add(node);
} else {
if (nodes.isEmpty()) {
nodes.add(node);
} else {
nodes.set(0, node);
}
}
}

public void updateMinNode(BalancedShardsAllocator.ModelNode node, float weight, Decision decision, boolean allowRandomAllocation) {
if (allowRandomAllocation) {
nodes.add(node);
} else {
if (nodes.isEmpty()) {
nodes.add(node);
} else {
nodes.set(0, node);
}
}
minWeight = weight;
this.decision = decision;
}

public void clearAndUpdateMinNode(BalancedShardsAllocator.ModelNode node, float weight, Decision decision) {
nodes.clear();
nodes.add(node);
minWeight = weight;
this.decision = decision;
}

public BalancedShardsAllocator.ModelNode getMinNode(boolean allowRandomAllocation) {
if (allowRandomAllocation) {
return nodes.isEmpty() ? null : nodes.get(Randomness.get().nextInt(nodes.size()));
} else {
return nodes.isEmpty() ? null : nodes.get(0);
}
}
}

MinWeightedNodeDecision updateEligibleNodes(
MinWeightedNodeDecision findMinWeightNode(
MinWeightedNodeDecision minWeightedNodeDecision,
Decision currentDecision,
float currentWeight,
Expand All @@ -946,8 +892,8 @@ MinWeightedNodeDecision updateEligibleNodes(
* 2.b.1 Update the list if new decision is YES
* 2.b.2 Append to list if new decision is THROTTLE
*/
if (currentWeight == minWeightedNodeDecision.minWeight) {
if (currentDecision.type() == minWeightedNodeDecision.decision.type()) {
if (currentWeight == minWeightedNodeDecision.getMinWeight()) {
if (currentDecision.type() == minWeightedNodeDecision.getDecision().type()) {
minWeightedNodeDecision.updateMinNode(node, allowRandomAllocation);
} else {
if (currentDecision.type() == Decision.Type.YES) {
Expand All @@ -956,17 +902,15 @@ MinWeightedNodeDecision updateEligibleNodes(
}
}
} else {
if (currentWeight < minWeightedNodeDecision.minWeight) {
if (currentWeight < minWeightedNodeDecision.getMinWeight()) {
minWeightedNodeDecision.clearAndUpdateMinNode(node, currentWeight, currentDecision);
}
}
} else {
final boolean updateMinNode;
BalancedShardsAllocator.ModelNode minNode = null;
if (!minWeightedNodeDecision.nodes.isEmpty()) {
minNode = minWeightedNodeDecision.nodes.get(0);
}
if (currentWeight == minWeightedNodeDecision.minWeight) {
minNode = minWeightedNodeDecision.getMinNode(allowRandomAllocation);
if (currentWeight == minWeightedNodeDecision.getMinWeight()) {
/* we have an equal weight tie breaking:
* 1. if one decision is YES prefer it
* 2. prefer the node that holds the primary for this index with the next id in the ring ie.
Expand All @@ -978,7 +922,7 @@ MinWeightedNodeDecision updateEligibleNodes(
* than the id of the shard we need to assign. This works fine when new indices are created since
* primaries are added first and we only add one shard set at a time in this algorithm.
*/
if (currentDecision.type() == minWeightedNodeDecision.decision.type()) {
if (currentDecision.type() == minWeightedNodeDecision.getDecision().type()) {
final int repId = shard.id();
final int nodeHigh = node.highestPrimary(shard.index().getName());
final int minNodeHigh = minNode.highestPrimary(shard.getIndexName());
Expand All @@ -988,7 +932,7 @@ MinWeightedNodeDecision updateEligibleNodes(
updateMinNode = currentDecision.type() == Decision.Type.YES;
}
} else {
updateMinNode = currentWeight < minWeightedNodeDecision.minWeight;
updateMinNode = currentWeight < minWeightedNodeDecision.getMinWeight();
}
if (updateMinNode) {
minWeightedNodeDecision.updateMinNode(node, currentWeight, currentDecision, allowRandomAllocation);
Expand Down Expand Up @@ -1036,7 +980,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
// weight of this index currently on the node
float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName());
// moving the shard would not improve the balance, and we are not in explain mode, so short circuit
if (currentWeight > minWeightedNodeDecision.minWeight && explain == false) {
if (currentWeight > minWeightedNodeDecision.getMinWeight() && explain == false) {
continue;
}

Expand All @@ -1045,19 +989,19 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0));
nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight));
}
minWeightedNodeDecision = updateEligibleNodes(
minWeightedNodeDecision = findMinWeightNode(
minWeightedNodeDecision,
currentDecision,
currentWeight,
preferRandomShardAllocation,
allowRandomAllocation,
shard,
node
);
}

if (minWeightedNodeDecision.decision == null) {
if (minWeightedNodeDecision.getDecision() == null) {
// decision was not set and a node was not assigned, so treat it as a NO decision
minWeightedNodeDecision.decision = Decision.NO;
minWeightedNodeDecision.setDecision(Decision.NO);
}
List<NodeAllocationResult> nodeDecisions = null;
if (explain) {
Expand All @@ -1071,9 +1015,9 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
}
}

BalancedShardsAllocator.ModelNode minNode = minWeightedNodeDecision.getMinNode(preferRandomShardAllocation);
BalancedShardsAllocator.ModelNode minNode = minWeightedNodeDecision.getMinNode(allowRandomAllocation);
return AllocateUnassignedDecision.fromDecision(
minWeightedNodeDecision.decision,
minWeightedNodeDecision.getDecision(),
minNode != null ? minNode.getRoutingNode().node() : null,
nodeDecisions
);
Expand Down
Loading

0 comments on commit f376d72

Please sign in to comment.