From a1846cf2cc1815f27c99db05ab4ecd731e4bcf8b Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 25 Sep 2024 21:51:36 +0530 Subject: [PATCH] [Backport 2.x] Optimize remote store GC flow with pinned timestamps (#16077) * Optimize remote store GC flow with pinned timestamps (#15943) Signed-off-by: Sachin Kale (cherry picked from commit dc4dbced2f9046ab5debb5dfec4e9634217be0c2) Signed-off-by: github-actions[bot] * Fix breaking changes Signed-off-by: Sachin Kale --------- Signed-off-by: Sachin Kale Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] Co-authored-by: Sachin Kale --- ...rePinnedTimestampsGarbageCollectionIT.java | 433 ++++++++++++++++++ .../snapshots/DeleteSnapshotV2IT.java | 231 +++++----- .../store/RemoteSegmentStoreDirectory.java | 24 +- .../RemoteFsTimestampAwareTranslog.java | 171 ++++--- .../index/translog/RemoteFsTranslog.java | 13 +- .../transfer/TranslogTransferMetadata.java | 10 + .../RemoteStorePinnedTimestampService.java | 4 + .../blobstore/BlobStoreRepository.java | 43 +- .../snapshots/SnapshotsService.java | 10 +- .../RemoteSegmentStoreDirectoryTests.java | 3 +- .../RemoteFsTimestampAwareTranslogTests.java | 147 +++++- 11 files changed, 863 insertions(+), 226 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java new file mode 100644 index 0000000000000..0a2668c60d3bd --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -0,0 +1,433 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING; +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStorePinnedTimestampsGarbageCollectionIT extends RemoteStoreBaseIntegTestCase { + static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), false) + .build(); + } + + private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException { + long currentTime = System.currentTimeMillis(); + int maxRetry = 10; + while (maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + Thread.sleep(1000); + maxRetry--; + } + } + + ActionListener noOpActionListener = new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) {} + }; + + public void testLiveIndexNoPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(2, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 10) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 9); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertTrue(metadataFiles.size() >= numDocs + 1); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }, 30, TimeUnit.SECONDS); + } + + public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = 5; + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(5, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testLiveIndexWithPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + if (i == 2) { + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1)); + remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + } + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(3, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testIndexDeletionNoPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(2, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + + keepPinnedTimestampSchedulerUpdated(); + client().admin().indices().prepareDelete(INDEX_NAME).get(); + + assertBusy(() -> { + assertEquals(0, Files.list(translogMetadataPath).collect(Collectors.toList()).size()); + assertEquals(0, Files.list(translogDataPath).collect(Collectors.toList()).size()); + }); + } + + public void testIndexDeletionWithPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + if (i == 2) { + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1)); + remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + } + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(3, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }, 30, TimeUnit.SECONDS); + + keepPinnedTimestampSchedulerUpdated(); + client().admin().indices().prepareDelete(INDEX_NAME).get(); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(1, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + private void verifyTranslogDataFileCount(List metadataFiles, Path translogDataPath) throws IOException { + List mdFiles = metadataFiles.stream().map(p -> p.getFileName().toString()).collect(Collectors.toList()); + Set generations = new HashSet<>(); + for (String mdFile : mdFiles) { + Tuple minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(mdFile); + generations.addAll(LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())); + } + assertEquals(generations.size() * 2, Files.list(translogDataPath).collect(Collectors.toList()).size()); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java index 1d7a58384c0be..c4e3a478c8540 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java @@ -23,19 +23,31 @@ import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase { private static final String REMOTE_REPO_NAME = "remote-store-repo-name"; + private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException { + long currentTime = System.currentTimeMillis(); + int maxRetry = 10; + while (maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + Thread.sleep(1000); + maxRetry--; + } + } + public void testDeleteShallowCopyV2() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); @@ -74,8 +86,8 @@ public void testDeleteShallowCopyV2() throws Exception { createIndex(indexName1, getRemoteStoreBackedIndexSettings()); createIndex(indexName2, getRemoteStoreBackedIndexSettings()); - final int numDocsInIndex1 = 10; - final int numDocsInIndex2 = 20; + final int numDocsInIndex1 = 1; + final int numDocsInIndex2 = 2; indexRandomDocs(indexName1, numDocsInIndex1); indexRandomDocs(indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); @@ -92,7 +104,7 @@ public void testDeleteShallowCopyV2() throws Exception { assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); createIndex(indexName3, getRemoteStoreBackedIndexSettings()); - indexRandomDocs(indexName3, 10); + indexRandomDocs(indexName3, 1); CreateSnapshotResponse createSnapshotResponse2 = client().admin() .cluster() .prepareCreateSnapshot(snapshotRepoName, snapshotName2) @@ -105,109 +117,101 @@ public void testDeleteShallowCopyV2() throws Exception { assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2)); assertAcked(client().admin().indices().prepareDelete(indexName1)); - Thread.sleep(100); - AcknowledgedResponse deleteResponse = client().admin() - .cluster() - .prepareDeleteSnapshot(snapshotRepoName, snapshotName2) - .setSnapshots(snapshotName2) - .get(); + AcknowledgedResponse deleteResponse = client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, snapshotName2).get(); assertTrue(deleteResponse.isAcknowledged()); // test delete non-existent snapshot assertThrows( SnapshotMissingException.class, - () -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").setSnapshots(snapshotName2).get() + () -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").get() ); - } - public void testDeleteShallowCopyV2MultipleSnapshots() throws Exception { + public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); + Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath); + settings = Settings.builder() + .put(settings) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString()) + .build(); + String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings); + internalCluster().startDataOnlyNode(settings); + final Client clusterManagerClient = internalCluster().clusterManagerClient(); + ensureStableCluster(2); - internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - - String indexName1 = "testindex1"; - String indexName2 = "testindex2"; - String indexName3 = "testindex3"; - String snapshotRepoName = "test-create-snapshot-repo"; - String snapshotName1 = "test-create-snapshot1"; - String snapshotName2 = "test-create-snapshot2"; - Path absolutePath1 = randomRepoPath().toAbsolutePath(); - logger.info("Snapshot Path [{}]", absolutePath1); - - Client client = client(); - - assertAcked( - client.admin() - .cluster() - .preparePutRepository(snapshotRepoName) - .setType(FsRepository.TYPE) - .setSettings( - Settings.builder() - .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) - .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) - .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) - .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) - .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) - ) + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + clusterManagerName ); - createIndex(indexName1, getRemoteStoreBackedIndexSettings()); + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath)); - createIndex(indexName2, getRemoteStoreBackedIndexSettings()); + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, 25); - final int numDocsInIndex1 = 10; - final int numDocsInIndex2 = 20; - indexRandomDocs(indexName1, numDocsInIndex1); - indexRandomDocs(indexName2, numDocsInIndex2); - ensureGreen(indexName1, indexName2); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(remoteStoreEnabledIndexName) + .get() + .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); + logger.info("--> create two remote index shallow snapshots"); CreateSnapshotResponse createSnapshotResponse = client().admin() .cluster() - .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .prepareCreateSnapshot(snapshotRepoName, "snap1") .setWaitForCompletion(true) .get(); - SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo(); - assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); - assertThat(snapshotInfo.successfulShards(), greaterThan(0)); - assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); - assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); + SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo(); - createIndex(indexName3, getRemoteStoreBackedIndexSettings()); - indexRandomDocs(indexName3, 10); + Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); + Path shardPath = Path.of(String.valueOf(indexPath), "0"); - CreateSnapshotResponse createSnapshotResponse2 = client().admin() - .cluster() - .prepareCreateSnapshot(snapshotRepoName, snapshotName2) - .setWaitForCompletion(true) - .get(); - snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); - assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); - assertThat(snapshotInfo.successfulShards(), greaterThan(0)); - assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); - assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2)); + // delete remote store index + assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); + + logger.info("--> delete snapshot 1"); + + Path segmentsPath = Path.of(String.valueOf(shardPath), "segments"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog"); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); - AcknowledgedResponse deleteResponse = client().admin() + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + keepPinnedTimestampSchedulerUpdated(); + + AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() .cluster() - .prepareDeleteSnapshot(snapshotRepoName, snapshotName1, snapshotName2) - .setSnapshots(snapshotName2) + .prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName()) .get(); - assertTrue(deleteResponse.isAcknowledged()); + assertAcked(deleteSnapshotResponse); - // test delete non-existent snapshot - assertThrows( - SnapshotMissingException.class, - () -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").setSnapshots(snapshotName2).get() - ); + // Delete is async. Give time for it + assertBusy(() -> { + try { + assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath)); + } catch (NoSuchFileException e) { + fail(); + } + }, 60, TimeUnit.SECONDS); + assertBusy(() -> { + try { + assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath)); + } catch (NoSuchFileException e) { + fail(); + } + }, 60, TimeUnit.SECONDS); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15692") - public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception { + public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath); @@ -242,11 +246,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .get() .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); - String numShards = client().admin() - .indices() - .prepareGetSettings(remoteStoreEnabledIndexName) - .get() - .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_NUMBER_OF_SHARDS); + Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); + Path shardPath = Path.of(String.valueOf(indexPath), "0"); + + Path segmentsPath = Path.of(String.valueOf(shardPath), "segments", "data"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog", "data", "1"); logger.info("--> create two remote index shallow snapshots"); CreateSnapshotResponse createSnapshotResponse = client().admin() @@ -256,6 +260,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .get(); SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo(); + List segmentsPostSnapshot1 = Files.list(segmentsPath).collect(Collectors.toList()); + List translogPostSnapshot1 = Files.list(translogPath).collect(Collectors.toList()); + + forceMerge(1); + refresh(remoteStoreEnabledIndexName); indexRandomDocs(remoteStoreEnabledIndexName, 25); CreateSnapshotResponse createSnapshotResponse2 = client().admin() @@ -264,46 +273,28 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .setWaitForCompletion(true) .get(); SnapshotInfo snapshotInfo2 = createSnapshotResponse2.getSnapshotInfo(); + + List segmentsPostSnapshot2 = Files.list(segmentsPath).collect(Collectors.toList()); + List translogPostSnapshot2 = Files.list(translogPath).collect(Collectors.toList()); + assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS)); assertThat(snapshotInfo2.successfulShards(), greaterThan(0)); assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards())); assertThat(snapshotInfo2.snapshotId().getName(), equalTo("snap2")); - // delete remote store index - assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); - - logger.info("--> delete snapshot 2"); + assertBusy(() -> assertTrue(translogPostSnapshot2.size() > translogPostSnapshot1.size()), 60, TimeUnit.SECONDS); + assertBusy(() -> assertTrue(segmentsPostSnapshot2.size() > segmentsPostSnapshot1.size()), 60, TimeUnit.SECONDS); - Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); - Path shardPath = Path.of(String.valueOf(indexPath), "0"); - Path segmentsPath = Path.of(String.valueOf(shardPath), "segments"); - Path translogPath = Path.of(String.valueOf(shardPath), "translog"); - - // Get total segments remote store directory file count for deleted index and shard 0 - int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath); - int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath); - - RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + keepPinnedTimestampSchedulerUpdated(); - AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() - .cluster() - .prepareDeleteSnapshot(snapshotRepoName, snapshotInfo2.snapshotId().getName()) - .get(); - assertAcked(deleteSnapshotResponse); - - Thread.sleep(5000); - - assertBusy(() -> { - try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountBeforeDeletingSnapshot1)); - } catch (Exception e) {} - }, 30, TimeUnit.SECONDS); - int segmentFilesCountAfterDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath); + // delete remote store index + assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); logger.info("--> delete snapshot 1"); RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + keepPinnedTimestampSchedulerUpdated(); // on snapshot deletion, remote store segment files should get cleaned up for deleted index - `remote-index-1` - deleteSnapshotResponse = clusterManagerClient.admin() + AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() .cluster() .prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName()) .get(); @@ -311,23 +302,21 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio // Delete is async. Give time for it assertBusy(() -> { - try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1)); - } catch (Exception e) {} + List segmentsPostDeletionOfSnapshot1 = Files.list(segmentsPath).collect(Collectors.toList()); + assertTrue(segmentsPostDeletionOfSnapshot1.size() < segmentsPostSnapshot2.size()); }, 60, TimeUnit.SECONDS); - - assertBusy(() -> { - try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1)); - } catch (Exception e) {} - }, 60, TimeUnit.SECONDS); - + // To uncomment following, we need to handle deletion of generations in translog cleanup flow + // List translogPostDeletionOfSnapshot1 = Files.list(translogPath).collect(Collectors.toList()); + // Delete is async. Give time for it + // assertBusy(() -> assertEquals(translogPostSnapshot2.size() - translogPostSnapshot1.size(), + // translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS); } private Settings snapshotV2Settings(Path remoteStoreRepoPath) { Settings settings = Settings.builder() .put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath)) .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), false) .build(); return settings; } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 576ac9a26acc7..2fd5616774e92 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -869,7 +869,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } // Check last fetch status of pinned timestamps. If stale, return. - if (RemoteStoreUtils.isPinnedTimestampStateStale()) { + if (lastNMetadataFilesToKeep != 0 && RemoteStoreUtils.isPinnedTimestampStateStale()) { logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); return; } @@ -1010,6 +1010,17 @@ public static void remoteDirectoryCleanup( String indexUUID, ShardId shardId, RemoteStorePathStrategy pathStrategy + ) { + remoteDirectoryCleanup(remoteDirectoryFactory, remoteStoreRepoForIndex, indexUUID, shardId, pathStrategy, false); + } + + public static void remoteDirectoryCleanup( + RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory, + String remoteStoreRepoForIndex, + String indexUUID, + ShardId shardId, + RemoteStorePathStrategy pathStrategy, + boolean forceClean ) { try { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( @@ -1018,8 +1029,12 @@ public static void remoteDirectoryCleanup( shardId, pathStrategy ); - remoteSegmentStoreDirectory.deleteStaleSegments(0); - remoteSegmentStoreDirectory.deleteIfEmpty(); + if (forceClean) { + remoteSegmentStoreDirectory.delete(); + } else { + remoteSegmentStoreDirectory.deleteStaleSegments(0); + remoteSegmentStoreDirectory.deleteIfEmpty(); + } } catch (Exception e) { staticLogger.error("Exception occurred while deleting directory", e); } @@ -1038,7 +1053,10 @@ private boolean deleteIfEmpty() throws IOException { logger.info("Remote directory still has files, not deleting the path"); return false; } + return delete(); + } + private boolean delete() { try { remoteDataDirectory.delete(); remoteMetadataDirectory.delete(); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 27d34ec0d05af..e61a9606175ee 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; @@ -61,6 +62,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog { private final Map> oldFormatMetadataFileGenerationMap; private final Map> oldFormatMetadataFilePrimaryTermMap; private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); + private final AtomicBoolean triggerTrimOnMinRemoteGenReferencedChange = new AtomicBoolean(false); public RemoteFsTimestampAwareTranslog( TranslogConfig config, @@ -105,6 +107,11 @@ protected void onDelete() { } } + @Override + protected void onMinRemoteGenReferencedChange() { + triggerTrimOnMinRemoteGenReferencedChange.set(true); + } + @Override public void trimUnreferencedReaders() throws IOException { trimUnreferencedReaders(false, true); @@ -135,14 +142,22 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. - if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) { + if (indexDeleted == false && (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get())) { return; } // This is to fail fast and avoid listing md files un-necessarily. if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); + logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); + return; + } + + // This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata + // call in each invocation of trimUnreferencedReaders + if (indexDeleted == false && triggerTrimOnMinRemoteGenReferencedChange.get() == false) { return; + } else if (triggerTrimOnMinRemoteGenReferencedChange.get()) { + triggerTrimOnMinRemoteGenReferencedChange.set(false); } // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. @@ -158,7 +173,7 @@ public void onResponse(List blobMetadata) { List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); try { - if (metadataFiles.size() <= 1) { + if (indexDeleted == false && metadataFiles.size() <= 1) { logger.debug("No stale translog metadata files found"); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); return; @@ -166,16 +181,12 @@ public void onResponse(List blobMetadata) { // Check last fetch status of pinned timestamps. If stale, return. if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); + logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); return; } - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( - metadataFiles, - metadataFilePinnedTimestampMap, - logger - ); + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, indexDeleted); // If index is not deleted, make sure to keep latest metadata file if (indexDeleted == false) { @@ -194,10 +205,11 @@ public void onResponse(List blobMetadata) { metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); + Set generationsToBeDeleted = getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - indexDeleted + indexDeleted ? Long.MAX_VALUE : getMinGenerationToKeepInRemote() ); logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted); @@ -208,7 +220,11 @@ public void onResponse(List blobMetadata) { generationsToBeDeleted, remoteGenerationDeletionPermits::release ); + } else { + remoteGenerationDeletionPermits.release(); + } + if (metadataFilesToBeDeleted.isEmpty() == false) { // Delete stale metadata files translogTransferManager.deleteMetadataFilesAsync( metadataFilesToBeDeleted, @@ -217,11 +233,10 @@ public void onResponse(List blobMetadata) { // Update cache to keep only those metadata files that are not getting deleted oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted); - // Delete stale primary terms deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted); } else { - remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); + remoteGenerationDeletionPermits.release(); } } catch (Exception e) { remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); @@ -237,18 +252,16 @@ public void onFailure(Exception e) { translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); } + private long getMinGenerationToKeepInRemote() { + return minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep(); + } + // Visible for testing protected Set getGenerationsToBeDeleted( List metadataFilesNotToBeDeleted, List metadataFilesToBeDeleted, - boolean indexDeleted + long minGenerationToKeepInRemote ) throws IOException { - long maxGenerationToBeDeleted = Long.MAX_VALUE; - - if (indexDeleted == false) { - maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); - } - Set generationsFromMetadataFilesToBeDeleted = new HashSet<>(); for (String mdFile : metadataFilesToBeDeleted) { Tuple minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager); @@ -262,21 +275,31 @@ protected Set getGenerationsToBeDeleted( Set generationsToBeDeleted = new HashSet<>(); for (long generation : generationsFromMetadataFilesToBeDeleted) { // Check if the generation is not referred by metadata file matching pinned timestamps - if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) { + // The check with minGenerationToKeep is redundant but kept as to make sure we don't delete generations + // that are not persisted in remote segment store yet. + if (generation < minGenerationToKeepInRemote && isGenerationPinned(generation, pinnedGenerations) == false) { generationsToBeDeleted.add(generation); } } return generationsToBeDeleted; } - protected List getMetadataFilesToBeDeleted(List metadataFiles) { - return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger); + protected List getMetadataFilesToBeDeleted(List metadataFiles, boolean indexDeleted) { + return getMetadataFilesToBeDeleted( + metadataFiles, + metadataFilePinnedTimestampMap, + getMinGenerationToKeepInRemote(), + indexDeleted, + logger + ); } // Visible for testing protected static List getMetadataFilesToBeDeleted( List metadataFiles, Map metadataFilePinnedTimestampMap, + long minGenerationToKeepInRemote, + boolean indexDeleted, Logger logger ) { Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); @@ -312,6 +335,22 @@ protected static List getMetadataFilesToBeDeleted( metadataFilesToBeDeleted.size() ); + if (indexDeleted == false) { + // Filter out metadata files based on minGenerationToKeep + List metadataFilesContainingMinGenerationToKeep = metadataFilesToBeDeleted.stream().filter(md -> { + long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md); + return maxGeneration == -1 || maxGeneration >= minGenerationToKeepInRemote; + }).collect(Collectors.toList()); + metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinGenerationToKeep); + + logger.trace( + "metadataFilesContainingMinGenerationToKeep.size = {}, metadataFilesToBeDeleted based on minGenerationToKeep filtering = {}, minGenerationToKeep = {}", + metadataFilesContainingMinGenerationToKeep.size(), + metadataFilesToBeDeleted.size(), + minGenerationToKeepInRemote + ); + } + return metadataFilesToBeDeleted; } @@ -472,50 +511,60 @@ protected static Tuple getMinMaxPrimaryTermFromMetadataFile( } } - public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException { - ActionListener> listMetadataFilesListener = new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + public static void cleanupOfDeletedIndex(TranslogTransferManager translogTransferManager, boolean forceClean) throws IOException { + if (forceClean) { + translogTransferManager.delete(); + } else { + ActionListener> listMetadataFilesListener = new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + + try { + if (metadataFiles.isEmpty()) { + staticLogger.debug("No stale translog metadata files found"); + return; + } + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + true, // This method gets called when the index is no longer present + staticLogger + ); + if (metadataFilesToBeDeleted.isEmpty()) { + staticLogger.debug("No metadata files to delete"); + return; + } + staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); - try { - if (metadataFiles.isEmpty()) { - staticLogger.debug("No stale translog metadata files found"); - return; - } - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger); - if (metadataFilesToBeDeleted.isEmpty()) { - staticLogger.debug("No metadata files to delete"); - return; - } - staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); + // For all the files that we are keeping, fetch min and max generations + List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); + metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); + staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); - // For all the files that we are keeping, fetch min and max generations - List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); - metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); - staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); + // Delete stale metadata files + translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {}); - // Delete stale metadata files - translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {}); + // Delete stale primary terms + deleteStaleRemotePrimaryTerms( + metadataFilesNotToBeDeleted, + translogTransferManager, + new HashMap<>(), + new AtomicLong(Long.MAX_VALUE), + staticLogger + ); + } catch (Exception e) { + staticLogger.error("Exception while cleaning up metadata and primary terms", e); + } + } - // Delete stale primary terms - deleteStaleRemotePrimaryTerms( - metadataFilesNotToBeDeleted, - translogTransferManager, - new HashMap<>(), - new AtomicLong(Long.MAX_VALUE), - staticLogger - ); - } catch (Exception e) { + @Override + public void onFailure(Exception e) { staticLogger.error("Exception while cleaning up metadata and primary terms", e); } - } - - @Override - public void onFailure(Exception e) { - staticLogger.error("Exception while cleaning up metadata and primary terms", e); - } - }; - translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + }; + translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + } } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index e40c2e8d70b07..f22236eb0ca1c 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -683,12 +683,17 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen @Override public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException { maxRemoteTranslogGenerationUploaded = generation; + long previousMinRemoteGenReferenced = minRemoteGenReferenced; minRemoteGenReferenced = getMinFileGeneration(); + if (previousMinRemoteGenReferenced != minRemoteGenReferenced) { + onMinRemoteGenReferencedChange(); + } logger.debug( - "Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}", + "Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}, minRemoteGenReferenced = {}", primaryTerm, generation, - maxSeqNo + maxSeqNo, + minRemoteGenReferenced ); } @@ -702,6 +707,10 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro } } + protected void onMinRemoteGenReferencedChange() { + + } + @Override public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) { return minSeqNoToKeep; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 3b8885055e8f7..7fe3305545085 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -170,6 +170,16 @@ public static Tuple getMinMaxTranslogGenerationFromFilename(String f } } + public static long getMaxGenerationFromFileName(String filename) { + String[] tokens = filename.split(METADATA_SEPARATOR); + try { + return RemoteStoreUtils.invertLong(tokens[2]); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception while getting max generation from: {}", filename), e); + return -1; + } + } + public static Tuple getMinMaxPrimaryTermFromFilename(String filename) { String[] tokens = filename.split(METADATA_SEPARATOR); if (tokens.length < 7) { diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 71133615ed056..1448c46583f6a 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -240,6 +240,10 @@ public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener< } } + public void forceSyncPinnedTimestamps() { + asyncUpdatePinnedTimestampTask.run(); + } + @Override public void close() throws IOException { asyncUpdatePinnedTimestampTask.close(); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 3d066e4d70990..7353aacc97d7d 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -200,7 +200,6 @@ import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; -import static org.opensearch.snapshots.SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER; /** * BlobStore - based implementation of Snapshot Repository @@ -1383,7 +1382,7 @@ private void cleanUpRemoteStoreFilesForDeletedIndicesV2( } // iterate through all the indices and trigger remote store directory cleanup for deleted index segments for (String indexId : uniqueIndexIds) { - cleanRemoteStoreDirectoryIfNeeded(snapshotIds, indexId, repositoryData, remoteSegmentStoreDirectoryFactory); + cleanRemoteStoreDirectoryIfNeeded(snapshotIds, indexId, repositoryData, remoteSegmentStoreDirectoryFactory, false); } afterCleanupsListener.onResponse(null); } catch (Exception e) { @@ -1431,11 +1430,17 @@ private void removeSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.unpinTimestamp( timestampToUnpin, - repository + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotId.getUUID(), + SnapshotsService.getPinningEntity(repository, snapshotId.getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { - logger.debug("Timestamp {} unpinned successfully for snapshot {}", timestampToUnpin, snapshotId.getName()); + logger.info("Timestamp {} unpinned successfully for snapshot {}", timestampToUnpin, snapshotId.getName()); + try { + remoteStorePinnedTimestampService.forceSyncPinnedTimestamps(); + logger.debug("Successfully synced pinned timestamp state"); + } catch (Exception e) { + logger.warn("Exception while updating pinning timestamp state, snapshot deletion will continue", e); + } listener.onResponse(null); } @@ -1540,7 +1545,8 @@ public static void remoteDirectoryCleanupAsync( String indexUUID, ShardId shardId, String threadPoolName, - RemoteStorePathStrategy pathStrategy + RemoteStorePathStrategy pathStrategy, + boolean forceClean ) { threadpool.executor(threadPoolName) .execute( @@ -1550,7 +1556,8 @@ public static void remoteDirectoryCleanupAsync( remoteStoreRepoForIndex, indexUUID, shardId, - pathStrategy + pathStrategy, + forceClean ), indexUUID, shardId @@ -1606,7 +1613,8 @@ protected void releaseRemoteStoreLockAndCleanup( indexUUID, new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)), ThreadPool.Names.REMOTE_PURGE, - remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy() + remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy(), + false ); } } @@ -2169,7 +2177,7 @@ private void executeOneStaleIndexDelete( deleteResult = deleteResult.add(cleanUpStaleSnapshotShardPathsFile(matchingShardPaths, snapshotShardPaths)); if (remoteSegmentStoreDirectoryFactory != null) { - cleanRemoteStoreDirectoryIfNeeded(deletedSnapshots, indexSnId, oldRepoData, remoteSegmentStoreDirectoryFactory); + cleanRemoteStoreDirectoryIfNeeded(deletedSnapshots, indexSnId, oldRepoData, remoteSegmentStoreDirectoryFactory, true); } // Finally, we delete the [base_path]/indexId folder @@ -2241,7 +2249,8 @@ private void cleanRemoteStoreDirectoryIfNeeded( Collection deletedSnapshots, String indexSnId, RepositoryData oldRepoData, - RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory + RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, + boolean forceClean ) { assert (indexSnId != null); @@ -2293,9 +2302,16 @@ private void cleanRemoteStoreDirectoryIfNeeded( prevIndexMetadata.getIndexUUID(), shard, ThreadPool.Names.REMOTE_PURGE, - remoteStorePathStrategy + remoteStorePathStrategy, + forceClean + ); + remoteTranslogCleanupAsync( + remoteTranslogRepository, + shard, + remoteStorePathStrategy, + prevIndexMetadata, + forceClean ); - remoteTranslogCleanupAsync(remoteTranslogRepository, shard, remoteStorePathStrategy, prevIndexMetadata); } } } catch (Exception e) { @@ -2319,7 +2335,8 @@ private void remoteTranslogCleanupAsync( Repository remoteTranslogRepository, ShardId shardId, RemoteStorePathStrategy remoteStorePathStrategy, - IndexMetadata prevIndexMetadata + IndexMetadata prevIndexMetadata, + boolean forceClean ) { assert remoteTranslogRepository instanceof BlobStoreRepository; boolean indexMetadataEnabled = RemoteStoreUtils.determineTranslogMetadataEnabled(prevIndexMetadata); @@ -2336,7 +2353,7 @@ private void remoteTranslogCleanupAsync( indexMetadataEnabled ); try { - RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager); + RemoteFsTimestampAwareTranslog.cleanupOfDeletedIndex(translogTransferManager, forceClean); } catch (IOException e) { logger.error("Exception while cleaning up remote translog for shard: " + shardId, e); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 5307902692e50..37455f6568206 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -796,7 +796,7 @@ private void updateSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.pinTimestamp( timestampToPin, - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { @@ -814,6 +814,10 @@ public void onFailure(Exception e) { ); } + public static String getPinningEntity(String repositoryName, String snapshotUUID) { + return repositoryName + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotUUID; + } + private void cloneSnapshotPinnedTimestamp( RepositoryData repositoryData, SnapshotId sourceSnapshot, @@ -823,8 +827,8 @@ private void cloneSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.cloneTimestamp( timestampToPin, - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + sourceSnapshot.getUUID(), - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + getPinningEntity(snapshot.getRepository(), sourceSnapshot.getUUID()), + getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index ecd6620dbea15..df3df81361a12 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -565,7 +565,8 @@ public void testCleanupAsync() throws Exception { repositoryName, indexUUID, shardId, - pathStrategy + pathStrategy, + false ); verify(remoteSegmentStoreDirectoryFactory).newDirectory(repositoryName, indexUUID, shardId, pathStrategy); verify(threadPool, times(0)).executor(ThreadPool.Names.REMOTE_PURGE); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 4ec68f7fb75b4..e6871414cf5e0 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -214,6 +214,7 @@ public void onFailure(Exception e) { // Old format metadata file String oldFormatMdFilename = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__1"; assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(oldFormatMdFilename)); + assertEquals(Long.MAX_VALUE - 9223372036854774799L, TranslogTransferMetadata.getMaxGenerationFromFileName(oldFormatMdFilename)); // Node id containing separator String nodeIdWithSeparator = @@ -221,10 +222,14 @@ public void onFailure(Exception e) { Tuple minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(nodeIdWithSeparator); Long minGen = Long.MAX_VALUE - 9223372036438563958L; assertEquals(minGen, minMaxGen.v1()); + Long maxGen = Long.MAX_VALUE - 9223372036854774799L; + assertEquals(maxGen, minMaxGen.v2()); + assertEquals(Long.MAX_VALUE - 9223372036854774799L, TranslogTransferMetadata.getMaxGenerationFromFileName(nodeIdWithSeparator)); // Malformed md filename String malformedMdFileName = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node1__xyz__3__1"; assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(malformedMdFileName)); + assertEquals(Long.MAX_VALUE - 9223372036854774799L, TranslogTransferMetadata.getMaxGenerationFromFileName(malformedMdFileName)); } public void testGetMinMaxPrimaryTermFromFilename() throws Exception { @@ -330,43 +335,60 @@ public void testSimpleOperationsUpload() throws Exception { addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 })); - addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); - addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 })); - addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 })); assertBusy(() -> { assertEquals( - 16, + 10, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); }); - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); - RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); // Fetch pinned timestamps so that it won't be stale updatePinnedTimstampTask.run(); + translog.setMinSeqNoToKeep(3); + translog.trimUnreferencedReaders(); - translog.setMinSeqNoToKeep(4); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + // Fetch pinned timestamps so that it won't be stale + updatePinnedTimstampTask.run(); + translog.setMinSeqNoToKeep(6); translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + assertEquals(1, translog.readers.size()); + assertBusy(() -> { + assertEquals(2, translog.allUploaded().size()); + assertEquals(4, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 16, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 })); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 })); - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); // Fetch pinned timestamps so that it won't be stale updatePinnedTimstampTask.run(); translog.trimUnreferencedReaders(); - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); - assertEquals(5, translog.readers.size()); + + assertEquals(3, translog.readers.size()); assertBusy(() -> { - assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); - assertEquals(10, translog.allUploaded().size()); + assertEquals(6, translog.allUploaded().size()); + assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); assertEquals( - 10, + 12, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); - }, 60, TimeUnit.SECONDS); + }, 30, TimeUnit.SECONDS); } @Override @@ -397,7 +419,7 @@ public void testMetadataFileDeletion() throws Exception { ); updatePinnedTimstampTask.run(); translog.trimUnreferencedReaders(); - assertBusy(() -> { assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); }); + assertBusy(() -> { assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); }); } public void testMetadataFileDeletionWithPinnedTimestamps() throws Exception { @@ -568,7 +590,7 @@ public void testDrainSync() throws Exception { assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); assertBusy(() -> assertEquals(2, translog.allUploaded().size())); - assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); } @Override @@ -647,7 +669,7 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - true + Long.MAX_VALUE ); Set md1Generations = LongStream.rangeClosed(4, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 37).boxed().collect(Collectors.toSet()); @@ -683,7 +705,7 @@ public void testGetGenerationsToBeDeleted() throws IOException { Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - true + Long.MAX_VALUE ); Set md1Generations = LongStream.rangeClosed(5, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 25).boxed().collect(Collectors.toSet()); @@ -708,7 +730,10 @@ public void testGetMetadataFilesToBeDeletedNoExclusion() { "metadata__9223372036438563903__9223372036854775701__9223370311919910403__31__9223372036854775701__1" ); - assertEquals(metadataFiles, ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles)); + assertEquals( + metadataFiles, + RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), Long.MAX_VALUE, false, logger) + ); } public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeOnly() { @@ -724,7 +749,13 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeOnly() { "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + false, + logger + ); assertEquals(1, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); } @@ -746,7 +777,13 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnPinningOnly() throws "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + false, + logger + ); assertEquals(2, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(1)); @@ -769,11 +806,77 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeAndPinning() throw "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + false, + logger + ); assertEquals(1, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(0)); } + public void testGetMetadataFilesToBeDeletedExclusionBasedOnGenerationOnly() throws IOException { + long currentTimeInMillis = System.currentTimeMillis(); + String md1Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 200000); + String md2Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 300000); + String md3Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 600000); + + when(blobContainer.listBlobs()).thenReturn(Map.of()); + + updatePinnedTimstampTask.run(); + + List metadataFiles = List.of( + // MaxGen 7 + "metadata__9223372036438563903__9223372036854775800__" + md1Timestamp + "__31__9223372036854775106__1", + // MaxGen 12 + "metadata__9223372036438563903__9223372036854775795__" + md2Timestamp + "__31__9223372036854775803__1", + // MaxGen 10 + "metadata__9223372036438563903__9223372036854775798__" + md3Timestamp + "__31__9223372036854775701__1" + ); + + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + 10L, + false, + logger + ); + assertEquals(2, metadataFilesToBeDeleted.size()); + assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); + assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(1)); + } + + public void testGetMetadataFilesToBeDeletedExclusionBasedOnGenerationDeleteIndex() throws IOException { + long currentTimeInMillis = System.currentTimeMillis(); + String md1Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 200000); + String md2Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 300000); + String md3Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 600000); + + when(blobContainer.listBlobs()).thenReturn(Map.of()); + + updatePinnedTimstampTask.run(); + + List metadataFiles = List.of( + // MaxGen 7 + "metadata__9223372036438563903__9223372036854775800__" + md1Timestamp + "__31__9223372036854775106__1", + // MaxGen 12 + "metadata__9223372036438563903__9223372036854775795__" + md2Timestamp + "__31__9223372036854775803__1", + // MaxGen 17 + "metadata__9223372036438563903__9223372036854775790__" + md3Timestamp + "__31__9223372036854775701__1" + ); + + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + 10L, + true, + logger + ); + assertEquals(metadataFiles, metadataFilesToBeDeleted); + } + public void testIsGenerationPinned() { TreeSet> pinnedGenerations = new TreeSet<>(new TreeSet<>((o1, o2) -> { if (Objects.equals(o1.v1(), o2.v1()) == false) {