From 6045092c02550baa4764be9c477f1cc86512b94e Mon Sep 17 00:00:00 2001 From: panguixin Date: Mon, 15 Apr 2024 21:02:28 +0800 Subject: [PATCH] fix snapshot status Signed-off-by: panguixin --- .../snapshots/SearchableSnapshotIT.java | 55 ++++++++----- .../blobstore/BlobStoreRepository.java | 9 ++- .../snapshots/SnapshotShardsService.java | 80 +++++++++---------- .../blobstore/BlobStoreTestUtil.java | 30 +++---- 4 files changed, 92 insertions(+), 82 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 90bb2b501764e..c696ae82eb3e4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -17,6 +17,7 @@ import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; @@ -52,11 +53,13 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -127,21 +130,24 @@ public void testCreateSearchableSnapshot() throws Exception { public void testSnapshottingSearchableSnapshots() throws Exception { final String repoName = "test-repo"; + final String initSnapName = "initial-snapshot"; final String indexName = "test-idx"; + final String repeatSnapNamePrefix = "test-repeated-snap-"; + final String repeatIndexNamePrefix = indexName + "-copy-"; final Client client = client(); // create an index, add data, snapshot it, then delete it internalCluster().ensureAtLeastNumDataNodes(1); createIndexWithDocsAndEnsureGreen(0, 100, indexName); createRepositoryWithSettings(null, repoName); - takeSnapshot(client, "initial-snapshot", repoName, indexName); + takeSnapshot(client, initSnapName, repoName, indexName); deleteIndicesAndEnsureGreen(client, indexName); // restore the index as a searchable snapshot internalCluster().ensureAtLeastNumSearchNodes(1); client.admin() .cluster() - .prepareRestoreSnapshot(repoName, "initial-snapshot") + .prepareRestoreSnapshot(repoName, initSnapName) .setRenamePattern("(.+)") .setRenameReplacement("$1-copy-0") .setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT) @@ -154,7 +160,7 @@ public void testSnapshottingSearchableSnapshots() throws Exception { // Test that the searchable snapshot index can continue to be snapshotted and restored for (int i = 0; i < 4; i++) { - final String repeatedSnapshotName = "test-repeated-snap-" + i; + final String repeatedSnapshotName = repeatSnapNamePrefix + i; takeSnapshot(client, repeatedSnapshotName, repoName); deleteIndicesAndEnsureGreen(client, "_all"); client.admin() @@ -176,21 +182,34 @@ public void testSnapshottingSearchableSnapshots() throws Exception { final Map> snapshotInfoMap = response.getSnapshots() .stream() .collect(Collectors.toMap(s -> s.snapshotId().getName(), SnapshotInfo::indices)); - assertEquals( - Map.of( - "initial-snapshot", - List.of("test-idx"), - "test-repeated-snap-0", - List.of("test-idx-copy-0"), - "test-repeated-snap-1", - List.of("test-idx-copy-1"), - "test-repeated-snap-2", - List.of("test-idx-copy-2"), - "test-repeated-snap-3", - List.of("test-idx-copy-3") - ), - snapshotInfoMap - ); + final Map> expect = new HashMap<>(); + expect.put(initSnapName, List.of(indexName)); + IntStream.range(0, 4).forEach(i -> expect.put(repeatSnapNamePrefix + i, List.of(repeatIndexNamePrefix + i))); + assertEquals(expect, snapshotInfoMap); + + String[] snapNames = new String[5]; + IntStream.range(0, 4).forEach(i -> snapNames[i] = repeatSnapNamePrefix + i); + snapNames[4] = initSnapName; + SnapshotsStatusResponse snapshotsStatusResponse = client.admin() + .cluster() + .prepareSnapshotStatus(repoName) + .addSnapshots(snapNames) + .execute() + .actionGet(); + snapshotsStatusResponse.getSnapshots().forEach(s -> { + String snapName = s.getSnapshot().getSnapshotId().getName(); + assertEquals(1, s.getIndices().size()); + assertEquals(1, s.getShards().size()); + if (snapName.equals("initial-snapshot")) { + assertNotNull(s.getIndices().get("test-idx")); + assertTrue(s.getShards().get(0).getStats().getTotalFileCount() > 0); + } else { + assertTrue(snapName.startsWith(repeatSnapNamePrefix)); + assertEquals(1, s.getIndices().size()); + assertNotNull(s.getIndices().get(repeatIndexNamePrefix + snapName.substring(repeatSnapNamePrefix.length()))); + assertEquals(0L, s.getShards().get(0).getStats().getTotalFileCount()); + } + }); } /** diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 1a5701d9204ef..2c8f2b6d0b6d4 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -2787,9 +2787,12 @@ public void snapshotShard( long indexIncrementalSize = 0; long indexTotalFileSize = 0; final BlockingQueue filesToSnapshot = new LinkedBlockingQueue<>(); - // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files - // in the commit with files already in the repository - if (filesFromSegmentInfos == null) { + if (store.indexSettings().isRemoteSnapshot()) { + // If the source of the data is another remote snapshot (i.e. searchable snapshot) then no need to snapshot the shard + indexCommitPointFiles = List.of(); + } else if (filesFromSegmentInfos == null) { + // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files + // in the commit with files already in the repository indexCommitPointFiles = new ArrayList<>(); final Collection fileNames; final Store.MetadataSnapshot metadataFromStore; diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 89f1ea142336e..8da36bbb8d4bd 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -276,53 +276,47 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() { - @Override - public void onResponse(String newGeneration) { - assert newGeneration != null; - assert newGeneration.equals(snapshotStatus.generation()); - if (logger.isDebugEnabled()) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); - logger.debug( - "snapshot [{}] completed to [{}] with [{}] at generation [{}]", - snapshot, - snapshot.getRepository(), - lastSnapshotStatus, - snapshotStatus.generation() - ); - } - notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); + snapshot( + shardId, + snapshot, + indexId, + entry.userMetadata(), + snapshotStatus, + entry.version(), + entry.remoteStoreIndexShallowCopy(), + new ActionListener<>() { + @Override + public void onResponse(String newGeneration) { + assert newGeneration != null; + assert newGeneration.equals(snapshotStatus.generation()); + if (logger.isDebugEnabled()) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + logger.debug( + "snapshot [{}] completed to [{}] with [{}] at generation [{}]", + snapshot, + snapshot.getRepository(), + lastSnapshotStatus, + snapshotStatus.generation() + ); } + notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); + } - @Override - public void onFailure(Exception e) { - final String failure; - if (e instanceof AbortedSnapshotException) { - failure = "aborted"; - logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); - } else { - failure = summarizeFailure(e); - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - } - snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); - notifyFailedSnapshotShard(snapshot, shardId, failure); + @Override + public void onFailure(Exception e) { + final String failure; + if (e instanceof AbortedSnapshotException) { + failure = "aborted"; + logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); + } else { + failure = summarizeFailure(e); + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); } + snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); + notifyFailedSnapshotShard(snapshot, shardId, failure); } - ); - } + } + ); } }); } diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java index f55eb72b7aa28..21342f1c92477 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java @@ -143,7 +143,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex } assertIndexUUIDs(repository, repositoryData); assertSnapshotUUIDs(repository, repositoryData); - assertShardIndexGenerations(repository, blobContainer, repositoryData); + assertShardIndexGenerations(blobContainer, repositoryData); return null; } catch (AssertionError e) { return e; @@ -167,7 +167,7 @@ private static void assertIndexGenerations(BlobContainer repoRoot, long latestGe assertTrue(indexGenerations.length <= 2); } - private static void assertShardIndexGenerations(BlobStoreRepository repository, BlobContainer repoRoot, RepositoryData repositoryData) + private static void assertShardIndexGenerations(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException { final ShardGenerations shardGenerations = repositoryData.shardGenerations(); final BlobContainer indicesContainer = repoRoot.children().get("indices"); @@ -176,22 +176,16 @@ private static void assertShardIndexGenerations(BlobStoreRepository repository, if (gens.isEmpty() == false) { final BlobContainer indexContainer = indicesContainer.children().get(index.getId()); final Map shardContainers = indexContainer.children(); - if (isRemoteSnapshot(repository, repositoryData, index)) { - // If the source of the data is another snapshot (i.e. searchable snapshot) - // then assert that there is no shard data (because it exists in the source snapshot) - assertThat(shardContainers, anEmptyMap()); - } else { - for (int i = 0; i < gens.size(); i++) { - final String generation = gens.get(i); - assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN)); - if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) { - final String shardId = Integer.toString(i); - assertThat(shardContainers, hasKey(shardId)); - assertThat( - shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX), - hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation) - ); - } + for (int i = 0; i < gens.size(); i++) { + final String generation = gens.get(i); + assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN)); + if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) { + final String shardId = Integer.toString(i); + assertThat(shardContainers, hasKey(shardId)); + assertThat( + shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX), + hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation) + ); } } }