-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[RW Separation] Introduce allocation filter to control placement of s…
…earch only replicas (#15455) * Introduce allocation filter to control placement of search only replicas Signed-off-by: Marc Handalian <[email protected]> * Add a new decider rather than updating the existing FilterAllocationDecider Signed-off-by: Marc Handalian <[email protected]> * Fix license header and description on SearchReplicaAllocationDecider Signed-off-by: Marc Handalian <[email protected]> * Pr feedback. Signed-off-by: Marc Handalian <[email protected]> * Fix class name to pass precommit checks Signed-off-by: Marc Handalian <[email protected]> * Refactor all search replica create/update tests to a single OpenSearchSingleNodeTestCase. Signed-off-by: Marc Handalian <[email protected]> * remove changelog entry Signed-off-by: Marc Handalian <[email protected]> --------- Signed-off-by: Marc Handalian <[email protected]>
- Loading branch information
Showing
9 changed files
with
546 additions
and
133 deletions.
There are no files selected for viewing
125 changes: 125 additions & 0 deletions
125
...lusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.cluster.allocation; | ||
|
||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.cluster.routing.IndexShardRoutingTable; | ||
import org.opensearch.cluster.routing.ShardRouting; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.util.FeatureFlags; | ||
import org.opensearch.indices.replication.common.ReplicationType; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; | ||
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; | ||
|
||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) | ||
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase { | ||
|
||
@Override | ||
protected Settings featureFlagSettings() { | ||
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build(); | ||
} | ||
|
||
public void testSearchReplicaDedicatedIncludes() { | ||
List<String> nodesIds = internalCluster().startNodes(3); | ||
final String node_0 = nodesIds.get(0); | ||
final String node_1 = nodesIds.get(1); | ||
final String node_2 = nodesIds.get(2); | ||
assertEquals(3, cluster().size()); | ||
|
||
client().admin() | ||
.cluster() | ||
.prepareUpdateSettings() | ||
.setTransientSettings( | ||
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0) | ||
) | ||
.execute() | ||
.actionGet(); | ||
|
||
createIndex( | ||
"test", | ||
Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) | ||
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) | ||
.build() | ||
); | ||
ensureGreen("test"); | ||
// ensure primary is not on node 0 or 1, | ||
IndexShardRoutingTable routingTable = getRoutingTable(); | ||
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId())); | ||
|
||
String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()); | ||
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0; | ||
|
||
// set the included nodes to the other open node, search replica should relocate to that node. | ||
client().admin() | ||
.cluster() | ||
.prepareUpdateSettings() | ||
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode)) | ||
.execute() | ||
.actionGet(); | ||
ensureGreen("test"); | ||
|
||
routingTable = getRoutingTable(); | ||
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId())); | ||
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId())); | ||
} | ||
|
||
public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() { | ||
List<String> nodesIds = internalCluster().startNodes(3); | ||
final String node_0 = nodesIds.get(0); | ||
final String node_1 = nodesIds.get(1); | ||
final String node_2 = nodesIds.get(2); | ||
assertEquals(3, cluster().size()); | ||
|
||
// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned | ||
client().admin() | ||
.cluster() | ||
.prepareUpdateSettings() | ||
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1)) | ||
.execute() | ||
.actionGet(); | ||
|
||
logger.info("--> creating an index with no replicas"); | ||
createIndex( | ||
"test", | ||
Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) | ||
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) | ||
.build() | ||
); | ||
ensureYellowAndNoInitializingShards("test"); | ||
IndexShardRoutingTable routingTable = getRoutingTable(); | ||
assertEquals(2, routingTable.searchOnlyReplicas().size()); | ||
List<ShardRouting> assignedSearchShards = routingTable.searchOnlyReplicas() | ||
.stream() | ||
.filter(ShardRouting::assignedToNode) | ||
.collect(Collectors.toList()); | ||
assertEquals(1, assignedSearchShards.size()); | ||
assertEquals(node_1, getNodeName(assignedSearchShards.get(0).currentNodeId())); | ||
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count()); | ||
} | ||
|
||
private IndexShardRoutingTable getRoutingTable() { | ||
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0); | ||
return routingTable; | ||
} | ||
|
||
private String getNodeName(String id) { | ||
return getClusterState().nodes().get(id).getName(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
99 changes: 99 additions & 0 deletions
99
...ava/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.cluster.routing.allocation.decider; | ||
|
||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.cluster.node.DiscoveryNodeFilters; | ||
import org.opensearch.cluster.routing.RoutingNode; | ||
import org.opensearch.cluster.routing.ShardRouting; | ||
import org.opensearch.cluster.routing.allocation.RoutingAllocation; | ||
import org.opensearch.common.settings.ClusterSettings; | ||
import org.opensearch.common.settings.Setting; | ||
import org.opensearch.common.settings.Setting.Property; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.node.remotestore.RemoteStoreNodeService; | ||
|
||
import java.util.Map; | ||
|
||
import static org.opensearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR; | ||
import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR; | ||
|
||
/** | ||
* This allocation decider is similar to FilterAllocationDecider but provides | ||
* the option to filter specifically for search replicas. | ||
* The filter behaves similar to an include for any defined node attribute. | ||
* A search replica can be allocated to only nodes with one of the specified attributes while | ||
* other shard types will be rejected from nodes with any othe attributes. | ||
* @opensearch.internal | ||
*/ | ||
public class SearchReplicaAllocationDecider extends AllocationDecider { | ||
|
||
public static final String NAME = "filter"; | ||
private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include"; | ||
public static final Setting.AffixSetting<String> SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting( | ||
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".", | ||
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope) | ||
); | ||
|
||
private volatile DiscoveryNodeFilters searchReplicaIncludeFilters; | ||
|
||
private volatile RemoteStoreNodeService.Direction migrationDirection; | ||
private volatile RemoteStoreNodeService.CompatibilityMode compatibilityMode; | ||
|
||
public SearchReplicaAllocationDecider(Settings settings, ClusterSettings clusterSettings) { | ||
setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings)); | ||
clusterSettings.addAffixMapUpdateConsumer( | ||
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING, | ||
this::setSearchReplicaIncludeFilters, | ||
(a, b) -> {} | ||
); | ||
} | ||
|
||
@Override | ||
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { | ||
return shouldFilter(shardRouting, node.node(), allocation); | ||
} | ||
|
||
@Override | ||
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { | ||
return shouldFilter(shardRouting, node.node(), allocation); | ||
} | ||
|
||
private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { | ||
if (searchReplicaIncludeFilters != null) { | ||
final boolean match = searchReplicaIncludeFilters.match(node); | ||
if (match == false && shardRouting.isSearchOnly()) { | ||
return allocation.decision( | ||
Decision.NO, | ||
NAME, | ||
"node does not match shard setting [%s] filters [%s]", | ||
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX, | ||
searchReplicaIncludeFilters | ||
); | ||
} | ||
// filter will only apply to search replicas | ||
if (shardRouting.isSearchOnly() == false && match) { | ||
return allocation.decision( | ||
Decision.NO, | ||
NAME, | ||
"only search replicas can be allocated to node with setting [%s] filters [%s]", | ||
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX, | ||
searchReplicaIncludeFilters | ||
); | ||
} | ||
} | ||
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); | ||
} | ||
|
||
private void setSearchReplicaIncludeFilters(Map<String, String> filters) { | ||
searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier( | ||
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters) | ||
); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.