From 082f0594582116713355ae00fe5ad001774a511d Mon Sep 17 00:00:00 2001
From: Kunal Kotwani
Date: Fri, 4 Nov 2022 11:39:51 -0700
Subject: [PATCH] Add allocation tests for remote shards (#5048)

Signed-off-by: Kunal Kotwani
Signed-off-by: Kunal Kotwani
---
 CHANGELOG.md                                  |   2 +-
 .../snapshots/SearchableSnapshotIT.java       | 240 ++++++++++--------
 .../allocator/RemoteShardsBalancer.java       |  15 +-
 .../opensearch/test/InternalTestCluster.java  |  60 +++++
 .../test/OpenSearchIntegTestCase.java         |   7 +
 5 files changed, 219 insertions(+), 105 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b1255bba0ac01..0f67931fdbc9c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -61,7 +61,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 - Data node side change([#4204](https://github.com/opensearch-project/OpenSearch/pull/4204))
 - on-boarding of tasks([#4542](https://github.com/opensearch-project/OpenSearch/pull/4542))
 - Integs ([4588](https://github.com/opensearch-project/OpenSearch/pull/4588))
-
+- Integration tests for searchable snapshots ([#5048](https://github.com/opensearch-project/OpenSearch/pull/5048))
 ### Dependencies
 - Bumps `log4j-core` from 2.18.0 to 2.19.0
 - Bumps `reactor-netty-http` from 1.0.18 to 1.0.23

diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java
index ff656e0ef51f8..17c32bb407bc3 100644
--- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java
@@ -68,6 +68,35 @@ private Settings.Builder chunkedRepositorySettings() {
         return settings;
     }
 
+    /**
+     * Tests a happy path scenario for searchable snapshots by creating 2 indices,
+     * taking a snapshot, and restoring them as searchable snapshot indices.
+     * Ensures availability of sufficient data nodes and search-capable nodes.
+     */
+    public void testCreateSearchableSnapshot() throws Exception {
+        final int numReplicasIndex1 = randomIntBetween(1, 4);
+        final int numReplicasIndex2 = randomIntBetween(0, 2);
+        final Client client = client();
+
+        internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
+        createIndexWithDocsAndEnsureGreen(numReplicasIndex1, 100, "test-idx-1");
+        createIndexWithDocsAndEnsureGreen(numReplicasIndex2, 100, "test-idx-2");
+
+        takeSnapshot(client, "test-idx-1", "test-idx-2");
+        deleteIndicesAndEnsureGreen(client, "test-idx-1", "test-idx-2");
+
+        internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
+        restoreSnapshotAndEnsureGreen(client);
+
+        assertDocCount("test-idx-1-copy", 100L);
+        assertDocCount("test-idx-2-copy", 100L);
+        assertIndexDirectoryDoesNotExist("test-idx-1-copy", "test-idx-2-copy");
+    }
+
+    /**
+     * Tests a chunked repository scenario for searchable snapshots by creating an index,
+     * taking a snapshot, and restoring it as a searchable snapshot index.
+     */
     public void testCreateSearchableSnapshotWithChunks() throws Exception {
         final int numReplicasIndex = randomIntBetween(1, 4);
         final String indexName = "test-idx";
@@ -76,153 +105,160 @@ public void testCreateSearchableSnapshotWithChunks() throws Exception {
         Settings.Builder repositorySettings = chunkedRepositorySettings();
 
-        internalCluster().ensureAtLeastNumDataNodes(numReplicasIndex + 1);
-        createIndex(
-            indexName,
-            Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex))
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
-                .build()
-        );
-        ensureGreen();
-        indexRandomDocs(indexName, 1000);
+        internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
+        createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);
+        takeSnapshot(client, repositorySettings, indexName);
 
-        createRepository("test-repo", "fs", repositorySettings);
-        logger.info("--> snapshot");
-        final CreateSnapshotResponse createSnapshotResponse = client.admin()
-            .cluster()
-            .prepareCreateSnapshot("test-repo", "test-snap")
-            .setWaitForCompletion(true)
-            .setIndices(indexName)
-            .get();
-        MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
-        MatcherAssert.assertThat(
-            createSnapshotResponse.getSnapshotInfo().successfulShards(),
-            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
-        );
+        deleteIndicesAndEnsureGreen(client, indexName);
+        restoreSnapshotAndEnsureGreen(client);
 
-        assertTrue(client.admin().indices().prepareDelete(indexName).get().isAcknowledged());
+        assertDocCount(restoredIndexName, 1000L);
+    }
 
-        internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
-        logger.info("--> restore indices as 'remote_snapshot'");
-        client.admin()
-            .cluster()
-            .prepareRestoreSnapshot("test-repo", "test-snap")
-            .setRenamePattern("(.+)")
-            .setRenameReplacement("$1-copy")
-            .setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
-            .setWaitForCompletion(true)
-            .execute()
-            .actionGet();
-        ensureGreen();
+    /**
+     * Tests the functionality of remote shard allocation to
+     * ensure it can assign remote shards to a node with local shards, given that it has the
+     * search role.
+     */
+    public void testSearchableSnapshotAllocationForLocalAndRemoteShardsOnSameNode() throws Exception {
+        final int numReplicasIndex = randomIntBetween(1, 4);
+        final String indexName = "test-idx";
+        final String restoredIndexName = indexName + "-copy";
+        final Client client = client();
 
-        assertDocCount(restoredIndexName, 1000L);
+        internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
+        createIndexWithDocsAndEnsureGreen(numReplicasIndex, 100, indexName);
+        takeSnapshot(client, indexName);
+
+        restoreSnapshotAndEnsureGreen(client);
+        assertDocCount(restoredIndexName, 100L);
+        assertDocCount(indexName, 100L);
     }
 
-    public void testCreateSearchableSnapshot() throws Exception {
-        final int numReplicasIndex1 = randomIntBetween(1, 4);
-        final int numReplicasIndex2 = randomIntBetween(0, 2);
-        internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
+    /**
+     * Tests the functionality of remote shard allocation to
+     * ensure it can handle node drops for failover scenarios and that the cluster gets back to a healthy state when
+     * nodes with search capabilities are added back to the cluster.
+     */
+    public void testSearchableSnapshotAllocationForFailoverAndRecovery() throws Exception {
+        final int numReplicasIndex = 1;
+        final String indexName = "test-idx";
+        final String restoredIndexName = indexName + "-copy";
         final Client client = client();
-        createRepository("test-repo", "fs");
-        createIndex(
-            "test-idx-1",
-            Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex1))
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
-                .build()
-        );
-        createIndex(
-            "test-idx-2",
-            Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex2))
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
-                .build()
-        );
-        ensureGreen();
-        indexRandomDocs("test-idx-1", 100);
-        indexRandomDocs("test-idx-2", 100);
-        logger.info("--> snapshot");
-        final CreateSnapshotResponse createSnapshotResponse = client.admin()
-            .cluster()
-            .prepareCreateSnapshot("test-repo", "test-snap")
-            .setWaitForCompletion(true)
-            .setIndices("test-idx-1", "test-idx-2")
-            .get();
-        MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
-        MatcherAssert.assertThat(
-            createSnapshotResponse.getSnapshotInfo().successfulShards(),
-            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
-        );
+        internalCluster().ensureAtLeastNumDataNodes(numReplicasIndex + 1);
+        createIndexWithDocsAndEnsureGreen(numReplicasIndex, 100, indexName);
 
-        assertTrue(client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged());
+        takeSnapshot(client, indexName);
+        deleteIndicesAndEnsureGreen(client, indexName);
 
-        internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
+        internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
+        restoreSnapshotAndEnsureGreen(client);
+        assertDocCount(restoredIndexName, 100L);
 
-        logger.info("--> restore indices as 'remote_snapshot'");
-        client.admin()
-            .cluster()
-            .prepareRestoreSnapshot("test-repo", "test-snap")
-            .setRenamePattern("(.+)")
-            .setRenameReplacement("$1-copy")
-            .setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
-            .setWaitForCompletion(true)
-            .execute()
-            .actionGet();
-        ensureGreen();
+        logger.info("--> stop a random search node");
+        internalCluster().stopRandomSearchNode();
+        ensureYellow(restoredIndexName);
+        assertDocCount(restoredIndexName, 100L);
 
-        assertDocCount("test-idx-1-copy", 100L);
-        assertDocCount("test-idx-2-copy", 100L);
-        assertIndexDirectoryDoesNotExist("test-idx-1-copy", "test-idx-2-copy");
+        logger.info("--> stop the last search node");
+        internalCluster().stopRandomSearchNode();
+        ensureRed(restoredIndexName);
+
+        logger.info("--> add 3 new search nodes");
+        internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 2);
+        ensureGreen(restoredIndexName);
+        assertDocCount(restoredIndexName, 100);
+
+        logger.info("--> stop a random search node");
+        internalCluster().stopRandomSearchNode();
+        ensureGreen(restoredIndexName);
+        assertDocCount(restoredIndexName, 100);
     }
 
+    /**
+     * Tests the functionality of the index write block on a searchable snapshot index.
+     */
     public void testSearchableSnapshotIndexIsReadOnly() throws Exception {
         final String indexName = "test-index";
+        final String restoredIndexName = indexName + "-copy";
         final Client client = client();
-        createRepository("test-repo", "fs");
+
+        createIndexWithDocsAndEnsureGreen(0, 100, indexName);
+        takeSnapshot(client, indexName);
+        deleteIndicesAndEnsureGreen(client, indexName);
+
+        internalCluster().ensureAtLeastNumSearchNodes(1);
+        restoreSnapshotAndEnsureGreen(client);
+
+        assertIndexingBlocked(restoredIndexName);
+        assertIndexSettingChangeBlocked(restoredIndexName);
+        assertTrue(client.admin().indices().prepareDelete(restoredIndexName).get().isAcknowledged());
+        assertThrows(
+            "Expect index to not exist",
+            IndexNotFoundException.class,
+            () -> client.admin().indices().prepareGetIndex().setIndices(restoredIndexName).execute().actionGet()
+        );
+    }
+
+    private void createIndexWithDocsAndEnsureGreen(int numReplicasIndex, int numOfDocs, String indexName) throws InterruptedException {
         createIndex(
             indexName,
-            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex))
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
+                .build()
         );
         ensureGreen();
-        logger.info("--> snapshot");
+        indexRandomDocs(indexName, numOfDocs);
+        ensureGreen();
+    }
+
+    private void takeSnapshot(Client client, String... indices) {
+        takeSnapshot(client, null, indices);
+    }
+
+    private void takeSnapshot(Client client, Settings.Builder repositorySettings, String... indices) {
+        logger.info("--> Create a repository");
+        if (repositorySettings == null) {
+            createRepository("test-repo", "fs");
+        } else {
+            createRepository("test-repo", "fs", repositorySettings);
+        }
+        logger.info("--> Take a snapshot");
         final CreateSnapshotResponse createSnapshotResponse = client.admin()
             .cluster()
             .prepareCreateSnapshot("test-repo", "test-snap")
             .setWaitForCompletion(true)
-            .setIndices(indexName)
+            .setIndices(indices)
             .get();
+        MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
         MatcherAssert.assertThat(
             createSnapshotResponse.getSnapshotInfo().successfulShards(),
             equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
         );
+    }
 
-        assertTrue(client.admin().indices().prepareDelete(indexName).get().isAcknowledged());
+    private void deleteIndicesAndEnsureGreen(Client client, String... indices) {
+        assertTrue(client.admin().indices().prepareDelete(indices).get().isAcknowledged());
+        ensureGreen();
+    }
+
+    private void restoreSnapshotAndEnsureGreen(Client client) {
         logger.info("--> restore indices as 'remote_snapshot'");
         client.admin()
             .cluster()
             .prepareRestoreSnapshot("test-repo", "test-snap")
             .setRenamePattern("(.+)")
-            .setRenameReplacement("$1")
+            .setRenameReplacement("$1-copy")
             .setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
             .setWaitForCompletion(true)
             .execute()
             .actionGet();
         ensureGreen();
-
-        assertIndexingBlocked(indexName);
-        assertIndexSettingChangeBlocked(indexName);
-        assertTrue(client.admin().indices().prepareDelete(indexName).get().isAcknowledged());
-        assertThrows(
-            "Expect index to not exist",
-            IndexNotFoundException.class,
-            () -> client.admin().indices().prepareGetIndex().setIndices(indexName).execute().actionGet()
-        );
     }
 
     private void assertIndexingBlocked(String index) {
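Taken together, the helpers introduced above reduce every test to the same client-level flow: snapshot one or more indices, delete them, then restore them with the remote_snapshot storage type and verify the copies. The following is a condensed, illustrative sketch of that flow, not part of the patch; it reuses only calls that already appear in the test file above and assumes it runs inside an OpenSearchIntegTestCase subclass where an "fs" repository can be registered:

    // Illustrative only: snapshot "test-idx", delete it, and restore it as a searchable
    // snapshot copy named "test-idx-copy" whose shards are served from the repository.
    final Client client = client();
    createRepository("test-repo", "fs");

    final CreateSnapshotResponse snapshotResponse = client.admin()
        .cluster()
        .prepareCreateSnapshot("test-repo", "test-snap")
        .setWaitForCompletion(true)
        .setIndices("test-idx")
        .get();
    MatcherAssert.assertThat(
        snapshotResponse.getSnapshotInfo().successfulShards(),
        equalTo(snapshotResponse.getSnapshotInfo().totalShards())
    );

    assertTrue(client.admin().indices().prepareDelete("test-idx").get().isAcknowledged());

    client.admin()
        .cluster()
        .prepareRestoreSnapshot("test-repo", "test-snap")
        .setRenamePattern("(.+)")
        .setRenameReplacement("$1-copy")
        .setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT) // shards are fetched from the repository
        .setWaitForCompletion(true)
        .execute()
        .actionGet();
    ensureGreen();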
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java
index efb2c5a27a12f..d80371768e545 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java
@@ -21,6 +21,7 @@
 import org.opensearch.cluster.routing.allocation.decider.Decision;
 import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
 import org.opensearch.common.Randomness;
+import org.opensearch.cluster.routing.RecoverySource;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -57,6 +58,7 @@ public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) {
      */
     @Override
     void allocateUnassigned() {
+        unassignIgnoredRemoteShards(allocation);
         if (routingNodes.unassigned().isEmpty()) {
             logger.debug("No unassigned remote shards found.");
             return;
         }
@@ -273,7 +275,6 @@ MoveDecision decideRebalance(ShardRouting shardRouting) {
      */
     public Map groupUnassignedShardsByIndex() {
         HashMap unassignedShardMap = new HashMap<>();
-        unassignIgnoredRemoteShards(allocation);
         for (ShardRouting shard : routingNodes.unassigned().drain()) {
             String index = shard.getIndexName();
             if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
@@ -298,7 +299,17 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
         for (ShardRouting shard : unassignedShards.drainIgnored()) {
             RoutingPool pool = RoutingPool.getShardPool(shard, routingAllocation);
             if (pool == RoutingPool.REMOTE_CAPABLE && shard.unassigned() && (shard.primary() || !shard.unassignedInfo().isDelayed())) {
-                unassignedShards.add(shard);
+                ShardRouting unassignedShard = shard;
+                // When a shard is moved to an unassigned state, its recovery source is updated to ExistingStoreRecoverySource.
+                // Remote shards have no existing store to recover from, so they are recovered from an empty source
+                // to re-fetch any shard blocks from the repository.
+                if (shard.primary()) {
+                    if (!RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType())) {
+                        unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
+                    }
+                }
+
+                unassignedShards.add(unassignedShard);
             } else {
                 unassignedShards.ignoreShard(shard, shard.unassignedInfo().getLastAllocationStatus(), routingAllocation.changes());
             }

diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
index 93d01976e4be3..4863880d38052 100644
--- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
@@ -163,6 +163,7 @@
 import static org.opensearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;
 import static org.opensearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE;
 import static org.opensearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE;
+import static org.opensearch.test.NodeRoles.onlyRoles;
 import static org.opensearch.test.OpenSearchTestCase.assertBusy;
 import static org.opensearch.test.OpenSearchTestCase.randomFrom;
 import static org.opensearch.test.NodeRoles.dataOnlyNode;
@@ -204,6 +205,11 @@ public final class InternalTestCluster extends TestCluster {
         DiscoveryNodeRole.SEARCH_ROLE
     );
 
+    private static final Predicate SEARCH_AND_DATA_NODE_PREDICATE = nodeAndClient -> DiscoveryNode.hasRole(
+        nodeAndClient.node.settings(),
+        DiscoveryNodeRole.SEARCH_ROLE
+    ) && DiscoveryNode.isDataNode(nodeAndClient.node.settings());
+
     private static final Predicate NO_DATA_NO_CLUSTER_MANAGER_PREDICATE = nodeAndClient -> DiscoveryNode
         .isClusterManagerNode(nodeAndClient.node.settings()) == false
         && DiscoveryNode.isDataNode(nodeAndClient.node.settings()) == false;
@@ -687,6 +693,27 @@ public synchronized void ensureAtLeastNumSearchNodes(int n) {
         }
     }
 
+    /**
+     * Ensures that at least n search and data nodes are present in the cluster.
+     * If more nodes than n are present, this method will not
+     * stop any of the running nodes.
+     */
+    public synchronized void ensureAtLeastNumSearchAndDataNodes(int n) {
+        int size = numSearchAndDataNodes();
+        if (size < n) {
+            logger.info("increasing cluster size from {} to {}", size, n);
+            if (numSharedDedicatedClusterManagerNodes > 0) {
+                startDataAndSearchNodes(n - size);
+            } else {
+                Set searchAndDataRoles = new HashSet<>();
+                searchAndDataRoles.add(DiscoveryNodeRole.DATA_ROLE);
+                searchAndDataRoles.add(DiscoveryNodeRole.SEARCH_ROLE);
+                startNodes(n - size, Settings.builder().put(onlyRoles(Settings.EMPTY, searchAndDataRoles)).build());
+            }
+            validateClusterFormed();
+        }
+    }
+
     /**
      * Ensures that at most n are up and running.
     * If less nodes that n are running this method
@@ -1719,6 +1746,20 @@ public InetSocketAddress[] httpAddresses() {
         return addresses.toArray(new InetSocketAddress[addresses.size()]);
     }
 
+    /**
+     * Stops a random search node in the cluster. Returns true if a node was found to stop, false otherwise.
+     */
+    public synchronized boolean stopRandomSearchNode() throws IOException {
+        ensureOpen();
+        NodeAndClient nodeAndClient = getRandomNodeAndClient(SEARCH_NODE_PREDICATE);
+        if (nodeAndClient != null) {
+            logger.info("Closing random node [{}] ", nodeAndClient.name);
+            stopNodesAndClient(nodeAndClient);
+            return true;
+        }
+        return false;
+    }
+
     /**
      * Stops a random data node in the cluster. Returns true if a node was found to stop, false otherwise.
     */
@@ -2306,6 +2347,17 @@ public List startMasterOnlyNodes(int numNodes, Settings settings) {
         return startClusterManagerOnlyNodes(numNodes, settings);
     }
 
+    public List startDataAndSearchNodes(int numNodes) {
+        return startDataAndSearchNodes(numNodes, Settings.EMPTY);
+    }
+
+    public List startDataAndSearchNodes(int numNodes, Settings settings) {
+        Set searchAndDataRoles = new HashSet<>();
+        searchAndDataRoles.add(DiscoveryNodeRole.DATA_ROLE);
+        searchAndDataRoles.add(DiscoveryNodeRole.SEARCH_ROLE);
+        return startNodes(numNodes, Settings.builder().put(onlyRoles(settings, searchAndDataRoles)).build());
+    }
+
     public List startDataOnlyNodes(int numNodes) {
         return startDataOnlyNodes(numNodes, Settings.EMPTY);
     }
@@ -2382,6 +2434,10 @@ public int numSearchNodes() {
         return searchNodeAndClients().size();
     }
 
+    public int numSearchAndDataNodes() {
+        return searchDataNodeAndClients().size();
+    }
+
     @Override
     public int numDataAndClusterManagerNodes() {
         return filterNodes(nodes, DATA_NODE_PREDICATE.or(CLUSTER_MANAGER_NODE_PREDICATE)).size();
     }
@@ -2445,6 +2501,10 @@ private Collection searchNodeAndClients() {
         return filterNodes(nodes, SEARCH_NODE_PREDICATE);
     }
 
+    private Collection searchDataNodeAndClients() {
+        return filterNodes(nodes, SEARCH_AND_DATA_NODE_PREDICATE);
+    }
+
     private static Collection filterNodes(
         Map map,
         Predicate predicate

diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java
index fe8a7cb2b786b..94aae206c2eb3 100644
--- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java
@@ -924,6 +924,13 @@ public ClusterHealthStatus ensureYellow(String... indices) {
         return ensureColor(ClusterHealthStatus.YELLOW, TimeValue.timeValueSeconds(30), false, indices);
     }
 
+    /**
+     * Ensures the cluster has a red state via the cluster health API.
+     */
+    public ClusterHealthStatus ensureRed(String... indices) {
+        return ensureColor(ClusterHealthStatus.RED, TimeValue.timeValueSeconds(30), false, indices);
+    }
+
     /**
      * Ensures the cluster has a yellow state via the cluster health API and ensures the that cluster has no initializing shards
      * for the given indices