From b8f24c011fef2790f67735001e270611e7e74c7a Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Thu, 20 Jul 2023 21:54:35 +0530 Subject: [PATCH] Add integration tests for remote store restore flow (#8484) --------- Signed-off-by: Bhumika Saini (cherry picked from commit 27a14c7c811f930b2b36c13f20e1371c6206dd51) --- .../remotestore/PrimaryTermValidationIT.java | 168 +++++ .../RemoteStoreBaseIntegTestCase.java | 151 +++++ .../opensearch/remotestore/RemoteStoreIT.java | 627 ++++++++++++++++++ .../cluster/routing/RecoverySource.java | 7 + 4 files changed, 953 insertions(+) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java new file mode 100644 index 0000000000000..d1fdde30a9537 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java @@ -0,0 +1,168 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.indices.refresh.RefreshResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.cluster.coordination.FollowersChecker; +import org.opensearch.cluster.coordination.LeaderChecker; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.health.ClusterIndexHealth; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.shard.ShardNotFoundException; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.disruption.NetworkDisruption; +import org.opensearch.test.transport.MockTransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) + +public class PrimaryTermValidationIT extends RemoteStoreBaseIntegTestCase { + + private static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + + public void testPrimaryTermValidation() throws Exception { + // Follower checker interval is lower compared to leader checker so that the cluster manager can remove the node + // with network partition faster. The follower check retry count is also kept 1. + Settings clusterSettings = Settings.builder() + .put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s") + .put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "20s") + .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 4) + .put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "1s") + .put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "1s") + .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .build(); + internalCluster().startClusterManagerOnlyNode(clusterSettings); + + // Create repository + absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + absolutePath2 = randomRepoPath().toAbsolutePath(); + putRepository(absolutePath2, REPOSITORY_2_NAME); + + // Start data nodes and create index + internalCluster().startDataOnlyNodes(2, clusterSettings); + createIndex(INDEX_NAME, remoteTranslogIndexSettings(1)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + // Get the names of nodes to create network disruption + String primaryNode = primaryNodeName(INDEX_NAME); + String replicaNode = replicaNodeName(INDEX_NAME); + String clusterManagerNode = internalCluster().getClusterManagerName(); + logger.info("Node names : clusterManager={} primary={} replica={}", clusterManagerNode, primaryNode, replicaNode); + + // Index some docs and validate that both primary and replica node has it. Refresh is triggered to trigger segment replication + // to ensure replica is also upto date. + int numOfDocs = randomIntBetween(5, 10); + for (int i = 0; i < numOfDocs; i++) { + indexSameDoc(clusterManagerNode, INDEX_NAME); + } + refresh(INDEX_NAME); + assertBusy( + () -> assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), numOfDocs) + ); + assertBusy( + () -> assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), numOfDocs) + ); + + // Start network disruption - primary node will be isolated + Set nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new)); + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.DISCONNECT + ); + internalCluster().setDisruptionScheme(networkDisruption); + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + // Ensure the node which is partitioned is removed from the cluster + assertBusy(() -> { + NodesInfoResponse response = client(clusterManagerNode).admin().cluster().prepareNodesInfo().get(); + assertThat(response.getNodes().size(), equalTo(2)); + }); + + // Ensure that the cluster manager has latest information about the index + assertBusy(() -> { + ClusterHealthResponse clusterHealthResponse = client(clusterManagerNode).admin() + .cluster() + .health(new ClusterHealthRequest()) + .actionGet(TimeValue.timeValueSeconds(1)); + assertTrue(clusterHealthResponse.getIndices().containsKey(INDEX_NAME)); + ClusterIndexHealth clusterIndexHealth = clusterHealthResponse.getIndices().get(INDEX_NAME); + assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus()); + assertEquals(1, clusterIndexHealth.getNumberOfShards()); + assertEquals(1, clusterIndexHealth.getActiveShards()); + assertEquals(1, clusterIndexHealth.getUnassignedShards()); + assertEquals(1, clusterIndexHealth.getUnassignedShards()); + assertEquals(1, clusterIndexHealth.getActivePrimaryShards()); + assertEquals(ClusterHealthStatus.YELLOW, clusterIndexHealth.getStatus()); + }); + + // Index data to the newly promoted primary + indexSameDoc(clusterManagerNode, INDEX_NAME); + RefreshResponse refreshResponse = client(clusterManagerNode).admin() + .indices() + .prepareRefresh(INDEX_NAME) + .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED) + .execute() + .actionGet(); + assertNoFailures(refreshResponse); + assertEquals(1, refreshResponse.getSuccessfulShards()); + assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), numOfDocs + 1); + + // At this point we stop the disruption. Since the follower checker has already failed and cluster manager has removed the node + // from cluster, failed node needs to start discovery process by leader checker call. We stop the disruption to allow the failed + // node to + // communicate with the other node which it assumes has replica. + networkDisruption.stopDisrupting(); + + // When the index call is made to the stale primary, it makes the primary term validation call to the other node (which + // it assumes has the replica node). At this moment, the stale primary realises that it is no more the primary and the caller + // received the following exception. + ShardNotFoundException exception = assertThrows(ShardNotFoundException.class, () -> indexSameDoc(primaryNode, INDEX_NAME)); + assertTrue(exception.getMessage().contains("no such shard")); + ensureStableCluster(3); + ensureGreen(INDEX_NAME); + } + + private IndexResponse indexSameDoc(String nodeName, String indexName) { + return client(nodeName).prepareIndex(indexName) + .setId(UUIDs.randomBase64UUID()) + .setSource("{\"foo\" : \"bar\"}", XContentType.JSON) + .get(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java new file mode 100644 index 0000000000000..0b3ceac176193 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -0,0 +1,151 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.junit.After; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { + protected static final String REPOSITORY_NAME = "test-remore-store-repo"; + protected static final String REPOSITORY_2_NAME = "test-remore-store-repo-2"; + protected static final int SHARD_COUNT = 1; + protected static final int REPLICA_COUNT = 1; + protected Path absolutePath; + protected Path absolutePath2; + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE, "true") + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .build(); + } + + public Settings indexSettings() { + return defaultIndexSettings(); + } + + IndexResponse indexSingleDoc(String indexName) { + return client().prepareIndex(indexName) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + + private Settings defaultIndexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + protected Settings remoteStoreIndexSettings(int numberOfReplicas, int numberOfShards) { + return Settings.builder() + .put(defaultIndexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .build(); + } + + protected Settings remoteStoreIndexSettings(int numberOfReplicas) { + return remoteStoreIndexSettings(numberOfReplicas, 1); + } + + protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) { + return Settings.builder() + .put(remoteStoreIndexSettings(numberOfReplicas)) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit) + .build(); + } + + protected Settings remoteTranslogIndexSettings(int numberOfReplicas, int numberOfShards) { + boolean sameRepoForRSSAndRTS = randomBoolean(); + return Settings.builder() + .put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards)) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, sameRepoForRSSAndRTS ? REPOSITORY_NAME : REPOSITORY_2_NAME) + .build(); + } + + protected Settings remoteTranslogIndexSettings(int numberOfReplicas) { + return remoteTranslogIndexSettings(numberOfReplicas, 1); + } + + protected void putRepository(Path path) { + putRepository(path, REPOSITORY_NAME); + } + + protected void putRepository(Path path, String repoName) { + assertAcked(clusterAdmin().preparePutRepository(repoName).setType("fs").setSettings(Settings.builder().put("location", path))); + } + + protected void setupRepo() { + internalCluster().startClusterManagerOnlyNode(); + absolutePath = randomRepoPath().toAbsolutePath(); + putRepository(absolutePath); + absolutePath2 = randomRepoPath().toAbsolutePath(); + putRepository(absolutePath2, REPOSITORY_2_NAME); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_2_NAME)); + } + + public int getFileCount(Path path) throws Exception { + final AtomicInteger filesExisting = new AtomicInteger(0); + Files.walkFileTree(path, new SimpleFileVisitor<>() { + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException impossible) throws IOException { + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + filesExisting.incrementAndGet(); + return FileVisitResult.CONTINUE; + } + }); + + return filesExisting.get(); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java new file mode 100644 index 0000000000000..4fcaeabae05fe --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -0,0 +1,627 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.hamcrest.MatcherAssert; +import org.junit.Before; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.RemoteStoreRefreshListener; +import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.comparesEqualTo; +import static org.hamcrest.Matchers.oneOf; +import static org.hamcrest.Matchers.is; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase { + + private static final String INDEX_NAME = "remote-store-test-idx-1"; + private static final String INDEX_NAMES = "test-remote-store-1,test-remote-store-2,remote-store-test-index-1,remote-store-test-index-2"; + private static final String INDEX_NAMES_WILDCARD = "test-remote-store-*,remote-store-test-index-*"; + private static final String TOTAL_OPERATIONS = "total-operations"; + private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations"; + private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total"; + private static final String MAX_SEQ_NO_REFRESHED_OR_FLUSHED = "max-seq-no-refreshed-or-flushed"; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + + @Before + public void setup() { + setupRepo(); + } + + @Override + public Settings indexSettings() { + return remoteStoreIndexSettings(0); + } + + private IndexResponse indexSingleDoc() { + return client().prepareIndex(INDEX_NAME) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + + private Map indexData(int numberOfIterations, boolean invokeFlush, String index) { + long totalOperations = 0; + long refreshedOrFlushedOperations = 0; + long maxSeqNo = -1; + long maxSeqNoRefreshedOrFlushed = -1; + int shardId = 0; + Map indexingStats = new HashMap<>(); + for (int i = 0; i < numberOfIterations; i++) { + if (invokeFlush) { + flush(index); + } else { + refresh(index); + } + maxSeqNoRefreshedOrFlushed = maxSeqNo; + indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED + "-shard-" + shardId, maxSeqNoRefreshedOrFlushed); + refreshedOrFlushedOperations = totalOperations; + int numberOfOperations = randomIntBetween(20, 50); + for (int j = 0; j < numberOfOperations; j++) { + IndexResponse response = INDEX_NAME.equals(index) ? indexSingleDoc() : indexSingleDoc(index); + maxSeqNo = response.getSeqNo(); + shardId = response.getShardId().id(); + indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo); + } + totalOperations += numberOfOperations; + } + + indexingStats.put(TOTAL_OPERATIONS, totalOperations); + indexingStats.put(REFRESHED_OR_FLUSHED_OPERATIONS, refreshedOrFlushedOperations); + indexingStats.put(MAX_SEQ_NO_TOTAL, maxSeqNo); + indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED, maxSeqNoRefreshedOrFlushed); + return indexingStats; + } + + private void verifyRestoredData(Map indexStats, boolean checkTotal, String indexName) { + String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS; + String maxSeqNoGranularity = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED; + ensureYellowAndNoInitializingShards(indexName); + ensureGreen(indexName); + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity)); + IndexResponse response = INDEX_NAME.equals(indexName) ? indexSingleDoc() : indexSingleDoc(indexName); + assertEquals(indexStats.get(maxSeqNoGranularity + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo()); + refresh(indexName); + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity) + 1); + } + + private void prepareCluster( + int numClusterManagerNodes, + int numDataOnlyNodes, + boolean remoteTranslogEnabled, + String indices, + int replicaCount, + int shardCount + ) { + internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); + internalCluster().startDataOnlyNodes(numDataOnlyNodes); + for (String index : indices.split(",")) { + if (remoteTranslogEnabled) { + createIndex(index, remoteTranslogIndexSettings(replicaCount, shardCount)); + } else { + createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); + } + + ensureYellowAndNoInitializingShards(index); + ensureGreen(index); + } + } + + /** + * Helper function to test restoring an index with no replication from remote store. Only primary node is dropped. + * @param remoteTranslog If true, Remote Translog Store is also enabled in addition to Remote Segment Store. + * @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data. + * @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked. + * @throws IOException IO Exception. + */ + private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush, int shardCount) throws IOException { + prepareCluster(0, 3, remoteTranslog, INDEX_NAME, 0, shardCount); + Map indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME); + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + ensureRed(INDEX_NAME); + + assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); + + ensureGreen(INDEX_NAME); + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + verifyRestoredData(indexStats, remoteTranslog, INDEX_NAME); + } + + /** + * Helper function to test restoring an index having replicas from remote store when all the nodes housing the primary/replica drop. + * @param remoteTranslog If true, Remote Translog Store is also enabled in addition to Remote Segment Store. + * @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data. + * @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked. + * @throws IOException IO Exception. + */ + private void testRestoreFlowBothPrimaryReplicasDown(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush, int shardCount) + throws IOException { + prepareCluster(1, 2, remoteTranslog, INDEX_NAME, 1, shardCount); + Map indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME); + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName(INDEX_NAME))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + ensureRed(INDEX_NAME); + internalCluster().startDataOnlyNodes(2); + + assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); + + ensureGreen(INDEX_NAME); + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + assertEquals(0, getNumShards(INDEX_NAME).numReplicas); + verifyRestoredData(indexStats, remoteTranslog, INDEX_NAME); + } + + /** + * Helper function to test restoring multiple indices from remote store when all the nodes housing the primary/replica drop. + * @param remoteTranslog If true, Remote Translog Store is also enabled in addition to Remote Segment Store. + * @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data. + * @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked. + * @throws IOException IO Exception. + */ + private void testRestoreFlowMultipleIndices(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush, int shardCount) + throws IOException { + prepareCluster(1, 3, remoteTranslog, INDEX_NAMES, 1, shardCount); + String[] indices = INDEX_NAMES.split(","); + Map> indicesStats = new HashMap<>(); + for (String index : indices) { + Map indexStats = indexData(numberOfIterations, invokeFlush, index); + indicesStats.put(index, indexStats); + assertEquals(shardCount, getNumShards(index).totalNumShards); + } + + for (String index : indices) { + ClusterHealthStatus indexHealth = ensureRed(index); + if (ClusterHealthStatus.RED.equals(indexHealth)) { + continue; + } + + if (ClusterHealthStatus.GREEN.equals(indexHealth)) { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName(index))); + } + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(index))); + } + + ensureRed(indices); + internalCluster().startDataOnlyNodes(3); + + assertAcked(client().admin().indices().prepareClose(indices)); + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAMES_WILDCARD.split(",")), PlainActionFuture.newFuture()); + ensureGreen(indices); + for (String index : indices) { + assertEquals(shardCount, getNumShards(index).totalNumShards); + verifyRestoredData(indicesStats.get(index), remoteTranslog, index); + } + } + + /** + * Simulates all data restored using Remote Translog Store. + * @throws IOException IO Exception. + */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188") + public void testRemoteTranslogRestoreWithNoDataPostCommit() throws IOException { + testRestoreFlow(true, 1, true, randomIntBetween(1, 5)); + } + + /** + * Simulates all data restored using Remote Translog Store. + * @throws IOException IO Exception. + */ + public void testRemoteTranslogRestoreWithNoDataPostRefresh() throws IOException { + testRestoreFlow(true, 1, false, randomIntBetween(1, 5)); + } + + /** + * Simulates refreshed data restored using Remote Segment Store + * and unrefreshed data restored using Remote Translog Store. + * @throws IOException IO Exception. + */ + public void testRemoteTranslogRestoreWithRefreshedData() throws IOException { + testRestoreFlow(true, randomIntBetween(2, 5), false, randomIntBetween(1, 5)); + } + + /** + * Simulates refreshed data restored using Remote Segment Store + * and unrefreshed data restored using Remote Translog Store. + * @throws IOException IO Exception. + */ + public void testRemoteTranslogRestoreWithCommittedData() throws IOException { + testRestoreFlow(true, randomIntBetween(2, 5), true, randomIntBetween(1, 5)); + } + + /** + * Simulates all data restored using Remote Translog Store. + * @throws IOException IO Exception. + */ + // @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188") + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") + public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOException { + testRestoreFlowBothPrimaryReplicasDown(true, 1, true, randomIntBetween(1, 5)); + } + + /** + * Simulates all data restored using Remote Translog Store. + * @throws IOException IO Exception. + */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") + public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOException { + testRestoreFlowBothPrimaryReplicasDown(true, 1, false, randomIntBetween(1, 5)); + } + + /** + * Simulates refreshed data restored using Remote Segment Store + * and unrefreshed data restored using Remote Translog Store. + * @throws IOException IO Exception. + */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") + public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOException { + testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), false, randomIntBetween(1, 5)); + } + + /** + * Simulates refreshed data restored using Remote Segment Store + * and unrefreshed data restored using Remote Translog Store. + * @throws IOException IO Exception. + */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") + public void testRTSRestoreWithCommittedDataPrimaryReplicaDown() throws IOException { + testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), true, randomIntBetween(1, 5)); + } + + /** + * Simulates refreshed data restored using Remote Segment Store + * and unrefreshed data restored using Remote Translog Store + * for multiple indices matching a wildcard name pattern. + * @throws IOException IO Exception. + */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8480") + public void testRTSRestoreWithCommittedDataMultipleIndicesPatterns() throws IOException { + testRestoreFlowMultipleIndices(true, 2, true, randomIntBetween(1, 5)); + } + + /** + * Simulates refreshed data restored using Remote Segment Store + * and unrefreshed data restored using Remote Translog Store, + * with all remote-enabled red indices considered for the restore by default. + * @throws IOException IO Exception. + */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8480") + public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOException { + int shardCount = randomIntBetween(1, 5); + prepareCluster(1, 3, true, INDEX_NAMES, 1, shardCount); + String[] indices = INDEX_NAMES.split(","); + Map> indicesStats = new HashMap<>(); + for (String index : indices) { + Map indexStats = indexData(2, true, index); + indicesStats.put(index, indexStats); + assertEquals(shardCount, getNumShards(index).totalNumShards); + } + + for (String index : indices) { + if (ClusterHealthStatus.RED.equals(ensureRed(index))) { + continue; + } + + if (ClusterHealthStatus.GREEN.equals(ensureRed(index))) { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName(index))); + } + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(index))); + } + + ensureRed(indices); + internalCluster().startDataOnlyNodes(3); + + assertAcked(client().admin().indices().prepareClose(indices)); + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(new String[] {}), PlainActionFuture.newFuture()); + ensureGreen(indices); + + for (String index : indices) { + assertEquals(shardCount, getNumShards(index).totalNumShards); + verifyRestoredData(indicesStats.get(index), true, index); + } + } + + /** + * Simulates refreshed data restored using Remote Segment Store + * and unrefreshed data restored using Remote Translog Store, + * with only some of the remote-enabled red indices requested for the restore. + * @throws IOException IO Exception. + */ + public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOException { + int shardCount = randomIntBetween(1, 5); + prepareCluster(1, 3, true, INDEX_NAMES, 0, shardCount); + String[] indices = INDEX_NAMES.split(","); + Map> indicesStats = new HashMap<>(); + for (String index : indices) { + Map indexStats = indexData(2, true, index); + indicesStats.put(index, indexStats); + assertEquals(shardCount, getNumShards(index).totalNumShards); + } + + for (String index : indices) { + if (ClusterHealthStatus.RED.equals(ensureRed(index))) { + continue; + } + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(index))); + } + + ensureRed(indices); + internalCluster().startDataOnlyNodes(3); + + assertAcked(client().admin().indices().prepareClose(indices[0], indices[1])); + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indices[0], indices[1]), PlainActionFuture.newFuture()); + ensureGreen(indices[0], indices[1]); + assertEquals(shardCount, getNumShards(indices[0]).totalNumShards); + verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]); + assertEquals(shardCount, getNumShards(indices[1]).totalNumShards); + verifyRestoredData(indicesStats.get(indices[1]), true, indices[1]); + ensureRed(indices[2], indices[3]); + } + + /** + * Simulates refreshed data restored using Remote Segment Store + * and unrefreshed data restored using Remote Translog Store, + * with all remote-enabled red indices being considered for the restore + * except those matching the specified exclusion pattern. + * @throws IOException IO Exception. + */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8480") + public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOException { + int shardCount = randomIntBetween(1, 5); + prepareCluster(1, 3, true, INDEX_NAMES, 1, shardCount); + String[] indices = INDEX_NAMES.split(","); + Map> indicesStats = new HashMap<>(); + for (String index : indices) { + Map indexStats = indexData(2, true, index); + indicesStats.put(index, indexStats); + assertEquals(shardCount, getNumShards(index).totalNumShards); + } + + for (String index : indices) { + if (ClusterHealthStatus.RED.equals(ensureRed(index))) { + continue; + } + + if (ClusterHealthStatus.GREEN.equals(ensureRed(index))) { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName(index))); + } + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(index))); + } + + ensureRed(indices); + internalCluster().startDataOnlyNodes(3); + + assertAcked(client().admin().indices().prepareClose(indices[0], indices[1])); + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices("*", "-remote-store-test-index-*"), PlainActionFuture.newFuture()); + ensureGreen(indices[0], indices[1]); + assertEquals(shardCount, getNumShards(indices[0]).totalNumShards); + verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]); + assertEquals(shardCount, getNumShards(indices[1]).totalNumShards); + verifyRestoredData(indicesStats.get(indices[1]), true, indices[1]); + ensureRed(indices[2], indices[3]); + } + + /** + * Simulates no-op restore from remote store, + * when the index has no data. + * @throws IOException IO Exception. + */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188") + public void testRTSRestoreNoData() throws IOException { + testRestoreFlow(true, 0, true, randomIntBetween(1, 5)); + } + + // TODO: Restore flow - index aliases + + private void testPeerRecovery(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws Exception { + internalCluster().startDataOnlyNodes(3); + if (remoteTranslog) { + createIndex(INDEX_NAME, remoteTranslogIndexSettings(0)); + } else { + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + } + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + Map indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME); + + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + .get(); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + refresh(INDEX_NAME); + String replicaNodeName = replicaNodeName(INDEX_NAME); + assertBusy( + () -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)), + 30, + TimeUnit.SECONDS + ); + + RecoveryResponse recoveryResponse = client(replicaNodeName).admin().indices().prepareRecoveries().get(); + + Optional recoverySource = recoveryResponse.shardRecoveryStates() + .get(INDEX_NAME) + .stream() + .filter(rs -> rs.getRecoverySource().getType() == RecoverySource.Type.PEER) + .findFirst(); + assertFalse(recoverySource.isEmpty()); + if (numberOfIterations == 1 && invokeFlush) { + // segments_N file is copied to new replica + assertEquals(1, recoverySource.get().getIndex().recoveredFileCount()); + } else { + assertEquals(0, recoverySource.get().getIndex().recoveredFileCount()); + } + + IndexResponse response = indexSingleDoc(); + assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo()); + refresh(INDEX_NAME); + assertBusy( + () -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1), + 30, + TimeUnit.SECONDS + ); + } + + public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataFlush() throws Exception { + testPeerRecovery(false, 1, true); + } + + public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogFlush() throws Exception { + testPeerRecovery(false, randomIntBetween(2, 5), true); + } + + public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataRefresh() throws Exception { + testPeerRecovery(false, 1, false); + } + + public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogRefresh() throws Exception { + testPeerRecovery(false, randomIntBetween(2, 5), false); + } + + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception { + testPeerRecovery(true, 1, true); + } + + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Exception { + testPeerRecovery(true, randomIntBetween(2, 5), true); + } + + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception { + testPeerRecovery(true, 1, false); + } + + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception { + testPeerRecovery(true, randomIntBetween(2, 5), false); + } + + private void verifyRemoteStoreCleanup(boolean remoteTranslog) throws Exception { + internalCluster().startDataOnlyNodes(3); + if (remoteTranslog) { + createIndex(INDEX_NAME, remoteTranslogIndexSettings(1)); + } else { + createIndex(INDEX_NAME, remoteStoreIndexSettings(1)); + } + + indexData(5, randomBoolean(), INDEX_NAME); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID); + assertTrue(getFileCount(indexPath) > 0); + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + // Delete is async. Give time for it + assertBusy(() -> { + try { + assertThat(getFileCount(indexPath), comparesEqualTo(0)); + } catch (Exception e) {} + }, 30, TimeUnit.SECONDS); + } + + public void testRemoteSegmentCleanup() throws Exception { + verifyRemoteStoreCleanup(false); + } + + public void testRemoteTranslogCleanup() throws Exception { + verifyRemoteStoreCleanup(true); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658") + public void testStaleCommitDeletionWithInvokeFlush() throws Exception { + internalCluster().startDataOnlyNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, true, INDEX_NAME); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); + // Delete is async. + assertBusy(() -> { + int actualFileCount = getFileCount(indexPath); + if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1))); + } else { + // As delete is async its possible that the file gets created before the deletion or after + // deletion. + MatcherAssert.assertThat(actualFileCount, is(oneOf(10, 11))); + } + }, 30, TimeUnit.SECONDS); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658") + public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { + internalCluster().startDataOnlyNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, false, INDEX_NAME); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); + int actualFileCount = getFileCount(indexPath); + // We also allow (numberOfIterations + 1) as index creation also triggers refresh. + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1))); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java index 6ffa155729c7a..ba889ff8c00c2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java @@ -347,6 +347,13 @@ public int hashCode() { return Objects.hash(restoreUUID, snapshot, index, version); } + // TODO: This override should be removed/be updated to return "true", + // i.e. we expect no retention leases, once the following issue is fixed: + // https://github.com/opensearch-project/OpenSearch/issues/8795 + @Override + public boolean expectEmptyRetentionLeases() { + return false; + } } /**