diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java index 78f365d34b914..718c966619247 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -42,13 +42,14 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), false) .build(); } private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException { long currentTime = System.currentTimeMillis(); int maxRetry = 10; - while(maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + while (maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { Thread.sleep(1000); maxRetry--; } @@ -88,11 +89,25 @@ public void testLiveIndexNoPinnedTimestamps() throws Exception { } String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); - String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) - .buildAsString(); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); - String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) - .buildAsString(); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); assertBusy(() -> { @@ -122,18 +137,32 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() thro remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); - int numDocs = randomIntBetween(5, 10); + int numDocs = randomIntBetween(5, 9); for (int i = 0; i < numDocs; i++) { keepPinnedTimestampSchedulerUpdated(); indexSingleDoc(INDEX_NAME, true); } String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); - String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) - .buildAsString(); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); - String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) - .buildAsString(); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); assertBusy(() -> { @@ -148,7 +177,7 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exceptio prepareCluster(1, 1, Settings.EMPTY); Settings indexSettings = Settings.builder() .put(remoteStoreIndexSettings(0, 1)) - .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 4) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3) .build(); createIndex(INDEX_NAME, indexSettings); ensureYellowAndNoInitializingShards(INDEX_NAME); @@ -170,11 +199,25 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exceptio } String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); - String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) - .buildAsString(); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); - String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) - .buildAsString(); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); assertBusy(() -> { @@ -185,7 +228,7 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exceptio }); } - public void testIndexDeletionNoPinnedTimestamps() throws Exception { + public void testLiveIndexWithPinnedTimestamps() throws Exception { prepareCluster(1, 1, Settings.EMPTY); Settings indexSettings = Settings.builder() .put(remoteStoreIndexSettings(0, 1)) @@ -208,33 +251,44 @@ public void testIndexDeletionNoPinnedTimestamps() throws Exception { for (int i = 0; i < numDocs; i++) { keepPinnedTimestampSchedulerUpdated(); indexSingleDoc(INDEX_NAME, true); + if (i == 2) { + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1)); + remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + } } String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); - String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) - .buildAsString(); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); - String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) - .buildAsString(); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); assertBusy(() -> { List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(1, metadataFiles.size()); + assertEquals(2, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); }); - - keepPinnedTimestampSchedulerUpdated(); - client().admin().indices().prepareDelete(INDEX_NAME).get(); - - assertBusy(() -> { - assertEquals(0, Files.list(translogMetadataPath).collect(Collectors.toList()).size()); - assertEquals(0, Files.list(translogDataPath).collect(Collectors.toList()).size()); - }, 30, TimeUnit.SECONDS); } - public void testLiveIndexWithPinnedTimestamps() throws Exception { + public void testIndexDeletionNoPinnedTimestamps() throws Exception { prepareCluster(1, 1, Settings.EMPTY); Settings indexSettings = Settings.builder() .put(remoteStoreIndexSettings(0, 1)) @@ -257,24 +311,43 @@ public void testLiveIndexWithPinnedTimestamps() throws Exception { for (int i = 0; i < numDocs; i++) { keepPinnedTimestampSchedulerUpdated(); indexSingleDoc(INDEX_NAME, true); - if (i == 2) { - RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1)); - remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener); - RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); - } } String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); - String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) - .buildAsString(); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); - String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) - .buildAsString(); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); assertBusy(() -> { List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(2, metadataFiles.size()); + assertEquals(1, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + + keepPinnedTimestampSchedulerUpdated(); + client().admin().indices().prepareDelete(INDEX_NAME).get(); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(1, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); }); @@ -311,11 +384,25 @@ public void testIndexDeletionWithPinnedTimestamps() throws Exception { } String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); - String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) - .buildAsString(); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); - String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) - .buildAsString(); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); assertBusy(() -> { @@ -323,14 +410,14 @@ public void testIndexDeletionWithPinnedTimestamps() throws Exception { assertEquals(2, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); - }); + }, 30, TimeUnit.SECONDS); keepPinnedTimestampSchedulerUpdated(); client().admin().indices().prepareDelete(INDEX_NAME).get(); assertBusy(() -> { List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(1, metadataFiles.size()); + assertEquals(2, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); }); @@ -339,11 +426,10 @@ public void testIndexDeletionWithPinnedTimestamps() throws Exception { private void verifyTranslogDataFileCount(List metadataFiles, Path translogDataPath) throws IOException { List mdFiles = metadataFiles.stream().map(p -> p.getFileName().toString()).collect(Collectors.toList()); Set generations = new HashSet<>(); - for(String mdFile : mdFiles) { + for (String mdFile : mdFiles) { Tuple minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(mdFile); generations.addAll(LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())); } assertEquals(generations.size() * 2, Files.list(translogDataPath).collect(Collectors.toList()).size()); } - } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 191ff592c02ed..98b821126502b 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -148,7 +148,8 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata // call in each invocation of trimUnreferencedReaders - if (indexDeleted == false && (minRemoteGenReferenced - maxDeletedGenerationOnRemote) < indexSettings().getRemoteTranslogExtraKeep()) { + long minGenerationToKeep = minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep(); + if (indexDeleted == false && (minGenerationToKeep <= maxDeletedGenerationOnRemote)) { return; } @@ -178,11 +179,7 @@ public void onResponse(List blobMetadata) { return; } - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( - metadataFiles, - metadataFilePinnedTimestampMap, - logger - ); + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles); // If index is not deleted, make sure to keep latest metadata file if (indexDeleted == false) { @@ -202,10 +199,7 @@ public void onResponse(List blobMetadata) { logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); - Set generationsToBeDeleted = getGenerationsToBeDeleted( - metadataFilesNotToBeDeleted, - metadataFilesToBeDeleted - ); + Set generationsToBeDeleted = getGenerationsToBeDeleted(metadataFilesNotToBeDeleted, metadataFilesToBeDeleted); logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted); if (generationsToBeDeleted.isEmpty() == false) { @@ -247,10 +241,8 @@ public void onFailure(Exception e) { } // Visible for testing - protected Set getGenerationsToBeDeleted( - List metadataFilesNotToBeDeleted, - List metadataFilesToBeDeleted - ) throws IOException { + protected Set getGenerationsToBeDeleted(List metadataFilesNotToBeDeleted, List metadataFilesToBeDeleted) + throws IOException { Set generationsFromMetadataFilesToBeDeleted = new HashSet<>(); for (String mdFile : metadataFilesToBeDeleted) { Tuple minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager); @@ -264,7 +256,9 @@ protected Set getGenerationsToBeDeleted( Set generationsToBeDeleted = new HashSet<>(); for (long generation : generationsFromMetadataFilesToBeDeleted) { // Check if the generation is not referred by metadata file matching pinned timestamps - if (isGenerationPinned(generation, pinnedGenerations) == false) { + // The check with minRemoteGenReferenced is redundant but kept as to make sure we don't delete generations + // that are not persisted in remote segment store yet. + if (generation < minRemoteGenReferenced && isGenerationPinned(generation, pinnedGenerations) == false) { generationsToBeDeleted.add(generation); } } @@ -272,13 +266,14 @@ protected Set getGenerationsToBeDeleted( } protected List getMetadataFilesToBeDeleted(List metadataFiles) { - return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger); + return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, minRemoteGenReferenced, logger); } // Visible for testing protected static List getMetadataFilesToBeDeleted( List metadataFiles, Map metadataFilePinnedTimestampMap, + long minRemoteGenReferenced, Logger logger ) { Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); @@ -314,6 +309,20 @@ protected static List getMetadataFilesToBeDeleted( metadataFilesToBeDeleted.size() ); + // Filter out metadata files based on minRemoteGenReferenced + List metadataFilesContainingMinRemoteGenReferenced = metadataFilesToBeDeleted.stream().filter(md -> { + long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md); + return maxGeneration == -1 || maxGeneration > minRemoteGenReferenced; + }).collect(Collectors.toList()); + metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinRemoteGenReferenced); + + logger.trace( + "metadataFilesContainingMinRemoteGenReferenced.size = {}, metadataFilesToBeDeleted based on minRemoteGenReferenced filtering = {}, minRemoteGenReferenced = {}", + metadataFilesContainingMinRemoteGenReferenced.size(), + metadataFilesToBeDeleted.size(), + minRemoteGenReferenced + ); + return metadataFilesToBeDeleted; } @@ -485,7 +494,12 @@ public void onResponse(List blobMetadata) { staticLogger.debug("No stale translog metadata files found"); return; } - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger); + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + staticLogger + ); if (metadataFilesToBeDeleted.isEmpty()) { staticLogger.debug("No metadata files to delete"); return; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 3b8885055e8f7..7fe3305545085 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -170,6 +170,16 @@ public static Tuple getMinMaxTranslogGenerationFromFilename(String f } } + public static long getMaxGenerationFromFileName(String filename) { + String[] tokens = filename.split(METADATA_SEPARATOR); + try { + return RemoteStoreUtils.invertLong(tokens[2]); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception while getting max generation from: {}", filename), e); + return -1; + } + } + public static Tuple getMinMaxPrimaryTermFromFilename(String filename) { String[] tokens = filename.split(METADATA_SEPARATOR); if (tokens.length < 7) {