diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 173bb32cbd8f7..c85691b80d7c3 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -63,6 +63,7 @@ import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; @@ -332,6 +333,7 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); + addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings)); clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java new file mode 100644 index 0000000000000..642a16a547519 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java @@ -0,0 +1,173 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The 
OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.Setting.Property; + +import java.util.function.BiPredicate; + +/** + * This {@link NodeLoadAwareAllocationDecider} controls shard over-allocation + * due to node failures or otherwise on the surviving nodes. The allocation limits + * are decided by the user provisioned capacity, to determine if there were lost nodes. + * The provisioned capacity as defined by the below settings needs to be updated on every + * cluster scale up and scale down operations. + *
+ * cluster.routing.allocation.load_awareness.provisioned_capacity: N
+ * 
+ *

+ * and prevent allocation on the surviving nodes of the under capacity cluster + * based on overload factor defined as a percentage by + *

+ * cluster.routing.allocation.load_awareness.skew_factor: X
+ * 
+ * The total limit per node derived from skew_factor does not apply to primaries that previously + * existed on disk, as those shards are force allocated by + * {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)}; + * however, new primaries created by index creation, snapshot restore, etc. can be controlled via the setting below. + * Setting the value to true allows newly created primaries to get assigned while still preventing replica allocation + * from breaching the skew factor. + * Note that setting this to false can result in primaries not getting assigned and the cluster turning RED. + *
+ * cluster.routing.allocation.load_awareness.allow_unassigned_primaries
+ * 
+ */ +public class NodeLoadAwareAllocationDecider extends AllocationDecider { + + public static final String NAME = "load_awareness"; + + public static final Setting CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING = Setting.intSetting( + "cluster.routing.allocation.load_awareness.provisioned_capacity", + -1, + -1, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING = Setting.doubleSetting( + "cluster.routing.allocation.load_awareness.skew_factor", + 50, + -1, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING = Setting.boolSetting( + "cluster.routing.allocation.load_awareness.allow_unassigned_primaries", + true, + Setting.Property.Dynamic, + Property.NodeScope + ); + + private volatile int provisionedCapacity; + + private volatile double skewFactor; + + private volatile boolean allowUnassignedPrimaries; + + private static final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationDecider.class); + + public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings clusterSettings) { + this.skewFactor = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.get(settings); + this.provisionedCapacity = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.get(settings); + this.allowUnassignedPrimaries = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, this::setSkewFactor); + clusterSettings.addSettingsUpdateConsumer( + CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING, + this::setProvisionedCapacity + ); + clusterSettings.addSettingsUpdateConsumer( + CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING, + 
this::setAllowUnassignedPrimaries + ); + } + + private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) { + this.allowUnassignedPrimaries = allowUnassignedPrimaries; + } + + private void setSkewFactor(double skewFactor) { + this.skewFactor = skewFactor; + } + + private void setProvisionedCapacity(int provisionedCapacity) { + this.provisionedCapacity = provisionedCapacity; + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return underCapacity(shardRouting, node, allocation, (count, limit) -> count >= limit); + } + + @Override + public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return underCapacity(shardRouting, node, allocation, (count, limit) -> count > limit); + } + + private Decision underCapacity( + ShardRouting shardRouting, + RoutingNode node, + RoutingAllocation allocation, + BiPredicate decider + ) { + if (provisionedCapacity <= 0 || skewFactor < 0) { + return allocation.decision( + Decision.YES, + NAME, + "overload awareness allocation is not enabled, set cluster setting [%s] and cluster setting [%s] to enable it", + CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey() + ); + } + if (shardRouting.unassigned() && shardRouting.primary() && allowUnassignedPrimaries) { + return allocation.decision( + Decision.YES, + NAME, + "overload allocation awareness is allowed for unassigned primaries, set cluster setting [%s] to disable it", + CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey() + ); + } + Metadata metadata = allocation.metadata(); + float expectedAvgShardsPerNode = (float) metadata.getTotalNumberOfShards() / provisionedCapacity; + int nodeShardCount = node.numberOfOwningShards(); + int limit = (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0)); + if 
(decider.test(nodeShardCount, limit)) { + logger.debug( + () -> new ParameterizedMessage( + "Too many shards [{}] allocated to this node [{}]. Expected average shards" + + " per node [{}], overload factor [{}], node limit [{}]", + nodeShardCount, + node.nodeId(), + expectedAvgShardsPerNode, + skewFactor, + limit + ) + ); + return allocation.decision( + Decision.NO, + NAME, + "too many shards [%d] allocated to this node, limit per node [%d] considering" + + " overload factor [%.2f] based on capacity [%d]", + nodeShardCount, + limit, + skewFactor, + provisionedCapacity + ); + } + return allocation.decision(Decision.YES, NAME, "node meets all skew awareness attribute requirements"); + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 2207f08aba259..58f2375449b9c 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -78,6 +78,7 @@ import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; @@ -597,7 +598,10 @@ public void apply(Settings value, Settings current, Settings previous) { ShardIndexingPressureMemoryManager.NODE_SOFT_LIMIT, ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS, ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT, - 
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS + ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING ) ) ); diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index b1a4bee7a42a9..8869c2857aa4f 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; @@ -236,7 +237,8 @@ public void testAllocationDeciderOrder() { DiskThresholdDecider.class, ThrottlingAllocationDecider.class, ShardsLimitAllocationDecider.class, - AwarenessAllocationDecider.class + AwarenessAllocationDecider.class, + NodeLoadAwareAllocationDecider.class ); Collection deciders = ClusterModule.createAllocationDeciders( Settings.EMPTY, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java new file 
mode 100644 index 0000000000000..d2e7e0e7e636a --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java @@ -0,0 +1,1112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.opensearch.common.settings.Settings; +import org.opensearch.gateway.GatewayAllocator; +import org.opensearch.test.gateway.TestGatewayAllocator; + +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; + +public class 
NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase { + + private final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationTests.class); + + public void testNewUnassignedPrimaryAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 20, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), + true + ) + ); + + logger.info("Building initial routing table for 'testNewUnassignedPrimaryAllocationOnOverload'"); + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(0)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + + logger.info("--> Remove nodes from zone holding primaries"); + ClusterState newState = removeNodes(clusterState, strategy, "node1", "node2", "node3"); + + logger.info("add another index with 20 shards"); + metadata = 
Metadata.builder(newState.metadata()) + .put( + IndexMetadata.builder("test1") + .settings( + settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ) + .build(); + RoutingTable updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test1")).build(); + + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + logger.info("no limits should be applied on newly created primaries"); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(28)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(12)); + for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); + } + + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node1", singletonMap("zone", "zone_1")))) + .build(); + + // 4 existing shards from this node's local store get started + newState = strategy.reroute(newState, "reroute"); + newState = startInitializingShardsAndReroute(strategy, newState); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(32)); + + // add back node2 when skewness is still breached + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node2", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(36)); + + // add back node3 + 
newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node3", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + } + + public void testNoAllocationLimitsOnOverloadForDisabledLoadFactor() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + -1, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), + false + ) + ); + + logger.info("Building initial routing table for 'testNoAllocationLimitsOnOverloadForDisabledLoadFactor'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(0)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + 
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + + logger.info("--> Remove nodes from zone holding primaries"); + ClusterState newState = removeNodes(clusterState, strategy, "node1", "node2", "node3"); + + logger.info("add another index with 20 shards"); + metadata = Metadata.builder(newState.metadata()) + .put( + IndexMetadata.builder("test1") + .settings( + settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ) + .build(); + RoutingTable updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test1")).build(); + + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + logger.info("no limits should be applied on newly created primaries"); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(28)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(12)); + for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); + } + + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node1", singletonMap("zone", "zone_1")))) + .build(); + + // 4 existing shards from this node's local store get started + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(32)); + + // add back node2 when skewness is still breached + newState = ClusterState.builder(newState) + 
.nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node2", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(36)); + + // add back node3 + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node3", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + } + + public void testExistingPrimariesAllocationOnOverload() { + GatewayAllocator gatewayAllocator = new TestGatewayAllocator(); + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 50, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), + false + ), + gatewayAllocator + ); + + logger.info("Building initial routing table for 'testExistingPrimariesAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(0)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = 
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + + logger.info("--> Remove nodes from zone holding primaries"); + ClusterState newState = removeNodes(clusterState, strategy, "node1", "node2", "node3"); + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(8)); + + logger.info("add another index with 20 shards"); + metadata = Metadata.builder(newState.metadata()) + .put( + IndexMetadata.builder("test1") + .settings( + settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ) + .build(); + RoutingTable updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test1")).build(); + + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + logger.info("limits should be applied on newly create primaries"); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(24)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(16)); + + assertEquals( + 12L, + newState.getRoutingNodes() + .shardsWithState(UNASSIGNED) + .stream() + .filter(r -> r.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT) + .count() + ); + + 
assertEquals( + 4L, + newState.getRoutingNodes() + .shardsWithState(UNASSIGNED) + .stream() + .filter(r -> r.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) + .count() + ); + + assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12)); + + logger.info("--> Remove node4 from zone holding primaries"); + newState = removeNodes(newState, strategy, "node4"); + + logger.info("--> change the overload load factor to zero and verify if unassigned primaries on disk get assigned despite overload"); + strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 0, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), + false + ), + gatewayAllocator + ); + + newState = strategy.reroute(newState, "reroute"); + + logger.info("--> Add back node4 and ensure existing primaries are assigned"); + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node4", singletonMap("zone", "zone_1")))) + .build(); + + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(newState, "reroute").routingTable(), sameInstance(newState.routingTable())); + + assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12)); + assertThat(newState.getRoutingNodes().node("node5").size(), equalTo(12)); + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(24)); + + newState = ClusterState.builder(newState) + 
.nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node1", singletonMap("zone", "zone_1")))) + .build(); + + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(32)); + + // add back node2 when skewness is still breached + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node2", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(36)); + + // add back node3 + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node3", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + } + + public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { + GatewayAllocator gatewayAllocator = new TestGatewayAllocator(); + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 20, + 
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), + true + ), + gatewayAllocator + ); + + logger.info("Building initial routing table for 'testSingleZoneOneReplicaLimitsShardAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + + logger.info("--> Remove node1 from zone"); + ClusterState newState = removeNodes(clusterState, strategy, "node1"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + + for (RoutingNode node : newState.getRoutingNodes()) { + assertThat(node.size(), equalTo(10)); + } + + logger.info("--> Remove node2 when the 
limit of overload is reached"); + newState = removeNodes(newState, strategy, "node2"); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(30)); + + for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); + assertFalse(shard.primary()); + } + + logger.info("add another index with 20 shards"); + metadata = Metadata.builder(newState.metadata()) + .put( + IndexMetadata.builder("test1") + .settings( + settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + ) + ) + .build(); + RoutingTable updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test1")).build(); + // increases avg shard per node to 80/5 = 16, overload factor 1.2, total allowed 20 + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20)); + + logger.info("add another index with 60 shards"); + metadata = Metadata.builder(newState.metadata()) + .put( + IndexMetadata.builder("test2") + .settings( + settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 60) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ) + .build(); + updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test2")).build(); + // increases avg shard per node to 140/5 = 28, 
overload factor 1.2, total allowed 34 per node but still ALL primaries get assigned + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20)); + + logger.info("change settings to allow unassigned primaries"); + strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 20, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), + false + ), + gatewayAllocator + ); + + for (RoutingNode node : newState.getRoutingNodes()) { + assertThat(node.size(), equalTo(40)); + } + + logger.info("add another index with 5 shards"); + metadata = Metadata.builder(newState.metadata()) + .put( + IndexMetadata.builder("test3") + .settings( + settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ) + .build(); + updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test3")).build(); + // increases avg shard per node to 145/5 = 29, overload factor 1.2, total allowed 35 per node and NO primaries get assigned + // since total owning shards are 40 per node already + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + 
newState = startInitializingShardsAndReroute(strategy, newState); + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(25)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).stream().filter(ShardRouting::primary).count(), equalTo(5L)); + } + + public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 20, + "cluster.routing.allocation.awareness.force.zone.values", + "zone_1,zone_2,zone_3" + ) + ); + + logger.info("Building initial routing table for 'testThreeZoneTwoReplicaLimitsShardAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(2)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replica will not start because we have only one zone value"); + 
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + // replicas are unassigned + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(40)); + + logger.info("--> add five new node in new zone and reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(20)); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(40)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another five node in new zone and reroute"); + + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); + + logger.info("--> Remove three node from zone3 holding 
primary and replicas"); + + newState = removeNodes(newState, strategy, "node11", "node12", "node13"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5)); + + // add the removed node + newState = addNodes(newState, strategy, "zone3", "node11"); + + assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + newState = startInitializingShardsAndReroute(strategy, newState); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5)); + + // add the removed node + newState = addNodes(newState, strategy, "zone3", "node12"); + + assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + newState = startInitializingShardsAndReroute(strategy, newState); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5)); + + // add the removed node + newState = addNodes(newState, strategy, "zone3", "node13"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + // ensure all shards are assigned + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + + public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + 
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 20, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), + true, + "cluster.routing.allocation.awareness.force.zone.values", + "zone_1,zone_2,zone_3" + ) + ); + + logger.info("Building initial routing table for 'testThreeZoneOneReplicaLimitsShardAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(30).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(30)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replica will not start because we have only one zone value"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(30)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + // replicas are unassigned + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(30)); + + logger.info("--> add five new node in new zone and reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); + + 
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(30)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(25)); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(55)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another five node in new zone and reroute"); + + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); + + logger.info("--> Remove three node from zone3"); + + newState = removeNodes(newState, strategy, "node11", "node12", "node13"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5)); + + // add the removed nodes + newState = addNodes(clusterState, strategy, "zone3", "node11", 
"node12", "node13"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + // ensure all shards are assigned + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + + public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 9, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 10, + "cluster.routing.allocation.awareness.force.zone.values", + "zone_1,zone_2,zone_3" + ) + ); + + logger.info("Building initial routing table for 'testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(21).numberOfReplicas(2)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding three nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(21)); + + 
logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replica will not start because we have only one rack value"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(21)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + + logger.info("--> add three new node with a new rack and reroute"); + clusterState = addNodes(clusterState, strategy, "zone_2", "node4", "node5", "node6"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(21)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(21)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node4")); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(42)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, make sure nothing moves"); + + ClusterState newState = addNodes(clusterState, strategy, "zone_3", "node7", "node8", "node9"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(63)); + + logger.info("--> Remove two nodes from zones"); + // remove one nodes in one zone to cause distribution zone1->2 , zone2->3, zone3->2 + newState = removeNodes(newState, strategy, "node7", "node2"); + + while 
(newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + // ensure minority zone doesn't get overloaded + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(53)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(10)); + for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); + } + + newState = ClusterState.builder(newState) + .nodes( + DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node7", singletonMap("zone", "zone_3"))) + .add(newNode("node2", singletonMap("zone", "zone_1"))) + ) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + for (RoutingNode node : newState.getRoutingNodes()) { + assertThat(node.size(), equalTo(7)); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(63)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + + public void testSingleZoneTwoReplicaLimitsReplicaAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 3, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 10 + ) + ); + + logger.info("Building initial routing table for 'testSingleZoneTwoReplicaLimitsReplicaAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(3).numberOfReplicas(2)) + .build(); + + 
RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding three nodes on same rack and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(3)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replicas are initializing"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(6)); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> all shards are started"); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(9)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + // remove one node to make zone1 skewed + clusterState = removeNodes(clusterState, strategy, randomFrom("node1", "node2", "node3")); + + while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + } + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(6)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3)); + + for (ShardRouting shard : 
clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); + assertFalse(shard.primary()); + } + } + + public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 10, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), + true + ) + ); + + logger.info("Building initial routing table for 'testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2"); + // skewness limit doesn't apply to primary + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + // assert replicas are not assigned but primaries are + logger.info("--> replicas are not initializing"); + 
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + + for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.INDEX_CREATED); + assertFalse(shard.primary()); + } + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + // add the third and fourth node + clusterState = addNodes(clusterState, strategy, "zone1", "node3", "node4"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(18)); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replicas are started"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(38)); + + for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.INDEX_CREATED); + assertFalse(shard.primary()); + } + + clusterState = addNodes(clusterState, strategy, "zone1", "node5"); + + while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + } + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + + for (RoutingNode node : clusterState.getRoutingNodes()) { + assertThat(node.size(), equalTo(8)); + } + } + + public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + 
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 20 + ) + ); + + logger.info("Building initial routing table for 'testThreeZoneTwoReplicaLimitsUnderFullZoneFailure'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(2)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> add five new node in new zone and reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + 
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); + + logger.info("--> Remove complete zone3 holding primary and replicas"); + newState = removeNodes(newState, strategy, "node11", "node12", "node13", "node14", "node15"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(50)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(10)); + + for (RoutingNode node : newState.getRoutingNodes()) { + assertThat(node.size(), equalTo(5)); + } + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node11", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + newState = startInitializingShardsAndReroute(strategy, newState); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5)); + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node12", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + newState = startInitializingShardsAndReroute(strategy, newState); + 
assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5)); + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node13", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node14", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node15", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + // ensure all shards are assigned + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + + private ClusterState removeNodes(ClusterState clusterState, AllocationService allocationService, String... nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.getNodes()); + org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.remove(nodeId)); + return allocationService.disassociateDeadNodes(ClusterState.builder(clusterState).nodes(nodeBuilder).build(), true, "reroute"); + } + + private ClusterState addNodes(ClusterState clusterState, AllocationService allocationService, String zone, String... 
nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newNode(nodeId, singletonMap("zone", zone)))); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return allocationService.reroute(clusterState, "reroute"); + } + + private AllocationService createAllocationServiceWithAdditionalSettings(Map settingsValue) { + return createAllocationService(buildSettings(settingsValue)); + } + + private AllocationService createAllocationServiceWithAdditionalSettings( + Map settingsValue, + GatewayAllocator gatewayAllocator + ) { + return createAllocationService(buildSettings(settingsValue), gatewayAllocator); + } + + private Settings buildSettings(Map settingsValue) { + Settings.Builder settingsBuilder = Settings.builder() + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put("cluster.routing.allocation.awareness.attributes", "zone"); + settingsValue.forEach((k, v) -> { + if (v instanceof Integer) settingsBuilder.put(k, (Integer) (v)); + else if (v instanceof Boolean) settingsBuilder.put(k, (Boolean) (v)); + else if (v instanceof String) settingsBuilder.put(k, (String) (v)); + else { + throw new UnsupportedOperationException("Unsupported type for key :" + k); + } + }); + return settingsBuilder.build(); + } +}