From c7ed191b7e2c85f9e32203bc3854e5c60ba64c66 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Mon, 25 Mar 2024 00:08:28 +0530 Subject: [PATCH] refactor code Signed-off-by: Arpit Bandejiya --- .../allocation/AllocationConstraints.java | 4 +- .../allocation/AllocationParameter.java | 24 ----------- .../routing/allocation/ConstraintTypes.java | 2 +- .../allocator/BalancedShardsAllocator.java | 4 +- .../allocation/BalanceConfigurationTests.java | 41 ++++++++++++++----- 5 files changed, 34 insertions(+), 41 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index a7732cde0d7f0..1090ff87359d5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -28,13 +28,13 @@ public class AllocationConstraints { private Map constraints; - public AllocationConstraints(AllocationParameter allocationParameter) { + public AllocationConstraints() { this.constraints = new HashMap<>(); this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached())); this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); this.constraints.putIfAbsent( CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, - new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceBuffer())) + new Constraint(isPrimaryShardsPerNodeBreached(0.0f)) ); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java deleted file mode 100644 index 2444f6405278e..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.routing.allocation; - -/** - * RebalanceConstraint Params - */ -public class AllocationParameter { - private final float preferPrimaryBalanceBuffer; - - public AllocationParameter(float preferPrimaryBalanceBuffer) { - this.preferPrimaryBalanceBuffer = preferPrimaryBalanceBuffer; - } - - public float getPreferPrimaryBalanceBuffer() { - return preferPrimaryBalanceBuffer; - } -} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index fa2bff875a6d5..08fe8f92d1f80 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -83,7 +83,7 @@ public static Predicate isPrimaryShardsPerNodeBreac return (params) -> { int primaryShardCount = params.getNode().numPrimaryShards(); int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode() * (1 + buffer)); - return primaryShardCount > allowedPrimaryShardCount; + return primaryShardCount >= allowedPrimaryShardCount; }; } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index ef971d9cd5650..19b5347df911a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -43,7 +43,6 @@ import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationConstraints; -import org.opensearch.cluster.routing.allocation.AllocationParameter; import org.opensearch.cluster.routing.allocation.ConstraintTypes; import org.opensearch.cluster.routing.allocation.MoveDecision; import org.opensearch.cluster.routing.allocation.RebalanceConstraints; @@ -439,9 +438,8 @@ static class WeightFunction { theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; - AllocationParameter allocationParameter = new AllocationParameter(0.0f); RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); - this.constraints = new AllocationConstraints(allocationParameter); + this.constraints = new AllocationConstraints(); this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter); // Enable index shard per node breach constraint updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index 90a978646430e..05f9650b0a4da 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -260,7 +260,7 @@ public void testPrimaryBalanceWithPreferPrimaryReBalanceSetting() { final int numberOfShards = 4; final int numberOfReplicas = 1; final int numberOfRuns = 5; - final float buffer = 0.05f; + final float buffer = 0.10f; int balanceFailed = 0; AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryReBalance().build(), new TestGatewayAllocator()); @@ -418,18 +418,15 @@ public void testGlobalPrimaryBalance() throws Exception { * @throws Exception generic exception */ public void testGlobalPrimaryBalanceWithNodeDrops() throws Exception { - final float buffer = 0.05f; + final float buffer = 0.10f; AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryReBalance().build(), new TestGatewayAllocator()); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); + clusterState = addNodes(clusterState, strategy, 5); - clusterState = addIndex(clusterState, strategy, "test-index1", 5, 1); - clusterState = addIndex(clusterState, strategy, "test-index2", 5, 1); - clusterState = addIndex(clusterState, strategy, "test-index3", 5, 1); + clusterState = addIndices(clusterState, strategy, 5, 1, 8); + + logger.info(ShardAllocations.printShardDistribution(clusterState)); + verifyPrimaryBalance(clusterState, buffer); clusterState = removeOneNode(clusterState, strategy); @@ -610,7 +607,7 @@ private void verifyPrimaryBalance(ClusterState clusterState, float buffer) throw .filter(ShardRouting::primary) .collect(Collectors.toList()) .size(); - assertTrue(primaryCount < (avgPrimaryShardsPerNode * (1 + buffer))); + assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer))); } }, 60, TimeUnit.SECONDS); } @@ -712,6 +709,28 @@ private ClusterState addIndex( return applyAllocationUntilNoChange(clusterState, strategy); } + private ClusterState addIndices(ClusterState clusterState, AllocationService strategy, int numberOfShards, int numberOfReplicas, int numberOfIndices) { + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterState.routingTable()); + + for (int i = 0; i < numberOfIndices; i++) { + IndexMetadata.Builder index = IndexMetadata.builder("test" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas); + + metadataBuilder = metadataBuilder.put(index); + routingTableBuilder.addAsNew(index.build()); + } + + clusterState = ClusterState.builder(clusterState) + .metadata(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) + .build(); + clusterState = strategy.reroute(clusterState, "indices-created"); + return applyAllocationUntilNoChange(clusterState, strategy); + } + private ClusterState initCluster( AllocationService strategy, int numberOfIndices,