Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

skip zone awareness when auto-expand set to all #16031

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430))
- Skip zone awareness when auto-expand replicas is set to all ([#16031](https://github.com/opensearch-project/OpenSearch/pull/16031))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,4 +504,68 @@ public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws E
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * numOfShards * (numOfReplica + 1)));
assertThat(health.isTimedOut(), equalTo(false));
}

/**
 * Checks that an index created with {@code auto_expand_replicas: 0-all} becomes green on a
 * two-node cluster where awareness replica balance is enabled, the awareness attribute is
 * {@code zone}, the forced zone values are {@code "a"}, and both nodes carry {@code zone=a}.
 *
 * The index has 2 primary shards; the test expects 4 started shard copies in total, i.e. one
 * replica per primary, spread over the two nodes.
 */
public void testAwarenessZonesWithAutoExpand() {
    Settings commonSettings = Settings.builder()
        .put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.getKey(), true)
        .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a")
        .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
        .build();

    logger.info("--> starting 2 nodes on same zone");
    List<String> nodes = internalCluster().startNodes(
        Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
        Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
    );
    String nodeA = nodes.get(0);
    String nodeB = nodes.get(1);

    logger.info("--> waiting for nodes to form a cluster");
    ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
    assertThat(health.isTimedOut(), equalTo(false));

    createIndex(
        "test",
        Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
            .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
            .build()
    );

    // Randomly exercise the closed-index path as well; the health check below is applied either way.
    if (randomBoolean()) {
        assertAcked(client().admin().indices().prepareClose("test"));
    }

    logger.info("--> waiting for shards to be allocated");
    health = client().admin()
        .cluster()
        .prepareHealth()
        .setIndices("test")
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForGreenStatus()
        .setWaitForNoRelocatingShards(true)
        .execute()
        .actionGet();
    assertThat(health.isTimedOut(), equalTo(false));

    ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
    // 2 primaries + 2 replicas
    assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));

    // Number of shard copies hosted per node name.
    final Map<String, Integer> counts = new HashMap<>();
    int replicaCount = 0;

    for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
        for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
            for (ShardRouting shardRouting : indexShardRoutingTable) {
                // Count replica copies only: the previous condition incremented this counter for
                // primaries, which contradicted the variable's name and the intent of the assertion.
                if (!shardRouting.primary()) {
                    replicaCount++;
                }
                counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
            }
        }
    }
    assertThat(counts.get(nodeA), anyOf(equalTo(1), equalTo(2)));
    assertThat(counts.get(nodeB), anyOf(equalTo(1), equalTo(2)));
    assertThat(replicaCount, equalTo(2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public boolean isEnabled() {
return enabled;
}

/**
 * Reports whether the upper bound of this auto-expand range is unbounded.
 *
 * @return {@code true} when {@code maxReplicas} equals {@link Integer#MAX_VALUE}, {@code false} otherwise.
 *         NOTE(review): presumably {@code Integer.MAX_VALUE} is how an upper bound of "all" is stored;
 *         confirm against the setting parser.
 */
public boolean autoExpandToAll() {
    final boolean unboundedUpperLimit = Integer.MAX_VALUE == maxReplicas;
    return unboundedUpperLimit;
}

private OptionalInt getDesiredNumberOfReplicas(IndexMetadata indexMetadata, RoutingAllocation allocation) {
if (enabled) {
int numMatchingDataNodes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING;

/**
* This {@link AllocationDecider} controls shard allocation based on
Expand Down Expand Up @@ -160,6 +161,11 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
}

IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());

if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).autoExpandToAll()) {
return allocation.decision(Decision.YES, NAME, "allocation awareness is ignored, this index is set to auto-expand to all");
}

int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,4 +1063,39 @@ public void testAllocationExplainForUnassignedShardsWithUnbalancedZones() {

}
}

/**
 * With {@code zone} configured as the awareness attribute, builds a single-shard index that uses
 * {@code auto_expand_replicas: 0-all} on a four-node cluster whose zones are unbalanced (three
 * nodes in zone {@code a}, one in zone {@code b}), applies started shards until the cluster state
 * stops changing, and asserts that no shard copy is left unassigned.
 */
public void testIgnoredByAutoExpandReplicasToAll() {
    final Settings awarenessSettings = Settings.builder()
        .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
        .build();
    final AllocationService allocationService = createAllocationService(awarenessSettings);

    // Index definition: 1 shard, an explicit replica count of 100, and auto-expand "0-all".
    final Metadata indexMetadata = Metadata.builder()
        .put(
            IndexMetadata.builder("test")
                .settings(
                    settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 100)
                        .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
                )
        )
        .build();

    // Three nodes in zone "a" and a single node in zone "b".
    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
    nodesBuilder.add(newNode("A-0", singletonMap("zone", "a")));
    nodesBuilder.add(newNode("A-1", singletonMap("zone", "a")));
    nodesBuilder.add(newNode("A-2", singletonMap("zone", "a")));
    nodesBuilder.add(newNode("B-0", singletonMap("zone", "b")));
    final DiscoveryNodes discoveryNodes = nodesBuilder.build();

    final ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(Settings.EMPTY))
        .metadata(indexMetadata)
        .routingTable(RoutingTable.builder().addAsNew(indexMetadata.index("test")).build())
        .nodes(discoveryNodes)
        .build();

    final ClusterState settledState = applyStartedShardsUntilNoChange(initialState, allocationService);

    assertThat(settledState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
}
}
Loading