From 72c0be0e9303bc251a084d203b24ba5b352a6918 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 6 Sep 2024 06:08:04 +0000 Subject: [PATCH] [Snapshot V2] Support pinned timestamp in clone flow (#15524) Signed-off-by: Anshu Agarwal (cherry picked from commit debd04008b56f9f7e54f1d9380dff869d6d81141) Signed-off-by: github-actions[bot] --- .../opensearch/snapshots/CloneSnapshotIT.java | 15 +- .../snapshots/CloneSnapshotV2IT.java | 438 ++++++++++++++++++ .../clone/TransportCloneSnapshotAction.java | 2 +- .../snapshots/SnapshotsService.java | 272 ++++++++++- 4 files changed, 713 insertions(+), 14 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java index 00e2d9bd92158..e694bf2b6ba80 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java @@ -64,7 +64,6 @@ import java.util.List; import java.util.concurrent.ExecutionException; -import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasSize; @@ -145,7 +144,7 @@ public void testShardClone() throws Exception { } public void testCloneSnapshotIndex() throws Exception { - internalCluster().startClusterManagerOnlyNode(); + internalCluster().startClusterManagerOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); internalCluster().startDataOnlyNode(); final String repoName = "repo-name"; createRepository(repoName, "fs"); @@ -336,7 +335,7 @@ public void testClonePreventsSnapshotDelete() throws Exception { indexRandomDocs(indexName, randomIntBetween(20, 100)); final String targetSnapshot = "target-snapshot"; - blockNodeOnAnyFiles(repoName, clusterManagerName); + blockClusterManagerOnWriteIndexFile(repoName); final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName); waitForBlock(clusterManagerName, repoName, TimeValue.timeValueSeconds(30L)); assertFalse(cloneFuture.isDone()); @@ -444,7 +443,7 @@ public void testLongRunningSnapshotAllowsConcurrentClone() throws Exception { } public void testDeletePreventsClone() throws Exception { - final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(); + final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); internalCluster().startDataOnlyNode(); final String repoName = "repo-name"; createRepository(repoName, "mock"); @@ -457,7 +456,7 @@ public void testDeletePreventsClone() throws Exception { indexRandomDocs(indexName, randomIntBetween(20, 100)); final String targetSnapshot = "target-snapshot"; - blockNodeOnAnyFiles(repoName, clusterManagerName); + blockClusterManagerOnWriteIndexFile(repoName); final ActionFuture deleteFuture = startDeleteSnapshot(repoName, sourceSnapshot); waitForBlock(clusterManagerName, repoName, TimeValue.timeValueSeconds(30L)); assertFalse(deleteFuture.isDone()); @@ -609,7 +608,7 @@ public void testClusterManagerFailoverDuringCloneStep2() throws Exception { public void testExceptionDuringShardClone() throws Exception { // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations - internalCluster().startClusterManagerOnlyNodes(3, LARGE_SNAPSHOT_POOL_SETTINGS); + internalCluster().startClusterManagerOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); internalCluster().startDataOnlyNode(); final String repoName = "test-repo"; createRepository(repoName, "mock"); @@ -620,7 +619,7 @@ public void testExceptionDuringShardClone() throws Exception { createFullSnapshot(repoName, sourceSnapshot); final String targetSnapshot = "target-snapshot"; - blockClusterManagerFromFinalizingSnapshotOnSnapFile(repoName); + blockClusterManagerFromFinalizingSnapshotOnIndexFile(repoName); final ActionFuture cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex); awaitNumberOfSnapshotsInProgress(1); final String clusterManagerNode = internalCluster().getClusterManagerName(); @@ -698,7 +697,7 @@ public void testStartSnapshotWithSuccessfulShardClonePendingFinalization() throw } public void testStartCloneWithSuccessfulShardClonePendingFinalization() throws Exception { - final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(); + final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); internalCluster().startDataOnlyNode(); final String repoName = "test-repo"; createRepository(repoName, "mock"); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java new file mode 100644 index 0000000000000..c6744ae62db60 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java @@ -0,0 +1,438 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.snapshots; + +import org.opensearch.action.ActionRunnable; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.RepositoryData; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; + +import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class CloneSnapshotV2IT extends AbstractSnapshotIntegTestCase { + + public void testCloneShallowCopyV2() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + final Path remoteStoreRepoPath = randomRepoPath(); + internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String indexName3 = "testindex3"; + String snapshotRepoName = "test-clone-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Client client = client(); + + assertAcked( + client.admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + createIndex(indexName1, getRemoteStoreBackedIndexSettings()); + createIndex(indexName2, getRemoteStoreBackedIndexSettings()); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexRandomDocs(indexName1, numDocsInIndex1); + indexRandomDocs(indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .get(); + SnapshotInfo sourceSnapshotInfo = createSnapshotResponse.getSnapshotInfo(); + assertThat(sourceSnapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(sourceSnapshotInfo.successfulShards(), greaterThan(0)); + assertThat(sourceSnapshotInfo.successfulShards(), equalTo(sourceSnapshotInfo.totalShards())); + assertThat(sourceSnapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); + + // Validate that the snapshot was created + final BlobStoreRepository repository = (BlobStoreRepository) internalCluster().getCurrentClusterManagerNodeInstance( + RepositoriesService.class + ).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + + assertTrue(repositoryData.getSnapshotIds().contains(sourceSnapshotInfo.snapshotId())); + + createIndex(indexName3, getRemoteStoreBackedIndexSettings()); + indexRandomDocs(indexName3, 10); + ensureGreen(indexName3); + + AcknowledgedResponse response = client().admin() + .cluster() + .prepareCloneSnapshot(snapshotRepoName, snapshotName1, "test_clone_snapshot1") + .setIndices("*") + .get(); + assertTrue(response.isAcknowledged()); + awaitClusterManagerFinishRepoOperations(); + + // Validate that snapshot is present in repository data + PlainActionFuture repositoryDataPlainActionFutureClone = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFutureClone); + + repositoryData = repositoryDataPlainActionFutureClone.get(); + assertEquals(repositoryData.getSnapshotIds().size(), 2); + boolean foundCloneInRepoData = false; + SnapshotId cloneSnapshotId = null; + for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { + if (snapshotId.getName().equals("test_clone_snapshot1")) { + foundCloneInRepoData = true; + cloneSnapshotId = snapshotId; + } + } + final SnapshotId cloneSnapshotIdFinal = cloneSnapshotId; + SnapshotInfo cloneSnapshotInfo = PlainActionFuture.get( + f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> repository.getSnapshotInfo(cloneSnapshotIdFinal))) + ); + + assertTrue(foundCloneInRepoData); + + assertThat(cloneSnapshotInfo.getPinnedTimestamp(), equalTo(sourceSnapshotInfo.getPinnedTimestamp())); + for (String index : sourceSnapshotInfo.indices()) { + assertTrue(cloneSnapshotInfo.indices().contains(index)); + + } + assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards())); + } + + public void testCloneShallowCopyAfterDisablingV2() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + final Path remoteStoreRepoPath = randomRepoPath(); + internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String indexName3 = "testindex3"; + String snapshotRepoName = "test-clone-snapshot-repo"; + String sourceSnapshotV2 = "test-source-snapshot-v2"; + String sourceSnapshotV1 = "test-source-snapshot-v1"; + String cloneSnapshotV2 = "test-clone-snapshot-v2"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Client client = client(); + + assertAcked( + client.admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + createIndex(indexName1, getRemoteStoreBackedIndexSettings()); + createIndex(indexName2, getRemoteStoreBackedIndexSettings()); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexRandomDocs(indexName1, numDocsInIndex1); + indexRandomDocs(indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + // create source snapshot which is v2 + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, sourceSnapshotV2) + .setWaitForCompletion(true) + .get(); + SnapshotInfo sourceSnapshotInfo = createSnapshotResponse.getSnapshotInfo(); + assertThat(sourceSnapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(sourceSnapshotInfo.successfulShards(), greaterThan(0)); + assertThat(sourceSnapshotInfo.successfulShards(), equalTo(sourceSnapshotInfo.totalShards())); + assertThat(sourceSnapshotInfo.snapshotId().getName(), equalTo(sourceSnapshotV2)); + + // Validate that the snapshot was created + final BlobStoreRepository repository = (BlobStoreRepository) internalCluster().getCurrentClusterManagerNodeInstance( + RepositoriesService.class + ).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + + assertTrue(repositoryData.getSnapshotIds().contains(sourceSnapshotInfo.snapshotId())); + + createIndex(indexName3, getRemoteStoreBackedIndexSettings()); + indexRandomDocs(indexName3, 10); + ensureGreen(indexName3); + + // disable snapshot v2 in repo + assertAcked( + client.admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false) + ) + ); + + // validate that the created snapshot is v1 + CreateSnapshotResponse createSnapshotResponseV1 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, sourceSnapshotV1) + .setWaitForCompletion(true) + .get(); + SnapshotInfo sourceSnapshotInfoV1 = createSnapshotResponseV1.getSnapshotInfo(); + assertThat(sourceSnapshotInfoV1.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(sourceSnapshotInfoV1.successfulShards(), greaterThan(0)); + assertThat(sourceSnapshotInfoV1.successfulShards(), equalTo(sourceSnapshotInfoV1.totalShards())); + assertThat(sourceSnapshotInfoV1.getPinnedTimestamp(), equalTo(0L)); + + // Validate that snapshot is present in repository data + PlainActionFuture repositoryDataV1PlainActionFuture = new PlainActionFuture<>(); + BlobStoreRepository repositoryV1 = (BlobStoreRepository) internalCluster().getCurrentClusterManagerNodeInstance( + RepositoriesService.class + ).repository(snapshotRepoName); + repositoryV1.getRepositoryData(repositoryDataV1PlainActionFuture); + + repositoryData = repositoryDataV1PlainActionFuture.get(); + + assertTrue(repositoryData.getSnapshotIds().contains(sourceSnapshotInfoV1.snapshotId())); + assertEquals(repositoryData.getSnapshotIds().size(), 2); + + // clone should get created for v2 snapshot + AcknowledgedResponse response = client().admin() + .cluster() + .prepareCloneSnapshot(snapshotRepoName, sourceSnapshotV2, cloneSnapshotV2) + .setIndices("*") + .get(); + assertTrue(response.isAcknowledged()); + awaitClusterManagerFinishRepoOperations(); + + // Validate that snapshot is present in repository data + PlainActionFuture repositoryDataCloneV2PlainActionFuture = new PlainActionFuture<>(); + BlobStoreRepository repositoryCloneV2 = (BlobStoreRepository) internalCluster().getCurrentClusterManagerNodeInstance( + RepositoriesService.class + ).repository(snapshotRepoName); + repositoryCloneV2.getRepositoryData(repositoryDataCloneV2PlainActionFuture); + + repositoryData = repositoryDataCloneV2PlainActionFuture.get(); + + assertEquals(repositoryData.getSnapshotIds().size(), 3); + boolean foundCloneInRepoData = false; + SnapshotId cloneSnapshotId = null; + for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { + if (snapshotId.getName().equals(cloneSnapshotV2)) { + foundCloneInRepoData = true; + cloneSnapshotId = snapshotId; + } + } + final SnapshotId cloneSnapshotIdFinal = cloneSnapshotId; + SnapshotInfo cloneSnapshotInfo = PlainActionFuture.get( + f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> repository.getSnapshotInfo(cloneSnapshotIdFinal))) + ); + + assertTrue(foundCloneInRepoData); + // pinned timestamp value in clone snapshot v2 matches source snapshot v2 + assertThat(cloneSnapshotInfo.getPinnedTimestamp(), equalTo(sourceSnapshotInfo.getPinnedTimestamp())); + for (String index : sourceSnapshotInfo.indices()) { + assertTrue(cloneSnapshotInfo.indices().contains(index)); + + } + assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards())); + + } + + public void testRestoreFromClone() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + final Path remoteStoreRepoPath = randomRepoPath(); + + internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); + + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + + String snapshotRepoName = "test-clone-snapshot-repo"; + String sourceSnapshot = "test-source-snapshot"; + String cloneSnapshot = "test-clone-snapshot"; + + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + String restoredIndexName1 = indexName1 + "-restored"; + + Client client = client(); + + assertAcked( + client.admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + createIndex(indexName1, getRemoteStoreBackedIndexSettings()); + createIndex(indexName2, getRemoteStoreBackedIndexSettings()); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexRandomDocs(indexName1, numDocsInIndex1); + indexRandomDocs(indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + logger.info("--> create source snapshot"); + + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, sourceSnapshot) + .setWaitForCompletion(true) + .get(); + SnapshotInfo sourceSnapshotInfo = createSnapshotResponse.getSnapshotInfo(); + assertThat(sourceSnapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(sourceSnapshotInfo.successfulShards(), greaterThan(0)); + assertThat(sourceSnapshotInfo.successfulShards(), equalTo(sourceSnapshotInfo.totalShards())); + assertThat(sourceSnapshotInfo.snapshotId().getName(), equalTo(sourceSnapshot)); + + AcknowledgedResponse response = client().admin() + .cluster() + .prepareCloneSnapshot(snapshotRepoName, sourceSnapshot, cloneSnapshot) + .setIndices("*") + .get(); + assertTrue(response.isAcknowledged()); + + DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet(); + assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED); + ensureGreen(indexName1); + + deleteResponse = client().prepareDelete(indexName1, "1").execute().actionGet(); + assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED); + ensureGreen(indexName1); + + // delete the source snapshot + AcknowledgedResponse deleteSnapshotResponse = internalCluster().clusterManagerClient() + .admin() + .cluster() + .prepareDeleteSnapshot(snapshotRepoName, sourceSnapshot) + .get(); + assertAcked(deleteSnapshotResponse); + + deleteResponse = client().prepareDelete(indexName1, "2").execute().actionGet(); + assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED); + ensureGreen(indexName1); + ensureGreen(indexName1); + + // restore from clone + RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, cloneSnapshot) + .setWaitForCompletion(true) + .setIndices(indexName1) + .setRenamePattern(indexName1) + .setRenameReplacement(restoredIndexName1) + .get(); + + assertEquals(restoreSnapshotResponse1.status(), RestStatus.OK); + ensureGreen(restoredIndexName1, indexName2); + assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1); + assertDocsPresentInIndex(client, indexName2, numDocsInIndex2); + } + + private void assertDocsPresentInIndex(Client client, String indexName, int numOfDocs) { + for (int i = 0; i < numOfDocs; i++) { + String id = Integer.toString(i); + logger.info("checking for index " + indexName + " with docId" + id); + assertTrue("doc with id" + id + " is not present for index " + indexName, client.prepareGet(indexName, id).get().isExists()); + } + } + + private Settings snapshotV2Settings(Path remoteStoreRepoPath) { + String REMOTE_REPO_NAME = "remote-store-repo-name"; + Settings settings = Settings.builder() + .put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath)) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .build(); + return settings; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java index 54ef372609390..6da26d9a2da45 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java @@ -95,7 +95,7 @@ protected void clusterManagerOperation( ClusterState state, ActionListener listener ) { - snapshotsService.cloneSnapshot(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true))); + snapshotsService.executeClone(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true))); } @Override diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index dc5fe59911e90..5307902692e50 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -814,6 +814,34 @@ public void onFailure(Exception e) { ); } + private void cloneSnapshotPinnedTimestamp( + RepositoryData repositoryData, + SnapshotId sourceSnapshot, + Snapshot snapshot, + long timestampToPin, + ActionListener listener + ) { + remoteStorePinnedTimestampService.cloneTimestamp( + timestampToPin, + snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + sourceSnapshot.getUUID(), + snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + new ActionListener() { + @Override + public void onResponse(Void unused) { + logger.debug("Timestamp pinned successfully for clone snapshot {}", snapshot.getSnapshotId().getName()); + listener.onResponse(repositoryData); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to pin timestamp for clone snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e); + listener.onFailure(e); + + } + } + ); + } + private static void ensureSnapshotNameNotRunning( List runningSnapshots, String repositoryName, @@ -835,9 +863,14 @@ private static Map getInFlightIndexIds(List listener) { + /** + * This method does some pre-validation, checks for the presence of source snapshot in repository data. + * For shallow snapshot v2 clone, it checks the pinned timestamp to be greater than zero in the source snapshot. + * + * @param request snapshot request + * @param listener snapshot completion listener + */ + public void executeClone(CloneSnapshotRequest request, ActionListener listener) { final String repositoryName = request.repository(); Repository repository = repositoriesService.repository(repositoryName); if (repository.isReadOnly()) { @@ -846,10 +879,230 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener lis } final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.target()); validate(repositoryName, snapshotName); - // TODO: create snapshot UUID in CloneSnapshotRequest and make this operation idempotent to cleanly deal with transport layer - // retries final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); + try { + final StepListener repositoryDataListener = new StepListener<>(); + repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); + repositoryDataListener.whenComplete(repositoryData -> { + final SnapshotId sourceSnapshotId = repositoryData.getSnapshotIds() + .stream() + .filter(src -> src.getName().equals(request.source())) + .findAny() + .orElseThrow(() -> new SnapshotMissingException(repositoryName, request.source())); + final StepListener snapshotInfoListener = new StepListener<>(); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + + executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshotId))); + + snapshotInfoListener.whenComplete(sourceSnapshotInfo -> { + if (sourceSnapshotInfo.getPinnedTimestamp() > 0) { + if (hasWildCardPatterForCloneSnapshotV2(request.indices()) == false) { + throw new SnapshotException( + repositoryName, + snapshotName, + "Aborting clone for Snapshot-v2, only wildcard pattern '*' is supported for indices" + ); + } + cloneSnapshotV2(request, snapshot, repositoryName, repository, listener); + } else { + cloneSnapshot(request, snapshot, repositoryName, repository, listener); + } + }, e -> listener.onFailure(e)); + }, e -> listener.onFailure(e)); + + } catch (Exception e) { + assert false : new AssertionError(e); + logger.error("Snapshot {} clone failed with exception {}", snapshot.getSnapshotId().getName(), e); + listener.onFailure(e); + } + } + + /** + * This method is responsible for creating a clone of the shallow snapshot v2. + * It pins the same timestamp that is pinned by the source snapshot. + * + * Unlike traditional snapshot operations, this method performs a synchronous clone execution and doesn't + * upload any shard metadata to the snapshot repository. + * The pinned timestamp is later reconciled with remote store segment and translog metadata files during the restore + * operation. + * + * @param request snapshot request + * @param snapshot clone snapshot + * @param repositoryName snapshot repository name + * @param repository snapshot repository + * @param listener completion listener + */ + public void cloneSnapshotV2( + CloneSnapshotRequest request, + Snapshot snapshot, + String repositoryName, + Repository repository, + ActionListener listener + ) { + + long startTime = System.currentTimeMillis(); + ClusterState currentState = clusterService.state(); + String snapshotName = snapshot.getSnapshotId().getName(); + repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) { + private SnapshotsInProgress.Entry newEntry; + private SnapshotId sourceSnapshotId; + private List indicesForSnapshot; + + @Override + public ClusterState execute(ClusterState currentState) { + createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + final List runningSnapshots = snapshots.entries(); + + sourceSnapshotId = repositoryData.getSnapshotIds() + .stream() + .filter(src -> src.getName().equals(request.source())) + .findAny() + .orElseThrow(() -> new SnapshotMissingException(repositoryName, request.source())); + + final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( + SnapshotDeletionsInProgress.TYPE, + SnapshotDeletionsInProgress.EMPTY + ); + if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(sourceSnapshotId))) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + sourceSnapshotId.getName(), + "cannot clone from snapshot that is being deleted" + ); + } + indicesForSnapshot = new ArrayList<>(); + for (IndexId indexId : repositoryData.getIndices().values()) { + if (repositoryData.getSnapshots(indexId).contains(sourceSnapshotId)) { + indicesForSnapshot.add(indexId.getName()); + } + } + + newEntry = SnapshotsInProgress.startClone( + snapshot, + sourceSnapshotId, + repositoryData.resolveIndices(indicesForSnapshot), + threadPool.absoluteTimeInMillis(), + repositoryData.getGenId(), + minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null) + ); + final List newEntries = new ArrayList<>(runningSnapshots); + newEntries.add(newEntry); + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(newEntries)).build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to clone snapshot-v2", repositoryName, snapshotName), e); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { + logger.info("snapshot-v2 clone [{}] started", snapshot); + final StepListener snapshotInfoListener = new StepListener<>(); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + + executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshotId))); + final ShardGenerations shardGenerations = repositoryData.shardGenerations(); + + snapshotInfoListener.whenComplete(snapshotInfo -> { + final SnapshotInfo cloneSnapshotInfo = new SnapshotInfo( + snapshot.getSnapshotId(), + indicesForSnapshot, + newEntry.dataStreams(), + startTime, + null, + System.currentTimeMillis(), + snapshotInfo.totalShards(), + Collections.emptyList(), + newEntry.includeGlobalState(), + newEntry.userMetadata(), + true, + snapshotInfo.getPinnedTimestamp() + ); + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { + throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2 clone, no longer cluster manager"); + } + final StepListener pinnedTimestampListener = new StepListener<>(); + pinnedTimestampListener.whenComplete(repoData -> { + logger.info("snapshot-v2 clone [{}] completed successfully", snapshot); + listener.onResponse(null); + }, listener::onFailure); + repository.finalizeSnapshot( + shardGenerations, + repositoryData.getGenId(), + metadataForSnapshot( + currentState.metadata(), + newEntry.includeGlobalState(), + false, + newEntry.dataStreams(), + newEntry.indices() + ), + cloneSnapshotInfo, + repositoryData.getVersion(sourceSnapshotId), + state -> stateWithoutSnapshot(state, snapshot), + Priority.IMMEDIATE, + new ActionListener() { + @Override + public void onResponse(RepositoryData repositoryData) { + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { + failSnapshotCompletionListeners( + snapshot, + new SnapshotException(snapshot, "Aborting Snapshot-v2 clone, no longer cluster manager") + ); + listener.onFailure( + new SnapshotException( + repositoryName, + snapshotName, + "Aborting Snapshot-v2 clone, no longer cluster manager" + ) + ); + return; + } + cloneSnapshotPinnedTimestamp( + repositoryData, + sourceSnapshotId, + snapshot, + snapshotInfo.getPinnedTimestamp(), + pinnedTimestampListener + ); + } + + @Override + public void onFailure(Exception e) { + logger.error( + "Failed to upload files to snapshot repo {} for clone snapshot-v2 {} ", + repositoryName, + snapshotName + ); + listener.onFailure(e); + } + } + ); + + }, listener::onFailure); + } + + @Override + public TimeValue timeout() { + return request.clusterManagerNodeTimeout(); + } + }, "clone_snapshot_v2 [" + request.source() + "][" + snapshotName + ']', listener::onFailure); + } + + // TODO: It is worth revisiting the design choice of creating a placeholder entry in snapshots-in-progress here once we have a cache + // for repository metadata and loading it has predictable performance + public void cloneSnapshot( + CloneSnapshotRequest request, + Snapshot snapshot, + String repositoryName, + Repository repository, + ActionListener listener + ) { + String snapshotName = snapshot.getSnapshotId().getName(); + initializingClones.add(snapshot); repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() { @@ -4091,6 +4344,15 @@ private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, @Nul } } + private boolean hasWildCardPatterForCloneSnapshotV2(String[] indices) { + for (String index : indices) { + if ("*".equals(index)) { + return true; + } + } + return false; + } + private class UpdateSnapshotStatusAction extends TransportClusterManagerNodeAction< UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {