Skip to content

Commit

Permalink
Mark restored indices as remote backed during migration
Browse files Browse the repository at this point in the history
Signed-off-by: Lakshya Taragi <[email protected]>
  • Loading branch information
ltaragi committed Mar 19, 2024
1 parent f9cd3e3 commit e605b95
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,22 @@
import java.util.Map;
import java.util.Optional;

import static org.opensearch.cluster.metadata.IndexMetadata.*;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.NONE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.NONE;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class RemoteStoreMigrationAllocationIT extends MigrationBaseTestCase {

private static final String TEST_INDEX = "test_index";
protected static final String NAME = "remote_store_migration";
public static final String TEST_INDEX = "test_index";
public static final String NAME = "remote_store_migration";

private final ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
private static final ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
private Client client;

// tests for primary shard copy allocation with MIXED mode and REMOTE_STORE direction
Expand Down Expand Up @@ -73,7 +73,10 @@ public void testDontAllocateNewPrimaryShardOnNonRemoteNodeForMixedModeAndRemoteS
Decision decision = getDecisionForTargetNode(nonRemoteNode, true, true, false);
Decision.Type type = Decision.Type.NO;
assertEquals(type, decision.type());
assertEquals("[remote_store migration_direction]: primary shard copy can not be allocated to a non-remote node", decision.getExplanation().toLowerCase(Locale.ROOT));
assertEquals(
"[remote_store migration_direction]: primary shard copy can not be allocated to a non-remote node",
decision.getExplanation().toLowerCase(Locale.ROOT)
);

logger.info(" --> attempt allocation");
attemptAllocation(nonRemoteNodeName);
Expand Down Expand Up @@ -275,7 +278,10 @@ public void testAllocateNewReplicaShardOnNonRemoteNodeIfPrimaryShardOnRemoteNode

Decision.Type type = Decision.Type.YES;
assertEquals(type, decision.type());
assertEquals("[remote_store migration_direction]: replica shard copy can be allocated to a non-remote node", decision.getExplanation().toLowerCase(Locale.ROOT));
assertEquals(
"[remote_store migration_direction]: replica shard copy can be allocated to a non-remote node",
decision.getExplanation().toLowerCase(Locale.ROOT)
);

logger.info(" --> allocate replica shard on non-remote node");
attemptAllocation(nonRemoteNodeName);
Expand Down Expand Up @@ -304,8 +310,7 @@ public void testAlwaysAllocateNewShardForStrictMode() throws Exception {
DiscoveryNode remoteNode2 = assertNodeInCluster(remoteNodeName2);
nodes.add(remoteNode1);
nodes.add(remoteNode2);
}
else {
} else {
initializeCluster(false);
setClusterMode(STRICT.mode);
addRemote = false;
Expand Down Expand Up @@ -364,6 +369,7 @@ public void testAlwaysAllocateNewShardForStrictMode() throws Exception {
}

// test for remote store backed index

public void testDontAllocateToNonRemoteNodeForRemoteStoreBackedIndex() throws Exception {
logger.info(" --> initialize cluster with remote master node");
initializeCluster(true);
Expand All @@ -382,8 +388,7 @@ public void testDontAllocateToNonRemoteNodeForRemoteStoreBackedIndex() throws Ex
logger.info(" --> verify expected decision for allocating a new shard on a non-remote node");
if (isReplicaAllocation) {
prepareIndexWithAllocatedPrimary(remoteNode, Optional.empty());
}
else {
} else {
prepareIndexWithoutReplica(Optional.empty());
}

Expand Down Expand Up @@ -430,20 +435,20 @@ public void initializeCluster(boolean remoteClusterManager) {
}

// set the compatibility mode of cluster [strict, mixed]
public void setClusterMode(String mode) {
public static void setClusterMode(String mode) {
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), mode));
assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet());
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

// set the migration direction for cluster [remote_store, docrep, none]
public void setDirection(String direction) {
public static void setDirection(String direction) {
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), direction));
assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet());
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

// verify that the given nodeName exists in cluster
public DiscoveryNode assertNodeInCluster(String nodeName) {
Map<String, DiscoveryNode> nodes = client.admin().cluster().prepareState().get().getState().nodes().getNodes();
public static DiscoveryNode assertNodeInCluster(String nodeName) {
Map<String, DiscoveryNode> nodes = internalCluster().client().admin().cluster().prepareState().get().getState().nodes().getNodes();
DiscoveryNode discoveryNode = null;
for (Map.Entry<String, DiscoveryNode> entry : nodes.entrySet()) {
DiscoveryNode node = entry.getValue();
Expand All @@ -457,9 +462,9 @@ public DiscoveryNode assertNodeInCluster(String nodeName) {
}

// returns a comma-separated list of node names excluding `except`
private String allNodesExcept(String except) {
public static String allNodesExcept(String except) {
StringBuilder exclude = new StringBuilder();
DiscoveryNodes allNodes = client.admin().cluster().prepareState().get().getState().nodes();
DiscoveryNodes allNodes = internalCluster().client().admin().cluster().prepareState().get().getState().nodes();
for (DiscoveryNode node : allNodes) {
if (node.getName().equals(except) == false) {
exclude.append(node.getName()).append(",");
Expand Down Expand Up @@ -511,13 +516,15 @@ private Decision getDecisionForTargetNode(
}

// create a new test index
public void prepareIndexWithoutReplica(Optional<String> name) {
public static void prepareIndexWithoutReplica(Optional<String> name) {
String indexName = name.orElse(TEST_INDEX);
client.admin()
internalCluster().client()
.admin()
.indices()
.prepareCreate(indexName)
.setSettings(
Settings.builder().put("index.number_of_shards", 1)
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.routing.allocation.exclude._name", allNodesExcept(null))
)
Expand All @@ -531,7 +538,8 @@ public void prepareIndexWithAllocatedPrimary(DiscoveryNode primaryShardNode, Opt
.indices()
.prepareCreate(indexName)
.setSettings(
Settings.builder().put("index.number_of_shards", 1)
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put("index.routing.allocation.include._name", primaryShardNode.getName())
.put("index.routing.allocation.exclude._name", allNodesExcept(primaryShardNode.getName()))
Expand Down Expand Up @@ -573,16 +581,23 @@ private void attemptAllocation(String targetNodeName) {
}

private ShardRouting getShardRouting(boolean isPrimary) {
IndexShardRoutingTable table = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable().index(TEST_INDEX).shard(0);
IndexShardRoutingTable table = client.admin()
.cluster()
.prepareState()
.execute()
.actionGet()
.getState()
.getRoutingTable()
.index(TEST_INDEX)
.shard(0);
return (isPrimary ? table.primaryShard() : table.replicaShards().get(0));
}

// verify that shard does not exist at targetNode
private void assertNonAllocation(boolean isPrimary) {
if (isPrimary) {
ensureRed(TEST_INDEX);
}
else {
} else {
ensureYellowAndNoInitializingShards(TEST_INDEX);
}
ShardRouting shardRouting = getShardRouting(isPrimary);
Expand Down
Loading

0 comments on commit e605b95

Please sign in to comment.