diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java new file mode 100644 index 0000000000000..fc51923dd09bd --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.junit.annotations.TestIssueLogging; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; + +import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING; +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStorePinnedTimestampsGarbageCollectionIT extends RemoteStoreBaseIntegTestCase { + static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .build(); + } + + ActionListener noOpActionListener = new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) {} + }; + + private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException { + long currentTime = System.currentTimeMillis(); + int maxRetry = 10; + while(maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + Thread.sleep(1000); + maxRetry--; + } + } + + public void testLiveIndexNoPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) + .buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath); + String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) + .buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + assertEquals(1, Files.list(translogDataPath).collect(Collectors.toList()).size()); + assertEquals(1, Files.list(translogMetadataPath).collect(Collectors.toList()).size()); + }); + } + +} diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 27d34ec0d05af..e0466e64954ee 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -61,6 +61,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog { private final Map> oldFormatMetadataFileGenerationMap; private final Map> oldFormatMetadataFilePrimaryTermMap; private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); + private long maxDeletedGenerationOnRemote = 0; public RemoteFsTimestampAwareTranslog( TranslogConfig config, @@ -141,7 +142,7 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This is to fail fast and avoid listing md files un-necessarily. if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); + logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); return; } @@ -152,6 +153,12 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) return; } + // This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata + // call in each invocation of trimUnreferencedReaders + if (indexDeleted == false && (minRemoteGenReferenced - maxDeletedGenerationOnRemote) < indexSettings().getRemoteTranslogExtraKeep()) { + return; + } + ActionListener> listMetadataFilesListener = new ActionListener<>() { @Override public void onResponse(List blobMetadata) { @@ -166,7 +173,7 @@ public void onResponse(List blobMetadata) { // Check last fetch status of pinned timestamps. If stale, return. if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); + logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); return; } @@ -196,12 +203,13 @@ public void onResponse(List blobMetadata) { logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); Set generationsToBeDeleted = getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, - metadataFilesToBeDeleted, - indexDeleted + metadataFilesToBeDeleted ); logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted); if (generationsToBeDeleted.isEmpty() == false) { + maxDeletedGenerationOnRemote = generationsToBeDeleted.stream().max(Long::compareTo).get(); + // Delete stale generations translogTransferManager.deleteGenerationAsync( primaryTermSupplier.getAsLong(), @@ -240,15 +248,8 @@ public void onFailure(Exception e) { // Visible for testing protected Set getGenerationsToBeDeleted( List metadataFilesNotToBeDeleted, - List metadataFilesToBeDeleted, - boolean indexDeleted + List metadataFilesToBeDeleted ) throws IOException { - long maxGenerationToBeDeleted = Long.MAX_VALUE; - - if (indexDeleted == false) { - maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); - } - Set generationsFromMetadataFilesToBeDeleted = new HashSet<>(); for (String mdFile : metadataFilesToBeDeleted) { Tuple minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager); @@ -262,7 +263,7 @@ protected Set getGenerationsToBeDeleted( Set generationsToBeDeleted = new HashSet<>(); for (long generation : generationsFromMetadataFilesToBeDeleted) { // Check if the generation is not referred by metadata file matching pinned timestamps - if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) { + if (isGenerationPinned(generation, pinnedGenerations) == false) { generationsToBeDeleted.add(generation); } } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 71133615ed056..acb6afe71b067 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -278,6 +278,7 @@ protected void runInternal() { try { Map pinnedTimestampList = blobContainer.listBlobs(); if (pinnedTimestampList.isEmpty()) { + logger.debug("Fetched empty pinned timestamps from remote store: {}", triggerTimestamp); pinnedTimestampsSet = new Tuple<>(triggerTimestamp, Set.of()); return; } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 4ec68f7fb75b4..a33f85cc6f318 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -646,8 +646,7 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro ); Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, - metadataFilesToBeDeleted, - true + metadataFilesToBeDeleted ); Set md1Generations = LongStream.rangeClosed(4, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 37).boxed().collect(Collectors.toSet()); @@ -682,8 +681,7 @@ public void testGetGenerationsToBeDeleted() throws IOException { ); Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, - metadataFilesToBeDeleted, - true + metadataFilesToBeDeleted ); Set md1Generations = LongStream.rangeClosed(5, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 25).boxed().collect(Collectors.toSet());