diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java index 69e85b13548e0..b00a8c8d64ed9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java @@ -37,6 +37,7 @@ import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.master.AcknowledgedResponse; @@ -174,6 +175,118 @@ public void testCloneShallowCopyV2() throws Exception { assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards())); } + public void testCloneShallowCopyV2DeletedIndex() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + final Path remoteStoreRepoPath = randomRepoPath(); + internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String indexName3 = "testindex3"; + String snapshotRepoName = "test-clone-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Client client = client(); + + assertAcked( + client.admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + createIndex(indexName1, getRemoteStoreBackedIndexSettings()); + createIndex(indexName2, getRemoteStoreBackedIndexSettings()); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexRandomDocs(indexName1, numDocsInIndex1); + indexRandomDocs(indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .get(); + SnapshotInfo sourceSnapshotInfo = createSnapshotResponse.getSnapshotInfo(); + assertThat(sourceSnapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(sourceSnapshotInfo.successfulShards(), greaterThan(0)); + assertThat(sourceSnapshotInfo.successfulShards(), equalTo(sourceSnapshotInfo.totalShards())); + assertThat(sourceSnapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); + + // Validate that the snapshot was created + final BlobStoreRepository repository = (BlobStoreRepository) internalCluster().getCurrentClusterManagerNodeInstance( + RepositoriesService.class + ).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + + assertTrue(repositoryData.getSnapshotIds().contains(sourceSnapshotInfo.snapshotId())); + + createIndex(indexName3, getRemoteStoreBackedIndexSettings()); + indexRandomDocs(indexName3, 10); + ensureGreen(indexName3); + + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName1)).get()); + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName2)).get()); + + AcknowledgedResponse response = client().admin() + .cluster() + .prepareCloneSnapshot(snapshotRepoName, snapshotName1, "test_clone_snapshot1") + .setIndices("*") + .get(); + assertTrue(response.isAcknowledged()); + awaitClusterManagerFinishRepoOperations(); + + AtomicReference cloneSnapshotId = new AtomicReference<>(); + // Validate that snapshot is present in repository data + waitUntil(() -> { + PlainActionFuture repositoryDataPlainActionFutureClone = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFutureClone); + + RepositoryData repositoryData1; + try { + repositoryData1 = repositoryDataPlainActionFutureClone.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + for (SnapshotId snapshotId : repositoryData1.getSnapshotIds()) { + if (snapshotId.getName().equals("test_clone_snapshot1")) { + cloneSnapshotId.set(snapshotId); + return true; + } + } + return false; + }, 90, TimeUnit.SECONDS); + + final SnapshotId cloneSnapshotIdFinal = cloneSnapshotId.get(); + SnapshotInfo cloneSnapshotInfo = PlainActionFuture.get( + f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> repository.getSnapshotInfo(cloneSnapshotIdFinal))) + ); + + assertThat(cloneSnapshotInfo.getPinnedTimestamp(), equalTo(sourceSnapshotInfo.getPinnedTimestamp())); + for (String index : sourceSnapshotInfo.indices()) { + assertTrue(cloneSnapshotInfo.indices().contains(index)); + + } + assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards())); + } + public void testCloneShallowCopyAfterDisablingV2() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 6688c7dd0431a..6e66f8c958666 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -885,7 +885,6 @@ public void cloneSnapshotV2( ) { long startTime = System.currentTimeMillis(); - ClusterState currentState = clusterService.state(); String snapshotName = snapshot.getSnapshotId().getName(); repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) { private SnapshotsInProgress.Entry newEntry; @@ -963,8 +962,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshotId))); - final ShardGenerations shardGenerations = repositoryData.shardGenerations(); - snapshotInfoListener.whenComplete(snapshotInfo -> { final SnapshotInfo cloneSnapshotInfo = new SnapshotInfo( snapshot.getSnapshotId(), @@ -984,17 +981,28 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2 clone, no longer cluster manager"); } final StepListener pinnedTimestampListener = new StepListener<>(); - pinnedTimestampListener.whenComplete(repoData -> { + final StepListener metadataListener = new StepListener<>(); + pinnedTimestampListener.whenComplete( + rData -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> { + final Metadata.Builder metaBuilder = Metadata.builder(repository.getSnapshotGlobalMetadata(newEntry.source())); + for (IndexId index : newEntry.indices()) { + metaBuilder.put(repository.getSnapshotIndexMetaData(repositoryData, newEntry.source(), index), false); + } + return metaBuilder.build(); + })), + e -> { + logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName); + stateWithoutSnapshotV2(newState); + leaveRepoLoop(repositoryName); + listener.onFailure(e); + } + ); + metadataListener.whenComplete(meta -> { + ShardGenerations shardGenerations = buildGenerationsV2(newEntry, meta); repository.finalizeSnapshot( shardGenerations, repositoryData.getGenId(), - metadataForSnapshot( - currentState.metadata(), - newEntry.includeGlobalState(), - false, - newEntry.dataStreams(), - newEntry.indices() - ), + metadataForSnapshot(meta, newEntry.includeGlobalState(), false, newEntry.dataStreams(), newEntry.indices()), cloneSnapshotInfo, repositoryData.getVersion(sourceSnapshotId), state -> stateWithoutSnapshot(state, snapshot), @@ -1038,7 +1046,7 @@ public void onFailure(Exception e) { } ); }, e -> { - logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName); + logger.error("Failed to retrieve metadata for snapshot-v2 {} {} ", repositoryName, snapshotName); stateWithoutSnapshotV2(newState); leaveRepoLoop(repositoryName); listener.onFailure(e); @@ -1544,6 +1552,17 @@ private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snaps return builder.build(); } + private static ShardGenerations buildGenerationsV2(SnapshotsInProgress.Entry snapshot, Metadata metadata) { + ShardGenerations.Builder builder = ShardGenerations.builder(); + snapshot.indices().forEach(indexId -> { + int shardCount = metadata.index(indexId.getName()).getNumberOfShards(); + for (int i = 0; i < shardCount; i++) { + builder.put(indexId, i, null); + } + }); + return builder.build(); + } + private static Metadata metadataForSnapshot( Metadata metadata, boolean includeGlobalState,