Skip to content

Commit

Permalink
Skip zone awareness when auto-expand set to all
Browse files Browse the repository at this point in the history
Signed-off-by: amberzsy <[email protected]>
  • Loading branch information
zshuyi authored and amberzsy committed Sep 13, 2024
1 parent 12ff5ed commit d25ca2a
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))
- Skip zone awareness when auto-expand set to all - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/14619))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,4 +504,68 @@ public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws E
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * numOfShards * (numOfReplica + 1)));
assertThat(health.isTimedOut(), equalTo(false));
}

public void testAwarenessZonesWithAutoExpand() {
Settings commonSettings = Settings.builder()
.put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.getKey(), true)
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.build();

logger.info("--> starting 2 nodes on same zone");
List<String> nodes = internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
String A = nodes.get(0);
String B = nodes.get(1);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
.build()
);

if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose("test"));
}

logger.info("--> waiting for shards to be allocated");
health = client().admin()
.cluster()
.prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));

final Map<String, Integer> counts = new HashMap<>();
int replicaCount = 0;

for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
if (shardRouting.primary()) {
replicaCount++;
}
counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
}
}
}
assertThat(counts.get(A), anyOf(equalTo(1), equalTo(2)));
assertThat(counts.get(B), anyOf(equalTo(1), equalTo(2)));
assertThat(replicaCount, equalTo(2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public boolean isEnabled() {
return enabled;
}

public boolean autoExpandToAll() {
return maxReplicas == Integer.MAX_VALUE;
}

private OptionalInt getDesiredNumberOfReplicas(IndexMetadata indexMetadata, RoutingAllocation allocation) {
if (enabled) {
int numMatchingDataNodes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING;

/**
* This {@link AllocationDecider} controls shard allocation based on
Expand Down Expand Up @@ -160,6 +161,11 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
}

IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());

if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).autoExpandToAll()) {
return allocation.decision(Decision.YES, NAME, "allocation awareness is ignored, this index is set to auto-expand to all");
}

int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,4 +1063,39 @@ public void testAllocationExplainForUnassignedShardsWithUnbalancedZones() {

}
}

public void testIgnoredByAutoExpandReplicasToAll() {
final Settings settings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.build();

final AllocationService strategy = createAllocationService(settings);

final IndexMetadata.Builder metadataBuilder = IndexMetadata.builder("test")
.settings(
settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 100)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
);

final Metadata metadata = Metadata.builder().put(metadataBuilder).build();

final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(newNode("A-0", singletonMap("zone", "a")))
.add(newNode("A-1", singletonMap("zone", "a")))
.add(newNode("A-2", singletonMap("zone", "a")))
.add(newNode("B-0", singletonMap("zone", "b")))
.build();

final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder().addAsNew(metadata.index("test")).build())
.nodes(nodes)
.build(),
strategy
);

assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
}
}

0 comments on commit d25ca2a

Please sign in to comment.