support index level allocation filtering

Signed-off-by: panguixin <[email protected]>
bugmakerrrrrr committed Jan 8, 2024
1 parent 89e4727 commit 48eedd4

Showing 6 changed files with 117 additions and 101 deletions.
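In effect, index-level shard allocation filters (the "index.routing.allocation.*" settings) can now be applied to searchable snapshot indices, and the remote-shards balancer moves shards off nodes that a filter excludes. Below is a minimal sketch of the user-facing call, mirroring the API that the new integration test in this commit exercises; the index name and node id are placeholders, and the surrounding test context (client(), Settings, assertTrue) is assumed:

    // Hedged sketch: exclude one node from hosting a searchable snapshot index.
    // "test-idx-copy" and "nodeIdToDrain" are placeholder values, not from this commit.
    Client client = client();
    assertTrue(
        client.admin()
            .indices()
            .prepareUpdateSettings("test-idx-copy")
            .setSettings(Settings.builder().put("index.routing.allocation.exclude._id", "nodeIdToDrain"))
            .execute()
            .actionGet()
            .isAcknowledged()
    );

Before this change, such an update was rejected for remote snapshot indices because "index.routing.allocation" was not on the allowlist enforced by TransportUpdateSettingsAction.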
@@ -7,6 +7,11 @@
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.opensearch.action.admin.cluster.node.stats.NodeStats;
 import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
 import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -28,14 +33,17 @@
 import org.opensearch.cluster.routing.GroupShardsIterator;
 import org.opensearch.cluster.routing.ShardIterator;
 import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.common.Priority;
 import org.opensearch.common.io.PathUtils;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.core.index.Index;
 import org.opensearch.index.IndexModule;
 import org.opensearch.index.IndexNotFoundException;
 import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
 import org.opensearch.index.store.remote.filecache.FileCacheStats;
+import org.opensearch.indices.IndicesService;
 import org.opensearch.monitor.fs.FsInfo;
 import org.opensearch.node.Node;
 import org.opensearch.repositories.fs.FsRepository;
@@ -234,6 +242,47 @@ public void testSearchableSnapshotAllocationForLocalAndRemoteShardsOnSameNode()
         assertDocCount(indexName, 100L);
     }
 
+    public void testSearchableSnapshotAllocationFilterSettings() throws Exception {
+        final int numShardsIndex = randomIntBetween(3, 6);
+        final String indexName = "test-idx";
+        final String restoredIndexName = indexName + "-copy";
+        final String repoName = "test-repo";
+        final String snapshotName = "test-snap";
+        final Client client = client();
+
+        internalCluster().ensureAtLeastNumSearchAndDataNodes(numShardsIndex);
+        createIndexWithDocsAndEnsureGreen(numShardsIndex, 1, 100, indexName);
+        createRepositoryWithSettings(null, repoName);
+        takeSnapshot(client, snapshotName, repoName, indexName);
+
+        restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
+        assertRemoteSnapshotIndexSettings(client, restoredIndexName);
+        final Set<String> dataNodes = new HashSet<>();
+        internalCluster().getDataNodeInstances(Node.class).forEach(node -> dataNodes.add(node.getNodeEnvironment().nodeId()));
+
+        for (int i = 0; i < numShardsIndex; ++i) {
+            // Pick a fresh node each iteration; a node excluded earlier no longer hosts
+            // shards of the restored index, so re-picking it would fail the first check.
+            String pickedNode = randomFrom(dataNodes);
+            dataNodes.remove(pickedNode);
+            assertIndexAssignedToNodeOrNot(restoredIndexName, pickedNode, true);
+            assertTrue(
+                client.admin()
+                    .indices()
+                    .prepareUpdateSettings(restoredIndexName)
+                    .setSettings(Settings.builder().put("index.routing.allocation.exclude._id", pickedNode))
+                    .execute()
+                    .actionGet()
+                    .isAcknowledged()
+            );
+            ClusterHealthResponse clusterHealthResponse = client.admin()
+                .cluster()
+                .prepareHealth()
+                .setWaitForEvents(Priority.LANGUID)
+                .setWaitForNoRelocatingShards(true)
+                .setTimeout(new TimeValue(5, TimeUnit.MINUTES))
+                .execute()
+                .actionGet();
+            assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+            // The filter must move the remote index off the excluded node without
+            // touching the local index's copies on that node.
+            assertIndexAssignedToNodeOrNot(restoredIndexName, pickedNode, false);
+            assertIndexAssignedToNodeOrNot(indexName, pickedNode, true);
+        }
+    }
+
+    private void assertIndexAssignedToNodeOrNot(String index, String node, boolean assigned) {
+        final ClusterState state = client().admin().cluster().prepareState().get().getState();
+        if (assigned) {
+            assertTrue(state.getRoutingTable().allShards(index).stream().anyMatch(shard -> shard.currentNodeId().equals(node)));
+        } else {
+            assertTrue(state.getRoutingTable().allShards(index).stream().noneMatch(shard -> shard.currentNodeId().equals(node)));
+        }
+    }
+
     /**
      * Tests the functionality of remote shard allocation to
      * ensure it can handle node drops for failover scenarios and the cluster gets back to a healthy state when
@@ -341,11 +390,15 @@ public void testDeleteSearchableSnapshotBackingIndex() throws Exception {
     }
 
     private void createIndexWithDocsAndEnsureGreen(int numReplicasIndex, int numOfDocs, String indexName) throws InterruptedException {
+        createIndexWithDocsAndEnsureGreen(1, numReplicasIndex, numOfDocs, indexName);
+    }
+
+    private void createIndexWithDocsAndEnsureGreen(int numShardsIndex, int numReplicasIndex, int numOfDocs, String indexName) throws InterruptedException {
         createIndex(
             indexName,
             Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex))
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex)
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsIndex)
                 .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey())
                 .build()
         );
@@ -82,7 +82,7 @@ public class TransportUpdateSettingsAction extends TransportClusterManagerNodeAc
         "index.number_of_replicas"
     );
 
-    private final static String[] ALLOWLIST_REMOTE_SNAPSHOT_SETTINGS_PREFIXES = { "index.search.slowlog" };
+    private final static String[] ALLOWLIST_REMOTE_SNAPSHOT_SETTINGS_PREFIXES = { "index.search.slowlog", "index.routing.allocation" };
 
     private final MetadataUpdateSettingsService updateSettingsService;

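For remote snapshot indices, TransportUpdateSettingsAction only admits setting updates whose keys match an allowlist entry or start with an allowlisted prefix; adding "index.routing.allocation" above is what unlocks the allocation filters. A self-contained sketch of that prefix check follows; the class and method names are illustrative, not the actual OpenSearch implementation:

    import java.util.Arrays;

    // Illustrative only: how a prefix allowlist admits the new allocation settings.
    class AllowlistSketch {
        static final String[] PREFIXES = { "index.search.slowlog", "index.routing.allocation" };

        static boolean allowedForRemoteSnapshot(String settingKey) {
            return Arrays.stream(PREFIXES).anyMatch(settingKey::startsWith);
        }

        public static void main(String[] args) {
            System.out.println(allowedForRemoteSnapshot("index.routing.allocation.exclude._id")); // true
            System.out.println(allowedForRemoteSnapshot("index.codec")); // false
        }
    }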
@@ -729,23 +729,6 @@ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) ==
+ " was matched but wasn't removed";
}

public void swapPrimaryWithReplica(
Logger logger,
ShardRouting primaryShard,
ShardRouting replicaShard,
RoutingChangesObserver changes
) {
assert primaryShard.primary() : "Invalid primary shard provided";
assert !replicaShard.primary() : "Invalid Replica shard provided";

ShardRouting newPrimary = primaryShard.moveActivePrimaryToReplica();
ShardRouting newReplica = replicaShard.moveActiveReplicaToPrimary();
updateAssigned(primaryShard, newPrimary);
updateAssigned(replicaShard, newReplica);
logger.info("Swap relocation performed for shard [{}]", newPrimary.shortSummary());
changes.replicaPromoted(newPrimary);
}

private void unassignPrimaryAndPromoteActiveReplicaIfExists(
ShardRouting failedShard,
UnassignedInfo unassignedInfo,
@@ -84,31 +84,50 @@ void moveShards() {
         Queue<RoutingNode> excludedNodes = new ArrayDeque<>();
         classifyNodesForShardMovement(eligibleNodes, excludedNodes);
 
-        if (excludedNodes.isEmpty()) {
-            logger.debug("No excluded nodes found. Returning...");
-            return;
+        // move shards that cannot remain on eligible nodes
+        final List<ShardRouting> forceMoveShards = new ArrayList<>();
+        eligibleNodes.forEach(sourceNode -> {
+            for (final ShardRouting shardRouting : sourceNode) {
+                if (ineligibleForMove(shardRouting)) {
+                    continue;
+                }
+
+                if (allocation.deciders().canRemain(shardRouting, sourceNode, allocation) == Decision.NO) {
+                    forceMoveShards.add(shardRouting);
+                }
+            }
+        });
+        for (final ShardRouting shard : forceMoveShards) {
+            if (eligibleNodes.isEmpty()) {
+                logger.trace("there are no eligible nodes available, return");
+                return;
+            }
+
+            tryShardMovementToEligibleNode(eligibleNodes, shard);
         }
 
+        // move shards that are currently assigned on excluded nodes
         while (!eligibleNodes.isEmpty() && !excludedNodes.isEmpty()) {
             RoutingNode sourceNode = excludedNodes.poll();
-            for (ShardRouting ineligibleShard : sourceNode) {
-                if (ineligibleShard.started() == false) {
-                    continue;
-                }
-
-                if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(ineligibleShard, allocation))) {
+            for (final ShardRouting ineligibleShard : sourceNode) {
+                if (ineligibleForMove(ineligibleShard)) {
                     continue;
                 }
 
                 if (eligibleNodes.isEmpty()) {
-                    break;
+                    logger.trace("there are no eligible nodes available, return");
+                    return;
                 }
 
                 tryShardMovementToEligibleNode(eligibleNodes, ineligibleShard);
             }
         }
     }
 
+    private boolean ineligibleForMove(ShardRouting shard) {
+        return !shard.started() || !RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation));
+    }
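The rework above splits moveShards() into two passes: shards on otherwise-eligible nodes that fail the deciders' canRemain check (which is how a newly applied index-level exclude filter manifests) are force-moved first, and then, as before, shards are drained off excluded nodes. A runnable miniature of that control flow, with plain strings and booleans standing in for RoutingNode, ShardRouting, and the allocation deciders (all names illustrative):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    // Illustrative miniature of the two-pass moveShards() control flow.
    class MoveShardsSketch {
        record Shard(String name, boolean canRemain) {}

        static void moveShards(Queue<List<Shard>> eligibleNodes, Queue<List<Shard>> excludedNodes) {
            // pass 1: collect shards that can no longer remain on eligible nodes
            final List<Shard> forceMoveShards = new ArrayList<>();
            eligibleNodes.forEach(node -> {
                for (Shard shard : node) {
                    if (!shard.canRemain()) {
                        forceMoveShards.add(shard);
                    }
                }
            });
            forceMoveShards.forEach(shard -> System.out.println("force-move " + shard.name()));

            // pass 2: drain nodes that are excluded outright (pre-existing behavior)
            while (!excludedNodes.isEmpty()) {
                for (Shard shard : excludedNodes.poll()) {
                    System.out.println("move off excluded node: " + shard.name());
                }
            }
        }

        public static void main(String[] args) {
            Queue<List<Shard>> eligible = new ArrayDeque<>();
            eligible.offer(List.of(new Shard("[idx][0]", false), new Shard("[idx][1]", true)));
            Queue<List<Shard>> excluded = new ArrayDeque<>();
            excluded.offer(List.of(new Shard("[idx][2]", true)));
            moveShards(eligible, excluded);
        }
    }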

     /**
      * Classifies the nodes into eligible and excluded depending on whether node is able or unable for shard assignment
      * @param eligibleNodes contains the list of classified nodes eligible to accept shards
@@ -145,10 +164,23 @@ private void classifyNodesForShardMovement(Queue<RoutingNode> eligibleNodes, Que
      * @param shard the ineligible shard to be moved
      */
     private void tryShardMovementToEligibleNode(Queue<RoutingNode> eligibleNodes, ShardRouting shard) {
-        Set<String> nodesCheckedForShard = new HashSet<>();
+        final Set<String> nodesCheckedForShard = new HashSet<>();
+        int numNodesToCheck = eligibleNodes.size();
         while (!eligibleNodes.isEmpty()) {
-            RoutingNode targetNode = eligibleNodes.poll();
-            Decision currentShardDecision = allocation.deciders().canAllocate(shard, targetNode, allocation);
+            assert numNodesToCheck > 0;
+            final RoutingNode targetNode = eligibleNodes.poll();
+            --numNodesToCheck;
+            // skip the node that the target shard is currently allocated on
+            if (targetNode.nodeId().equals(shard.currentNodeId())) {
+                assert nodesCheckedForShard.add(targetNode.nodeId());
+                eligibleNodes.offer(targetNode);
+                if (numNodesToCheck == 0) {
+                    return;
+                }
+                continue;
+            }
+
+            final Decision currentShardDecision = allocation.deciders().canAllocate(shard, targetNode, allocation);
 
             if (currentShardDecision.type() == Decision.Type.YES) {
                 if (logger.isDebugEnabled()) {
@@ -166,7 +198,7 @@ private void tryShardMovementToEligibleNode(Queue<RoutingNode> eligibleNodes, Sh
                     allocation.changes()
                 );
                 eligibleNodes.offer(targetNode);
-                break;
+                return;
             } else {
                 if (logger.isTraceEnabled()) {
                     logger.trace(
@@ -177,18 +209,19 @@ private void tryShardMovementToEligibleNode(Queue<RoutingNode> eligibleNodes, Sh
                     );
                 }
 
-                Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation);
+                final Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation);
                 if (nodeLevelDecision.type() == Decision.Type.YES) {
                     logger.debug("Node: [{}] can still accept shards. Adding it back to the queue.", targetNode.nodeId());
                     eligibleNodes.offer(targetNode);
-                    nodesCheckedForShard.add(targetNode.nodeId());
+                    assert nodesCheckedForShard.add(targetNode.nodeId());
                 } else {
                     logger.debug("Node: [{}] cannot accept any more shards. Removing it from queue.", targetNode.nodeId());
                 }
 
-                // Break out if all nodes in the queue have been checked for this shard
-                if (eligibleNodes.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) {
-                    break;
+                // Break out if all eligible nodes have been examined
+                if (numNodesToCheck == 0) {
+                    assert eligibleNodes.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()));
+                    return;
                 }
             }
         }
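The scan above also changed shape: rather than re-checking after every iteration whether all queued nodes are already in nodesCheckedForShard, a numNodesToCheck counter bounds the loop to one pass over the queue, and the shard's current node is skipped but kept available. A self-contained sketch of that bounded round-robin pattern (names illustrative, not OpenSearch APIs):

    import java.util.ArrayDeque;
    import java.util.List;
    import java.util.Queue;

    // Illustrative only: one bounded round-robin pass over candidate nodes.
    class RoundRobinScanSketch {
        // Stand-in for the allocation deciders: nodes named "full-..." reject shards.
        static boolean canAllocate(String node) {
            return !node.startsWith("full-");
        }

        static String findTarget(Queue<String> nodes, String currentNode) {
            int numNodesToCheck = nodes.size();
            while (!nodes.isEmpty() && numNodesToCheck-- > 0) {
                String candidate = nodes.poll();
                if (candidate.equals(currentNode)) {
                    nodes.offer(candidate); // skip, but keep it for other shards
                    continue;
                }
                if (canAllocate(candidate)) {
                    nodes.offer(candidate); // node may accept more shards later
                    return candidate;
                }
                // node cannot accept any shard: leave it out of the queue
            }
            return null; // one full pass made; no candidate accepted the shard
        }

        public static void main(String[] args) {
            Queue<String> nodes = new ArrayDeque<>(List.of("full-node1", "node2", "node3"));
            System.out.println(findTarget(nodes, "node3")); // prints node2
        }
    }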
@@ -485,6 +518,10 @@ private void tryRebalanceNode(
                 continue;
             }
 
+            if (targetNode.getByShardId(shard.shardId()) != null) {
+                continue;
+            }
+
             // Try relocate the shard on the target node
             Decision rebalanceDecision = tryRelocateShard(shard, targetNode);
 
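With the primary/replica swap path removed (next hunk), a rebalance target that already holds any copy of the shard is now skipped outright, preserving the one-copy-of-a-shard-per-node invariant that the new assert in tryRelocateShard checks. A tiny illustration, with a map standing in for RoutingNode.getByShardId (names illustrative):

    import java.util.Map;

    // Illustrative only: skip rebalance targets that already hold a copy of the shard.
    class RebalanceTargetGuardSketch {
        public static void main(String[] args) {
            Map<Integer, String> copiesOnTarget = Map.of(3, "replica"); // target already hosts shard 3
            for (int shardId : new int[] { 3, 5 }) {
                if (copiesOnTarget.get(shardId) != null) {
                    System.out.println("shard " + shardId + ": skip target, copy already present");
                } else {
                    System.out.println("shard " + shardId + ": try relocating to target");
                }
            }
        }
    }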
@@ -522,21 +559,10 @@ private void tryRebalanceNode(
     }
 
     /**
-     * For every primary shard for which this method is invoked,
-     * swap is attempted with the destination node in case replica shard is present.
-     * In case replica is not present, relocation of the shard id performed.
+     * For every primary shard for which this method is invoked, relocation of the shard is performed.
      */
     private Decision tryRelocateShard(ShardRouting shard, RoutingNode destinationNode) {
-        // Check if there is already a replica for the shard on the destination node.
-        // Then we can directly swap the replica with the primary shards.
-        // Invariant: We only allow swap relocation on remote shards.
-        ShardRouting replicaShard = destinationNode.getByShardId(shard.shardId());
-        if (replicaShard != null) {
-            assert !replicaShard.primary() : "Primary Shard found while expected Replica during shard rebalance";
-            return executeSwapShard(shard, replicaShard, allocation);
-        }
-
-        // Since no replica present on the destinationNode; try relocating the shard to the destination node
+        assert destinationNode.getByShardId(shard.shardId()) == null;
         Decision allocationDecision = allocation.deciders().canAllocate(shard, destinationNode, allocation);
         Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation);
         logger.trace(
@@ -566,15 +592,6 @@ private Decision tryRelocateShard(ShardRouting shard, RoutingNode destinationNod
         return Decision.NO;
     }
 
-    private Decision executeSwapShard(ShardRouting primaryShard, ShardRouting replicaShard, RoutingAllocation allocation) {
-        if (!replicaShard.started()) {
-            return new Decision.Single(Decision.Type.NO);
-        }
-
-        allocation.routingNodes().swapPrimaryWithReplica(logger, primaryShard, replicaShard, allocation.changes());
-        return new Decision.Single(Decision.Type.YES);
-    }
-
     private void failUnattemptedShards() {
         RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
         while (unassignedIterator.hasNext()) {
@@ -206,40 +206,4 @@ public void testInterleavedShardIteratorReplicaFirst() {
         }
         assertEquals(shardCount, this.totalNumberOfShards);
     }
-
-    public void testSwapPrimaryWithReplica() {
-        // Initialize all the shards for test index 1 and 2
-        initPrimaries();
-        startInitializingShards(TEST_INDEX_1);
-        startInitializingShards(TEST_INDEX_1);
-        startInitializingShards(TEST_INDEX_2);
-        startInitializingShards(TEST_INDEX_2);
-
-        // Create primary shard count imbalance between two nodes
-        final RoutingNodes routingNodes = this.clusterState.getRoutingNodes();
-        final RoutingNode node0 = routingNodes.node("node0");
-        final RoutingNode node1 = routingNodes.node("node1");
-        final List<ShardRouting> shardRoutingList = node0.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED);
-        final RoutingChangesObserver routingChangesObserver = Mockito.mock(RoutingChangesObserver.class);
-        int swaps = 0;
-
-        for (ShardRouting routing : shardRoutingList) {
-            if (routing.primary()) {
-                ShardRouting swap = node1.getByShardId(routing.shardId());
-                routingNodes.swapPrimaryWithReplica(logger, routing, swap, routingChangesObserver);
-                swaps++;
-            }
-        }
-        Mockito.verify(routingChangesObserver, Mockito.times(swaps)).replicaPromoted(Mockito.any());
-
-        final List<ShardRouting> shards = node1.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED);
-        int shardCount = 0;
-        for (ShardRouting shard : shards) {
-            if (shard.primary()) {
-                shardCount++;
-            }
-        }
-
-        assertTrue(shardCount >= swaps);
-    }
 }
@@ -47,7 +47,7 @@ public void testExcludeNodeIdMoveBlocked() {

     /**
      * Test move operations for index level allocation settings.
-     * Supported for local indices, not supported for remote indices.
+     * Supported for local indices and remote indices.
      */
     public void testIndexLevelExclusions() throws InterruptedException {
         int localOnlyNodes = 7;
@@ -102,8 +102,7 @@ public void testIndexLevelExclusions() throws InterruptedException {
         // No shard of updated local index should be on excluded local capable node
         assertTrue(routingTable.allShards(localIndex).stream().noneMatch(shard -> shard.currentNodeId().equals(excludedLocalOnlyNode)));
 
-        // Since remote index shards are untouched, at least one shard should
-        // continue to stay on the excluded remote capable node
-        assertTrue(routingTable.allShards(remoteIndex).stream().anyMatch(shard -> shard.currentNodeId().equals(excludedRemoteCapableNode)));
+        // No shard of updated remote index should be on excluded remote capable node
+        assertTrue(routingTable.allShards(remoteIndex).stream().noneMatch(shard -> shard.currentNodeId().equals(excludedRemoteCapableNode)));
     }
 }
