From 2454603231f72a28a1454c5fe41ccef5c6f5b560 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 12 Sep 2024 18:15:03 +0530 Subject: [PATCH] Pass pinning entity and timestamp while deleting snapshot of deleted index Signed-off-by: Sachin Kale --- .../snapshots/DeleteSnapshotV2IT.java | 182 ++++++++++++++++-- .../RemoteFsTimestampAwareTranslog.java | 110 ++++++----- .../RemoteStorePinnedTimestampService.java | 55 ++++-- .../blobstore/BlobStoreRepository.java | 50 ++++- .../snapshots/SnapshotsService.java | 10 +- 5 files changed, 317 insertions(+), 90 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java index 1d7a58384c0be..7174f68baca6c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java @@ -23,6 +23,7 @@ import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.concurrent.TimeUnit; @@ -206,7 +207,6 @@ public void testDeleteShallowCopyV2MultipleSnapshots() throws Exception { } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15692") public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); @@ -242,12 +242,6 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .get() .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); - String numShards = client().admin() - .indices() - .prepareGetSettings(remoteStoreEnabledIndexName) - .get() - .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_NUMBER_OF_SHARDS); - logger.info("--> create two remote index shallow snapshots"); CreateSnapshotResponse createSnapshotResponse = client().admin() .cluster() @@ -269,6 +263,14 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards())); assertThat(snapshotInfo2.snapshotId().getName(), equalTo("snap2")); + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + long currentTime = System.currentTimeMillis(); + long maxWaitRetry = 10; + while (maxWaitRetry >= 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + Thread.sleep(1000); + maxWaitRetry -= 1; + } + // delete remote store index assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); @@ -291,14 +293,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .get(); assertAcked(deleteSnapshotResponse); - Thread.sleep(5000); - assertBusy(() -> { try { assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountBeforeDeletingSnapshot1)); - } catch (Exception e) {} + } catch (NoSuchFileException e) { + fail(); + } }, 30, TimeUnit.SECONDS); - int segmentFilesCountAfterDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath); logger.info("--> delete snapshot 1"); RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); @@ -312,18 +313,169 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio // Delete is async. Give time for it assertBusy(() -> { try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1)); - } catch (Exception e) {} + assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath)); + } catch (NoSuchFileException e) { + fail(); + } }, 60, TimeUnit.SECONDS); assertBusy(() -> { try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1)); - } catch (Exception e) {} + assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath)); + } catch (NoSuchFileException e) { + fail(); + } }, 60, TimeUnit.SECONDS); } + public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2SingleSnapshot() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + final Path remoteStoreRepoPath = randomRepoPath(); + Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath); + settings = Settings.builder() + .put(settings) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString()) + .build(); + String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings); + internalCluster().startDataOnlyNode(settings); + final Client clusterManagerClient = internalCluster().clusterManagerClient(); + ensureStableCluster(2); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + clusterManagerName + ); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, 25); + + String indexUUID = client().admin() + .indices() + .prepareGetSettings(remoteStoreEnabledIndexName) + .get() + .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); + + logger.info("--> create two remote index shallow snapshots"); + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, "snap1") + .setWaitForCompletion(true) + .get(); + SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo(); + + Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); + Path shardPath = Path.of(String.valueOf(indexPath), "0"); + + // delete remote store index + assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); + + logger.info("--> delete snapshot 1"); + + Path segmentsPath = Path.of(String.valueOf(shardPath), "segments"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog"); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + long currentTime = System.currentTimeMillis(); + long maxWaitRetry = 10; + while (maxWaitRetry >= 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + Thread.sleep(1000); + maxWaitRetry -= 1; + } + + AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() + .cluster() + .prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName()) + .get(); + assertAcked(deleteSnapshotResponse); + + // Delete is async. Give time for it + // assertBusy(() -> { + // try { + // assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath)); + // } catch (NoSuchFileException e) { + // fail(); + // } + // }, 60, TimeUnit.SECONDS); + + assertBusy(() -> { + try { + assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath)); + } catch (NoSuchFileException e) { + fail(); + } + }, 60, TimeUnit.SECONDS); + } + + public void testRemoteStoreCleanupForDeletedIndexWithoutAnySnapshot() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + final Path remoteStoreRepoPath = randomRepoPath(); + Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath); + settings = Settings.builder() + .put(settings) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString()) + .build(); + String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings); + internalCluster().startDataOnlyNode(settings); + final Client clusterManagerClient = internalCluster().clusterManagerClient(); + ensureStableCluster(2); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + clusterManagerName + ); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, 5); + + String indexUUID = client().admin() + .indices() + .prepareGetSettings(remoteStoreEnabledIndexName) + .get() + .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + long currentTime = System.currentTimeMillis(); + long maxWaitRetry = 10; + while (maxWaitRetry >= 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + Thread.sleep(1000); + maxWaitRetry -= 1; + } + // delete remote store index + assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); + + Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); + Path shardPath = Path.of(String.valueOf(indexPath), "0"); + Path segmentsPath = Path.of(String.valueOf(shardPath), "segments"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog"); + + // Get total segments remote store directory file count for deleted index and shard 0 + assertBusy(() -> { + try { + assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath)); + } catch (NoSuchFileException e) { + // While files are getting deleted, we encounter NoSuchFileException in RemoteStoreBaseIntegTestCase.getFileCount + // Failing the assertion for assertBusy to try again + fail(); + } + }); + assertBusy(() -> { assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath)); }); + } + private Settings snapshotV2Settings(Path remoteStoreRepoPath) { Settings settings = Settings.builder() .put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath)) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 98b821126502b..c4af01572e007 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -266,7 +266,7 @@ protected Set getGenerationsToBeDeleted(List metadataFilesNotToBeD } protected List getMetadataFilesToBeDeleted(List metadataFiles) { - return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, minRemoteGenReferenced, logger); + return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, minRemoteGenReferenced, null, null, logger); } // Visible for testing @@ -274,9 +274,14 @@ protected static List getMetadataFilesToBeDeleted( List metadataFiles, Map metadataFilePinnedTimestampMap, long minRemoteGenReferenced, + String pinningEntityToSkip, + Long pinnedTimestampToSkip, Logger logger ) { - Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); + Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps( + pinningEntityToSkip, + pinnedTimestampToSkip + ); // Keep files since last successful run of scheduler List metadataFilesToBeDeleted = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge( @@ -483,55 +488,66 @@ protected static Tuple getMinMaxPrimaryTermFromMetadataFile( } } - public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException { - ActionListener> listMetadataFilesListener = new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + public static void cleanup( + TranslogTransferManager translogTransferManager, + boolean forceClean, + String pinningEntity, + Long pinnedTimestamp + ) throws IOException { + if (forceClean) { + translogTransferManager.delete(); + } else { + ActionListener> listMetadataFilesListener = new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + + try { + if (metadataFiles.isEmpty()) { + staticLogger.debug("No stale translog metadata files found"); + return; + } + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + pinningEntity, + pinnedTimestamp, + staticLogger + ); + if (metadataFilesToBeDeleted.isEmpty()) { + staticLogger.debug("No metadata files to delete"); + return; + } + staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); - try { - if (metadataFiles.isEmpty()) { - staticLogger.debug("No stale translog metadata files found"); - return; - } - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( - metadataFiles, - new HashMap<>(), - Long.MAX_VALUE, - staticLogger - ); - if (metadataFilesToBeDeleted.isEmpty()) { - staticLogger.debug("No metadata files to delete"); - return; + // For all the files that we are keeping, fetch min and max generations + List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); + metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); + staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); + + // Delete stale metadata files + translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {}); + + // Delete stale primary terms + deleteStaleRemotePrimaryTerms( + metadataFilesNotToBeDeleted, + translogTransferManager, + new HashMap<>(), + new AtomicLong(Long.MAX_VALUE), + staticLogger + ); + } catch (Exception e) { + staticLogger.error("Exception while cleaning up metadata and primary terms", e); } - staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); + } - // For all the files that we are keeping, fetch min and max generations - List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); - metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); - staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); - - // Delete stale metadata files - translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {}); - - // Delete stale primary terms - deleteStaleRemotePrimaryTerms( - metadataFilesNotToBeDeleted, - translogTransferManager, - new HashMap<>(), - new AtomicLong(Long.MAX_VALUE), - staticLogger - ); - } catch (Exception e) { + @Override + public void onFailure(Exception e) { staticLogger.error("Exception while cleaning up metadata and primary terms", e); } - } - - @Override - public void onFailure(Exception e) { - staticLogger.error("Exception while cleaning up metadata and primary terms", e); - } - }; - translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + }; + translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + } } } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index acb6afe71b067..6080c43e021c6 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; -import java.util.stream.Collectors; /** * Service for managing pinned timestamps in a remote store. @@ -48,7 +47,8 @@ @ExperimentalApi public class RemoteStorePinnedTimestampService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class); - private static Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); + private static Tuple>> pinningEntityTimestampMap = new Tuple<>(-1L, Map.of()); + public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps"; public static final String PINNED_TIMESTAMPS_FILENAME_SEPARATOR = "__"; @@ -199,21 +199,23 @@ public void cloneTimestamp(long timestamp, String existingPinningEntity, String } } - private String getBlobName(long timestamp, String pinningEntity) { + public static String getBlobName(long timestamp, String pinningEntity) { return String.join(PINNED_TIMESTAMPS_FILENAME_SEPARATOR, pinningEntity, String.valueOf(timestamp)); } - private long getTimestampFromBlobName(String blobName) { + public static Tuple getPinningEntityTimestampFromBlobName(String blobName) { String[] blobNameTokens = blobName.split(PINNED_TIMESTAMPS_FILENAME_SEPARATOR); if (blobNameTokens.length < 2) { logger.error("Pinned timestamps blob name contains invalid format: {}", blobName); } try { - return Long.parseLong(blobNameTokens[blobNameTokens.length - 1]); + String pinningEntity = blobName.substring(blobName.lastIndexOf(PINNED_TIMESTAMPS_FILENAME_SEPARATOR)); + Long timestamp = Long.parseLong(blobNameTokens[blobNameTokens.length - 1]); + return new Tuple<>(pinningEntity, timestamp); } catch (NumberFormatException e) { logger.error(() -> new ParameterizedMessage("Pinned timestamps blob name contains invalid format: {}", blobName), e); } - return -1; + return null; } /** @@ -248,14 +250,32 @@ public void close() throws IOException { // Used in integ tests public void rescheduleAsyncUpdatePinnedTimestampTask(TimeValue pinnedTimestampsSchedulerInterval) { if (pinnedTimestampsSchedulerInterval != null) { - pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); + pinningEntityTimestampMap = new Tuple<>(-1L, Map.of()); asyncUpdatePinnedTimestampTask.close(); startAsyncUpdateTask(pinnedTimestampsSchedulerInterval); } } public static Tuple> getPinnedTimestamps() { - return pinnedTimestampsSet; + return getPinnedTimestamps(null, null); + } + + public static Tuple> getPinnedTimestamps(String pinningEntityToSkip, Long pinnedTimestampToSkip) { + Set allPinnedTimestamps = new HashSet<>(); + if (pinningEntityToSkip == null || pinnedTimestampToSkip == null) { + pinningEntityTimestampMap.v2().values().forEach(allPinnedTimestamps::addAll); + } else { + for (String pinningEntity : pinningEntityTimestampMap.v2().keySet()) { + if (pinningEntity.equals(pinningEntityToSkip)) { + Set timestamps = new HashSet<>(pinningEntityTimestampMap.v2().get(pinningEntity)); + timestamps.remove(pinnedTimestampToSkip); + allPinnedTimestamps.addAll(timestamps); + } else { + allPinnedTimestamps.addAll(pinningEntityTimestampMap.v2().get(pinningEntity)); + } + } + } + return new Tuple<>(pinningEntityTimestampMap.v1(), allPinnedTimestamps); } /** @@ -279,16 +299,21 @@ protected void runInternal() { Map pinnedTimestampList = blobContainer.listBlobs(); if (pinnedTimestampList.isEmpty()) { logger.debug("Fetched empty pinned timestamps from remote store: {}", triggerTimestamp); - pinnedTimestampsSet = new Tuple<>(triggerTimestamp, Set.of()); + pinningEntityTimestampMap = new Tuple<>(triggerTimestamp, Map.of()); return; } - Set pinnedTimestamps = pinnedTimestampList.keySet() - .stream() - .map(RemoteStorePinnedTimestampService.this::getTimestampFromBlobName) - .filter(timestamp -> timestamp != -1) - .collect(Collectors.toSet()); + Map> pinnedTimestamps = new HashMap<>(); + for (String blobName : pinnedTimestampList.keySet()) { + Tuple pinningEntityTimestamp = getPinningEntityTimestampFromBlobName(blobName); + if (pinningEntityTimestamp != null) { + if (pinnedTimestamps.containsKey(pinningEntityTimestamp.v1()) == false) { + pinnedTimestamps.put(pinningEntityTimestamp.v1(), new HashSet<>()); + } + pinnedTimestamps.get(pinningEntityTimestamp.v1()).add(pinningEntityTimestamp.v2()); + } + } logger.debug("Fetched pinned timestamps from remote store: {} - {}", triggerTimestamp, pinnedTimestamps); - pinnedTimestampsSet = new Tuple<>(triggerTimestamp, pinnedTimestamps); + pinningEntityTimestampMap = new Tuple<>(triggerTimestamp, pinnedTimestamps); } catch (Throwable t) { logger.error("Exception while fetching pinned timestamp details", t); } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index b954560c1bc94..c68d69d4f7b4e 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -196,7 +196,6 @@ import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; -import static org.opensearch.snapshots.SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER; /** * BlobStore - based implementation of Snapshot Repository @@ -1264,7 +1263,8 @@ private void doDeleteShardSnapshots( snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), remoteSegmentStoreDirectoryFactory, - afterCleanupsListener + afterCleanupsListener, + snapshotIdPinnedTimestampMap ); } else { asyncCleanupUnlinkedShardLevelBlobs( @@ -1283,7 +1283,8 @@ private void cleanUpRemoteStoreFilesForDeletedIndicesV2( Collection snapshotIds, Collection result, RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, - ActionListener afterCleanupsListener + ActionListener afterCleanupsListener, + Map snapshotIdPinnedTimestampMap ) { try { Set uniqueIndexIds = new HashSet<>(); @@ -1292,7 +1293,14 @@ private void cleanUpRemoteStoreFilesForDeletedIndicesV2( } // iterate through all the indices and trigger remote store directory cleanup for deleted index segments for (String indexId : uniqueIndexIds) { - cleanRemoteStoreDirectoryIfNeeded(snapshotIds, indexId, repositoryData, remoteSegmentStoreDirectoryFactory); + cleanRemoteStoreDirectoryIfNeeded( + snapshotIds, + indexId, + repositoryData, + remoteSegmentStoreDirectoryFactory, + snapshotIdPinnedTimestampMap, + false + ); } afterCleanupsListener.onResponse(null); } catch (Exception e) { @@ -1340,7 +1348,7 @@ private void removeSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.unpinTimestamp( timestampToUnpin, - repository + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotId.getUUID(), + SnapshotsService.getPinningEntity(repository, snapshotId.getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { @@ -2078,7 +2086,14 @@ private void executeOneStaleIndexDelete( deleteResult = deleteResult.add(cleanUpStaleSnapshotShardPathsFile(matchingShardPaths, snapshotShardPaths)); if (remoteSegmentStoreDirectoryFactory != null) { - cleanRemoteStoreDirectoryIfNeeded(deletedSnapshots, indexSnId, oldRepoData, remoteSegmentStoreDirectoryFactory); + cleanRemoteStoreDirectoryIfNeeded( + deletedSnapshots, + indexSnId, + oldRepoData, + remoteSegmentStoreDirectoryFactory, + new HashMap<>(), + true + ); } // Finally, we delete the [base_path]/indexId folder @@ -2135,7 +2150,9 @@ private void cleanRemoteStoreDirectoryIfNeeded( Collection deletedSnapshots, String indexSnId, RepositoryData oldRepoData, - RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory + RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, + Map snapshotIdPinnedTimestampMap, + boolean forceClean ) { assert (indexSnId != null); @@ -2178,6 +2195,8 @@ private void cleanRemoteStoreDirectoryIfNeeded( prevIndexMetadata ); + String pinnginEntity = SnapshotsService.getPinningEntity(getMetadata().name(), snapshotId.getUUID()); + for (int shardId = 0; shardId < prevIndexMetadata.getNumberOfShards(); shardId++) { ShardId shard = new ShardId(Index.UNKNOWN_INDEX_NAME, prevIndexMetadata.getIndexUUID(), shardId); remoteDirectoryCleanupAsync( @@ -2189,7 +2208,15 @@ private void cleanRemoteStoreDirectoryIfNeeded( ThreadPool.Names.REMOTE_PURGE, remoteStorePathStrategy ); - remoteTranslogCleanupAsync(remoteTranslogRepository, shard, remoteStorePathStrategy, prevIndexMetadata); + remoteTranslogCleanupAsync( + remoteTranslogRepository, + shard, + remoteStorePathStrategy, + prevIndexMetadata, + forceClean, + pinnginEntity, + snapshotIdPinnedTimestampMap.get(snapshotId) + ); } } } catch (Exception e) { @@ -2213,7 +2240,10 @@ private void remoteTranslogCleanupAsync( Repository remoteTranslogRepository, ShardId shardId, RemoteStorePathStrategy remoteStorePathStrategy, - IndexMetadata prevIndexMetadata + IndexMetadata prevIndexMetadata, + boolean forceClean, + String pinningEntity, + Long pinnedTimestamp ) { assert remoteTranslogRepository instanceof BlobStoreRepository; boolean indexMetadataEnabled = RemoteStoreUtils.determineTranslogMetadataEnabled(prevIndexMetadata); @@ -2230,7 +2260,7 @@ private void remoteTranslogCleanupAsync( indexMetadataEnabled ); try { - RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager); + RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager, forceClean, pinningEntity, pinnedTimestamp); } catch (IOException e) { logger.error("Exception while cleaning up remote translog for shard: " + shardId, e); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index f6e550525a3e5..23f6deff3715d 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -613,7 +613,7 @@ private void updateSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.pinTimestamp( timestampToPin, - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { @@ -631,6 +631,10 @@ public void onFailure(Exception e) { ); } + public static String getPinningEntity(String repositoryName, String snapshotUUID) { + return repositoryName + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotUUID; + } + private void cloneSnapshotPinnedTimestamp( RepositoryData repositoryData, SnapshotId sourceSnapshot, @@ -640,8 +644,8 @@ private void cloneSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.cloneTimestamp( timestampToPin, - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + sourceSnapshot.getUUID(), - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + getPinningEntity(snapshot.getRepository(), sourceSnapshot.getUUID()), + getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()), new ActionListener() { @Override public void onResponse(Void unused) {