diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 5bd0a661c381d..fc93e1be134b9 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -63,6 +63,7 @@ import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; @@ -256,6 +257,7 @@ public static Collection createAllocationDeciders(Settings se addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); + addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings)); clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java new file mode 100644 index 0000000000000..f0318d986899d --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * 
+ * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.Setting.Property; + +import java.util.function.BiPredicate; + +/** + * This {@link NodeLoadAwareAllocationDecider} controls shard over-allocation + * due to node failures or otherwise on the surviving nodes. The allocation limits + * are decided by the user provisioned capacity, to determine if there were lost nodes. + * The provisioned capacity as defined by the below settings needs to be updated on every + * cluster scale up and scale down operations. + *
+ * cluster.routing.allocation.load_awareness.provisioned_capacity: N
+ * 
+ *

+ * and prevent allocation on the surviving nodes of the under capacity cluster + * based on overload factor defined as a percentage by + *

+ * cluster.routing.allocation.load_awareness.skew_factor: X
+ * 
+ * The total limit per node based on skew_factor doesn't limit primaries that previously + * existed on the disk as those shards are force allocated by + * {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)} + * however new primaries due to index creation, snapshot restore etc. can be controlled via the below settings. + * Setting the value to true allows newly created primaries to get assigned while preventing the replica allocation + * breaching the skew factor. + * Note that setting this to false can result in the primaries not getting assigned and the cluster turning RED + *
+ * cluster.routing.allocation.load_awareness.allow_unassigned_primaries
+ * 
+ */
+public class NodeLoadAwareAllocationDecider extends AllocationDecider {
+
+    /** Decider name passed to every {@link Decision} created by this class. */
+    public static final String NAME = "load_awareness";
+
+    /**
+     * Number of nodes the cluster is provisioned for (default {@code -1}).
+     * A value {@code <= 0} leaves this decider disabled.
+     */
+    public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING =
+        Setting.intSetting("cluster.routing.allocation.load_awareness.provisioned_capacity", -1, -1,
+            Property.Dynamic, Property.NodeScope);
+
+    /**
+     * Tolerated overload per node, as a percentage on top of the expected average shards per node (default {@code 50}).
+     * A negative value leaves this decider disabled.
+     */
+    public static final Setting<Double> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING =
+        Setting.doubleSetting("cluster.routing.allocation.load_awareness.skew_factor", 50, -1,
+            Property.Dynamic, Property.NodeScope);
+
+    /**
+     * When {@code true} (the default), unassigned primaries are exempt from the per-node limit.
+     */
+    public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING =
+        Setting.boolSetting("cluster.routing.allocation.load_awareness.allow_unassigned_primaries",
+            true, Property.Dynamic, Property.NodeScope);
+
+    private static final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationDecider.class);
+
+    // The three fields below mirror the dynamic settings above; they are rewritten by the update consumers
+    // registered in the constructor, hence volatile.
+    private volatile int provisionedCapacity;
+
+    private volatile double skewFactor;
+
+    private volatile boolean allowUnassignedPrimaries;
+
+    /**
+     * Reads the initial values of the three load-awareness settings and registers update consumers so that
+     * later dynamic changes are reflected in this decider.
+     *
+     * @param settings        settings the initial values are read from
+     * @param clusterSettings registry on which the update consumers are installed
+     */
+    public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
+        this.skewFactor = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.get(settings);
+        this.provisionedCapacity = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.get(settings);
+        this.allowUnassignedPrimaries = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.get(settings);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
+            this::setSkewFactor);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
+            this::setProvisionedCapacity);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
+            this::setAllowUnassignedPrimaries);
+    }
+
+    private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) {
+        this.allowUnassignedPrimaries = allowUnassignedPrimaries;
+    }
+
+    private void setSkewFactor(double skewFactor) {
+        this.skewFactor = skewFactor;
+    }
+
+    private void setProvisionedCapacity(int provisionedCapacity) {
+        this.provisionedCapacity = provisionedCapacity;
+    }
+
+    /**
+     * Denies allocation once the node's shard count has reached the per-node limit ({@code count >= limit}).
+     */
+    @Override
+    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        return underCapacity(shardRouting, node, allocation, (count, limit) -> count >= limit);
+    }
+
+    /**
+     * Lets a shard remain as long as the node's shard count does not exceed the per-node limit
+     * (denied only when {@code count > limit}).
+     */
+    @Override
+    public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        return underCapacity(shardRouting, node, allocation, (count, limit) -> count > limit);
+    }
+
+    /**
+     * Shared decision logic for {@link #canAllocate} and {@link #canRemain}.
+     * <p>
+     * The per-node limit is {@code ceil(totalShards / provisionedCapacity * (1 + skewFactor / 100))}, where
+     * {@code totalShards} comes from {@link Metadata#getTotalNumberOfShards()}.
+     *
+     * @param shardRouting the shard being decided on
+     * @param node         the candidate node; its load is taken from {@link RoutingNode#numberOfOwningShards()}
+     * @param allocation   the current allocation, used for metadata and for building the decision
+     * @param decider      receives {@code (nodeShardCount, limit)} and returns {@code true} when the node is over the limit
+     * @return {@link Decision#YES} when the decider is disabled, when the shard is an exempt unassigned primary,
+     *         or when the node is within the limit; a {@code NO} decision otherwise
+     */
+    private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation,
+                                   BiPredicate<Integer, Integer> decider) {
+        // Snapshot the dynamic settings once: the enable guard, the limit computation and the explanation must all
+        // see the same values even if a settings update lands while this decision is being computed.
+        final int capacity = provisionedCapacity;
+        final double skew = skewFactor;
+        if (capacity <= 0 || skew < 0) {
+            return allocation.decision(Decision.YES, NAME,
+                "overload awareness allocation is not enabled, set cluster setting [%s] and cluster setting [%s] to enable it",
+                CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(),
+                CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey());
+        }
+        if (shardRouting.unassigned() && shardRouting.primary() && allowUnassignedPrimaries) {
+            return allocation.decision(Decision.YES, NAME,
+                "overload allocation awareness is allowed for unassigned primaries, set cluster setting [%s] to disable it",
+                CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey());
+        }
+        Metadata metadata = allocation.metadata();
+        float expectedAvgShardsPerNode = (float) metadata.getTotalNumberOfShards() / capacity;
+        int nodeShardCount = node.numberOfOwningShards();
+        int limit = (int) Math.ceil(expectedAvgShardsPerNode * (1 + skew / 100.0));
+        if (decider.test(nodeShardCount, limit)) {
+            logger.debug(() -> new ParameterizedMessage("Too many shards [{}] allocated to this node [{}]. Expected average shards" +
+                " per node [{}], overload factor [{}], node limit [{}]", nodeShardCount, node.nodeId(), expectedAvgShardsPerNode,
+                skew, limit));
+            return allocation.decision(Decision.NO, NAME, "too many shards [%d] allocated to this node, limit per node [%d] considering" +
+                " overload factor [%.2f] based on capacity [%d]", nodeShardCount, limit, skew, capacity);
+        }
+        return allocation.decision(Decision.YES, NAME, "node meets all skew awareness attribute requirements");
+    }
+}
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 446b3bb0bb59b..669ec43dca604 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -33,6 +33,7 @@ import org.apache.logging.log4j.LogManager; import org.opensearch.action.main.TransportMainAction; +import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.opensearch.action.admin.indices.close.TransportCloseIndexAction; @@ -581,7 +582,10 @@ public void apply(Settings value, Settings current, Settings previous) { FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING, FsHealthService.HEALTHY_TIMEOUT_SETTING, TransportMainAction.OVERRIDE_MAIN_RESPONSE_VERSION, - IndexingPressure.MAX_INDEXING_BYTES))); + IndexingPressure.MAX_INDEXING_BYTES, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING))); 
public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER, diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index b9636472ffeb1..0066d6895eaa5 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; @@ -230,7 +231,8 @@ public void testAllocationDeciderOrder() { DiskThresholdDecider.class, ThrottlingAllocationDecider.class, ShardsLimitAllocationDecider.class, - AwarenessAllocationDecider.class); + AwarenessAllocationDecider.class, + NodeLoadAwareAllocationDecider.class); Collection deciders = ClusterModule.createAllocationDeciders(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), Collections.emptyList()); Iterator iter = deciders.iterator(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java new file mode 100644 index 0000000000000..649625c9dfb08 --- /dev/null +++ 
b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java @@ -0,0 +1,1030 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.opensearch.common.settings.Settings; +import org.opensearch.gateway.GatewayAllocator; +import org.opensearch.test.gateway.TestGatewayAllocator; + +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; + +public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase { + + 
private final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationTests.class); + + public void testNewUnassignedPrimaryAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), true) + ); + + logger.info("Building initial routing table for 'testNewUnassignedPrimaryAllocationOnOverload'"); + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(0)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + + logger.info("--> Remove nodes from zone holding primaries"); + ClusterState newState = removeNodes(clusterState, strategy, "node1", "node2", "node3"); + + logger.info("add another index with 20 shards"); + metadata = Metadata.builder(newState.metadata()) + 
.put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + )) + .build(); + RoutingTable updatedRoutingTable = RoutingTable.builder(newState.routingTable()) + .addAsNew(metadata.index("test1")) + .build(); + + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + logger.info("no limits should be applied on newly created primaries"); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(28)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(12)); + for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); + } + + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node1", singletonMap("zone", "zone_1")))) + .build(); + + //4 existing shards from this node's local store get started + newState = strategy.reroute(newState, "reroute"); + newState = startInitializingShardsAndReroute(strategy, newState); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(32)); + + //add back node2 when skewness is still breached + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node2", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(36)); + + //add back node3 + newState = 
ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node3", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + } + + public void testNoAllocationLimitsOnOverloadForDisabledLoadFactor() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), -1, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), false) + ); + + logger.info("Building initial routing table for 'testNoAllocationLimitsOnOverloadForDisabledLoadFactor'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(0)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + 
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + + logger.info("--> Remove nodes from zone holding primaries"); + ClusterState newState = removeNodes(clusterState, strategy, "node1", "node2", "node3" ); + + logger.info("add another index with 20 shards"); + metadata = Metadata.builder(newState.metadata()) + .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + )) + .build(); + RoutingTable updatedRoutingTable = RoutingTable.builder(newState.routingTable()) + .addAsNew(metadata.index("test1")) + .build(); + + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + logger.info("no limits should be applied on newly created primaries"); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(28)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(12)); + for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); + } + + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node1", singletonMap("zone", "zone_1")))) + .build(); + + //4 existing shards from this node's local store get started + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(32)); + + //add back node2 when skewness is still breached + newState = 
ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node2", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(36)); + + //add back node3 + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node3", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + } + + public void testExistingPrimariesAllocationOnOverload() { + GatewayAllocator gatewayAllocator = new TestGatewayAllocator(); + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 50, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), false), + gatewayAllocator); + + logger.info("Building initial routing table for 'testExistingPrimariesAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(0)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = 
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + + logger.info("--> Remove nodes from zone holding primaries"); + ClusterState newState = removeNodes(clusterState, strategy, "node1", "node2", "node3"); + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(8)); + + logger.info("add another index with 20 shards"); + metadata = Metadata.builder(newState.metadata()) + .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + )) + .build(); + RoutingTable updatedRoutingTable = RoutingTable.builder(newState.routingTable()) + .addAsNew(metadata.index("test1")) + .build(); + + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + logger.info("limits should be applied on newly create primaries"); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(24)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(16)); + + assertEquals(12L, newState.getRoutingNodes().shardsWithState(UNASSIGNED).stream(). 
+ filter(r -> r.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT).count()); + + assertEquals(4L, newState.getRoutingNodes().shardsWithState(UNASSIGNED).stream(). + filter(r -> r.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED).count()); + + assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12)); + + logger.info("--> Remove node4 from zone holding primaries"); + newState = removeNodes(newState, strategy, "node4"); + + logger.info("--> change the overload load factor to zero and verify if unassigned primaries on disk get assigned despite overload"); + strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 0, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), false), + gatewayAllocator); + + newState = strategy.reroute(newState, "reroute"); + + logger.info("--> Add back node4 and ensure existing primaries are assigned"); + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node4", singletonMap("zone", "zone_1")))) + .build(); + + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(newState, "reroute").routingTable(), sameInstance(newState.routingTable())); + + assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12)); + assertThat(newState.getRoutingNodes().node("node5").size(), equalTo(12)); + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), 
equalTo(24)); + + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node1", singletonMap("zone", "zone_1")))) + .build(); + + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(32)); + + //add back node2 when skewness is still breached + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node2", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(36)); + + //add back node3 + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node3", singletonMap("zone", "zone_1")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + } + + public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { + GatewayAllocator gatewayAllocator = new TestGatewayAllocator(); + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20, + 
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), true), + gatewayAllocator); + + logger.info("Building initial routing table for 'testSingleZoneOneReplicaLimitsShardAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + + logger.info("--> Remove node1 from zone"); + ClusterState newState = removeNodes(clusterState, strategy, "node1"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + + for (RoutingNode node : newState.getRoutingNodes()) { + assertThat(node.size(), equalTo(10)); + } + + logger.info("--> Remove node2 when the limit of 
overload is reached"); + newState = removeNodes(newState, strategy, "node2"); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(30)); + + for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); + assertFalse(shard.primary()); + } + + logger.info("add another index with 20 shards"); + metadata = Metadata.builder(newState.metadata()) + .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + )) + .build(); + RoutingTable updatedRoutingTable = RoutingTable.builder(newState.routingTable()) + .addAsNew(metadata.index("test1")) + .build(); + //increases avg shard per node to 80/5 = 16, overload factor 1.2, total allowed 20 + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20)); + + logger.info("add another index with 60 shards"); + metadata = Metadata.builder(newState.metadata()) + .put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 60) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + )) + .build(); + updatedRoutingTable = RoutingTable.builder(newState.routingTable()) + .addAsNew(metadata.index("test2")) + .build(); + //increases avg shard per node to 140/5 = 28, overload factor 1.2, 
total allowed 34 per node but still ALL primaries get assigned + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20)); + + logger.info("change settings to allow unassigned primaries"); + strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), false), + gatewayAllocator); + + for (RoutingNode node : newState.getRoutingNodes()) { + assertThat(node.size(), equalTo(40)); + } + + logger.info("add another index with 5 shards"); + metadata = Metadata.builder(newState.metadata()) + .put(IndexMetadata.builder("test3").settings(settings(Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + )) + .build(); + updatedRoutingTable = RoutingTable.builder(newState.routingTable()) + .addAsNew(metadata.index("test3")) + .build(); + //increases avg shard per node to 145/5 = 29, overload factor 1.2, total allowed 35 per node and NO primaries get assigned + //since total owning shards are 40 per node already + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = 
startInitializingShardsAndReroute(strategy, newState); + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(25)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).stream().filter(ShardRouting::primary).count(), equalTo(5L)); + } + + public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20, + "cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3") + ); + + logger.info("Building initial routing table for 'testThreeZoneTwoReplicaLimitsShardAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(2)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replica will not start because we have only one zone value"); + 
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + //replicas are unassigned + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(40)); + + logger.info("--> add five new node in new zone and reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(20)); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(40)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another five node in new zone and reroute"); + + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); + + logger.info("--> Remove three node from zone3 holding 
primary and replicas"); + + newState = removeNodes(newState, strategy, "node11", "node12", "node13"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5)); + + //add the removed node + newState = addNodes(newState, strategy, "zone3", "node11"); + + assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + newState = startInitializingShardsAndReroute(strategy, newState); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5)); + + //add the removed node + newState = addNodes(newState, strategy, "zone3", "node12"); + + assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + newState = startInitializingShardsAndReroute(strategy, newState); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5)); + + //add the removed node + newState = addNodes(newState, strategy, "zone3", "node13"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + //ensure all shards are assigned + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + + public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + 
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), true, + "cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3") + ); + + logger.info("Building initial routing table for 'testThreeZoneOneReplicaLimitsShardAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(30).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(30)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replica will not start because we have only one zone value"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(30)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + //replicas are unassigned + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(30)); + + logger.info("--> add five new node in new zone and reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); + + 
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(30)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(25)); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(55)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another five node in new zone and reroute"); + + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); + + logger.info("--> Remove three node from zone3"); + + newState = removeNodes(newState, strategy, "node11", "node12", "node13"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5)); + + //add the removed nodes + newState = addNodes(clusterState, strategy, "zone3", "node11", 
"node12", "node13"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + //ensure all shards are assigned + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + + public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 9, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 10, + "cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3") + ); + + logger.info("Building initial routing table for 'testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(21).numberOfReplicas(2)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); + + logger.info("--> adding three nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(21)); + + logger.info("--> 
start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replica will not start because we have only one rack value"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(21)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + + logger.info("--> add three new node with a new rack and reroute"); + clusterState = addNodes(clusterState, strategy, "zone_2", "node4", "node5", "node6"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(21)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(21)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo("node4")); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(42)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, make sure nothing moves"); + + ClusterState newState = addNodes(clusterState, strategy, "zone_3", "node7", "node8", "node9"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(63)); + + logger.info("--> Remove two nodes from zones"); + //remove one nodes in one zone to cause distribution zone1->2 , zone2->3, zone3->2 + newState = removeNodes(newState, strategy, "node7", "node2"); + + while 
(newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + //ensure minority zone doesn't get overloaded + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(53)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(10)); + for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); + } + + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node7", singletonMap("zone", "zone_3"))) + .add(newNode("node2", singletonMap("zone", "zone_1"))) + ).build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + for (RoutingNode node : newState.getRoutingNodes()) { + assertThat(node.size(), equalTo(7)); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(63)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + + public void testSingleZoneTwoReplicaLimitsReplicaAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 3, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 10) + ); + + logger.info("Building initial routing table for 'testSingleZoneTwoReplicaLimitsReplicaAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(3).numberOfReplicas(2)) + .build(); + + RoutingTable 
initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); + + logger.info("--> adding three nodes on same rack and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(3)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replicas are initializing"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(6)); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> all shards are started"); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(9)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + //remove one node to make zone1 skewed + clusterState = removeNodes(clusterState, strategy, randomFrom("node1", "node2", "node3")); + + while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + } + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(6)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3)); + + for (ShardRouting shard : 
clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); + assertFalse(shard.primary()); + } + } + + public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 10, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), true) + ); + + logger.info("Building initial routing table for 'testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1","node2"); + //skewness limit doesn't apply to primary + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); + //assert replicas are not assigned but primaries are + logger.info("--> replicas are not initializing"); + 
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + + for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.INDEX_CREATED); + assertFalse(shard.primary()); + } + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + //add the third and fourth node + clusterState = addNodes(clusterState, strategy, "zone1", "node3", "node4"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(18)); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replicas are started"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(38)); + + for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { + assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.INDEX_CREATED); + assertFalse(shard.primary()); + } + + clusterState = addNodes(clusterState, strategy, "zone1", "node5"); + + while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + } + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + + for (RoutingNode node : clusterState.getRoutingNodes()) { + assertThat(node.size(), equalTo(8)); + } + } + + public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure(){ + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + 
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20)); + + logger.info("Building initial routing table for 'testThreeZoneTwoReplicaLimitsUnderFullZoneFailure'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(2)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> add five new node in new zone and reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), 
equalTo(0)); + + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); + + logger.info("--> Remove complete zone3 holding primary and replicas"); + newState = removeNodes(newState, strategy, "node11", "node12", "node13", "node14", "node15"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(50)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(10)); + + for (RoutingNode node : newState.getRoutingNodes()) { + assertThat(node.size(), equalTo(5)); + } + + //add the removed node + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node11", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + newState = startInitializingShardsAndReroute(strategy, newState); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5)); + + //add the removed node + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node12", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + newState = startInitializingShardsAndReroute(strategy, newState); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5)); + + //add the removed node + newState 
= ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node13", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + //add the removed node + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node14", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + //add the removed node + newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) + .add(newNode("node15", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + //ensure all shards are assigned + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + + private ClusterState removeNodes(ClusterState clusterState, AllocationService allocationService, String... nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.getNodes()); + org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.remove(nodeId)); + return allocationService.disassociateDeadNodes(ClusterState.builder(clusterState) + .nodes(nodeBuilder).build(), true, "reroute"); + } + + private ClusterState addNodes(ClusterState clusterState, AllocationService allocationService, String zone, String... 
nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newNode(nodeId, singletonMap("zone", zone)))); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return allocationService.reroute(clusterState, "reroute"); + } + + private AllocationService createAllocationServiceWithAdditionalSettings(Map settingsValue) { + return createAllocationService(buildSettings(settingsValue)); + } + + private AllocationService createAllocationServiceWithAdditionalSettings(Map settingsValue, + GatewayAllocator gatewayAllocator) { + return createAllocationService(buildSettings(settingsValue), gatewayAllocator); + } + + private Settings buildSettings(Map settingsValue) { + Settings.Builder settingsBuilder = Settings.builder() + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put("cluster.routing.allocation.awareness.attributes", "zone"); + settingsValue.forEach((k, v) -> { + if (v instanceof Integer) + settingsBuilder.put(k, (Integer)(v)); + else if (v instanceof Boolean) + settingsBuilder.put(k, (Boolean)(v)); + else if (v instanceof String) + settingsBuilder.put(k, (String)(v)); + else { + throw new UnsupportedOperationException("Unsupported type for key :" + k); + } + }); + return settingsBuilder.build(); + } +}