From b92e53f0e74ccfc9c95750f922329a3124dce8fd Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 5 Sep 2024 00:55:14 +0530 Subject: [PATCH] Integrate translog cleanup with snapshot deletion and fix primary term deletion logic (#15657) --------- Signed-off-by: Sachin Kale Co-authored-by: Sachin Kale Signed-off-by: Sachin Kale --- ...pshotITV2.java => DeleteSnapshotV2IT.java} | 11 +- .../store/RemoteSegmentStoreDirectory.java | 2 +- .../RemoteSegmentStoreDirectoryFactory.java | 4 + .../RemoteFsTimestampAwareTranslog.java | 172 ++++++++++++++++-- .../transfer/TranslogTransferManager.java | 8 + .../transfer/TranslogTransferMetadata.java | 50 ++++- .../blobstore/BlobStoreRepository.java | 46 ++++- .../index/remote/RemoteStoreUtilsTests.java | 4 +- .../RemoteSegmentStoreDirectoryTests.java | 8 +- .../RemoteFsTimestampAwareTranslogTests.java | 160 ++++++++++++++-- .../TranslogTransferManagerTests.java | 2 +- 11 files changed, 422 insertions(+), 45 deletions(-) rename server/src/internalClusterTest/java/org/opensearch/snapshots/{DeleteSnapshotITV2.java => DeleteSnapshotV2IT.java} (96%) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotITV2.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java similarity index 96% rename from server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotITV2.java rename to server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java index 381093b6e6238..1d7a58384c0be 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotITV2.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java @@ -32,7 +32,7 @@ import static org.hamcrest.Matchers.lessThan; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class DeleteSnapshotITV2 extends AbstractSnapshotIntegTestCase { +public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase { private static final String REMOTE_REPO_NAME = "remote-store-repo-name"; @@ -277,9 +277,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); Path shardPath = Path.of(String.valueOf(indexPath), "0"); Path segmentsPath = Path.of(String.valueOf(shardPath), "segments"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog"); // Get total segments remote store directory file count for deleted index and shard 0 int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath); + int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath); RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); @@ -313,6 +315,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1)); } catch (Exception e) {} }, 60, TimeUnit.SECONDS); + + assertBusy(() -> { + try { + assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1)); + } catch (Exception e) {} + }, 60, TimeUnit.SECONDS); + } private Settings snapshotV2Settings(Path remoteStoreRepoPath) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 5e5686ab3a419..576ac9a26acc7 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -412,7 +412,7 @@ static long getGeneration(String[] filenameTokens) { public static long getTimestamp(String filename) { String[] filenameTokens = filename.split(SEPARATOR); - return RemoteStoreUtils.invertLong(filenameTokens[6]); + return RemoteStoreUtils.invertLong(filenameTokens[filenameTokens.length - 2]); } public static Tuple getNodeIdByPrimaryTermAndGen(String filename) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 85b3f25d950ef..32d886562f22d 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -121,4 +121,8 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s } } + public Supplier getRepositoriesService() { + return this.repositoriesService; + } + } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 0b134b3bddbec..27d34ec0d05af 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobMetadata; @@ -33,6 +34,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -52,10 +54,13 @@ */ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog { + private static Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslog.class); private final Logger logger; private final Map metadataFilePinnedTimestampMap; // For metadata files, with no min generation in the name, we cache generation data to avoid multiple reads. private final Map> oldFormatMetadataFileGenerationMap; + private final Map> oldFormatMetadataFilePrimaryTermMap; + private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); public RemoteFsTimestampAwareTranslog( TranslogConfig config, @@ -86,6 +91,7 @@ public RemoteFsTimestampAwareTranslog( logger = Loggers.getLogger(getClass(), shardId); this.metadataFilePinnedTimestampMap = new HashMap<>(); this.oldFormatMetadataFileGenerationMap = new HashMap<>(); + this.oldFormatMetadataFilePrimaryTermMap = new HashMap<>(); } @Override @@ -165,7 +171,11 @@ public void onResponse(List blobMetadata) { return; } - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( + metadataFiles, + metadataFilePinnedTimestampMap, + logger + ); // If index is not deleted, make sure to keep latest metadata file if (indexDeleted == false) { @@ -209,7 +219,7 @@ public void onResponse(List blobMetadata) { oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted); // Delete stale primary terms - deleteStaleRemotePrimaryTerms(metadataFiles); + deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted); } else { remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); } @@ -259,8 +269,16 @@ protected Set getGenerationsToBeDeleted( return generationsToBeDeleted; } - // Visible for testing protected List getMetadataFilesToBeDeleted(List metadataFiles) { + return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger); + } + + // Visible for testing + protected static List getMetadataFilesToBeDeleted( + List metadataFiles, + Map metadataFilePinnedTimestampMap, + Logger logger + ) { Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); // Keep files since last successful run of scheduler @@ -351,27 +369,153 @@ protected Tuple getMinMaxTranslogGenerationFromMetadataFile( } } + private void deleteStaleRemotePrimaryTerms(List metadataFiles) { + deleteStaleRemotePrimaryTerms( + metadataFiles, + translogTransferManager, + oldFormatMetadataFilePrimaryTermMap, + minPrimaryTermInRemote, + logger + ); + } + /** * This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures * implicitly that minimum primary term in latest translog metadata in remote store is the current primary term. *
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator. */ - private void deleteStaleRemotePrimaryTerms(List metadataFiles) { + protected static void deleteStaleRemotePrimaryTerms( + List metadataFiles, + TranslogTransferManager translogTransferManager, + Map> oldFormatMetadataFilePrimaryTermMap, + AtomicLong minPrimaryTermInRemoteAtomicLong, + Logger logger + ) { // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part // of older primary term. - if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { - if (metadataFiles.isEmpty()) { - logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms"); - return; + if (metadataFiles.isEmpty()) { + logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms"); + return; + } + Optional minPrimaryTermFromMetadataFiles = metadataFiles.stream().map(file -> { + try { + return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1(); + } catch (IOException e) { + return Long.MAX_VALUE; + } + }).min(Long::compareTo); + // First we delete all stale primary terms folders from remote store + Long minPrimaryTermInRemote = getMinPrimaryTermInRemote(minPrimaryTermInRemoteAtomicLong, translogTransferManager, logger); + if (minPrimaryTermFromMetadataFiles.get() > minPrimaryTermInRemote) { + translogTransferManager.deletePrimaryTermsAsync(minPrimaryTermFromMetadataFiles.get()); + minPrimaryTermInRemoteAtomicLong.set(minPrimaryTermFromMetadataFiles.get()); + } else { + logger.debug( + "Skipping primary term cleanup. minimumReferencedPrimaryTerm = {}, minPrimaryTermInRemote = {}", + minPrimaryTermFromMetadataFiles.get(), + minPrimaryTermInRemote + ); + } + } + + private static Long getMinPrimaryTermInRemote( + AtomicLong minPrimaryTermInRemote, + TranslogTransferManager translogTransferManager, + Logger logger + ) { + if (minPrimaryTermInRemote.get() == Long.MAX_VALUE) { + try { + Set primaryTermsInRemote = translogTransferManager.listPrimaryTermsInRemote(); + if (primaryTermsInRemote.isEmpty() == false) { + Optional minPrimaryTerm = primaryTermsInRemote.stream().min(Long::compareTo); + minPrimaryTerm.ifPresent(minPrimaryTermInRemote::set); + } + } catch (IOException e) { + logger.error("Exception while listing primary terms in remote translog", e); + } + } + return minPrimaryTermInRemote.get(); + } + + protected static Tuple getMinMaxPrimaryTermFromMetadataFile( + String metadataFile, + TranslogTransferManager translogTransferManager, + Map> oldFormatMetadataFilePrimaryTermMap + ) throws IOException { + Tuple minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile); + if (minMaxPrimaryTermFromFileName != null) { + return minMaxPrimaryTermFromFileName; + } else { + if (oldFormatMetadataFilePrimaryTermMap.containsKey(metadataFile)) { + return oldFormatMetadataFilePrimaryTermMap.get(metadataFile); + } else { + TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile); + long maxPrimaryTem = TranslogTransferMetadata.getPrimaryTermFromFileName(metadataFile); + long minPrimaryTem = -1; + if (metadata.getGenerationToPrimaryTermMapper() != null + && metadata.getGenerationToPrimaryTermMapper().values().isEmpty() == false) { + Optional primaryTerm = metadata.getGenerationToPrimaryTermMapper() + .values() + .stream() + .map(s -> Long.parseLong(s)) + .min(Long::compareTo); + if (primaryTerm.isPresent()) { + minPrimaryTem = primaryTerm.get(); + } + } + Tuple minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem); + oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple); + return minMaxPrimaryTermTuple; } - Optional minPrimaryTerm = metadataFiles.stream() - .map(file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[1])) - .min(Long::compareTo); - // First we delete all stale primary terms folders from remote store - long minimumReferencedPrimaryTerm = minPrimaryTerm.get() - 1; - translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm); } } + + public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException { + ActionListener> listMetadataFilesListener = new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + + try { + if (metadataFiles.isEmpty()) { + staticLogger.debug("No stale translog metadata files found"); + return; + } + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger); + if (metadataFilesToBeDeleted.isEmpty()) { + staticLogger.debug("No metadata files to delete"); + return; + } + staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); + + // For all the files that we are keeping, fetch min and max generations + List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); + metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); + staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); + + // Delete stale metadata files + translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {}); + + // Delete stale primary terms + deleteStaleRemotePrimaryTerms( + metadataFilesNotToBeDeleted, + translogTransferManager, + new HashMap<>(), + new AtomicLong(Long.MAX_VALUE), + staticLogger + ); + } catch (Exception e) { + staticLogger.error("Exception while cleaning up metadata and primary terms", e); + } + } + + @Override + public void onFailure(Exception e) { + staticLogger.error("Exception while cleaning up metadata and primary terms", e); + } + }; + translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 56a9aa6447dec..291218ea47499 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -545,6 +545,14 @@ public void onFailure(Exception e) { }); } + public Set listPrimaryTermsInRemote() throws IOException { + Set primaryTermsStr = transferService.listFolders(remoteDataTransferPath); + if (primaryTermsStr != null) { + return primaryTermsStr.stream().map(Long::parseLong).collect(Collectors.toSet()); + } + return new HashSet<>(); + } + /** * Handles deletion of all translog files associated with a primary term. * diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 745fa9a8a219a..3b8885055e8f7 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Map; import java.util.Objects; +import java.util.Optional; /** * The metadata associated with every transfer {@link TransferSnapshot}. The metadata is uploaded at the end of the @@ -108,11 +109,28 @@ public String getFileName() { RemoteStoreUtils.invertLong(createdAt), String.valueOf(Objects.hash(nodeId)), RemoteStoreUtils.invertLong(minTranslogGeneration), + String.valueOf(getMinPrimaryTermReferred()), String.valueOf(CURRENT_VERSION) ) ); } + private long getMinPrimaryTermReferred() { + if (generationToPrimaryTermMapper.get() == null || generationToPrimaryTermMapper.get().values().isEmpty()) { + return -1; + } + Optional minPrimaryTerm = generationToPrimaryTermMapper.get() + .values() + .stream() + .map(s -> Long.parseLong(s)) + .min(Long::compareTo); + if (minPrimaryTerm.isPresent()) { + return minPrimaryTerm.get(); + } else { + return -1; + } + } + public static Tuple, String> getNodeIdByPrimaryTermAndGeneration(String filename) { String[] tokens = filename.split(METADATA_SEPARATOR); if (tokens.length < 6) { @@ -143,15 +161,43 @@ public static Tuple getMinMaxTranslogGenerationFromFilename(String f assert Version.CURRENT.onOrAfter(Version.V_2_17_0); try { // instead of direct index, we go backwards to avoid running into same separator in nodeId - String minGeneration = tokens[tokens.length - 2]; + String minGeneration = tokens[tokens.length - 3]; String maxGeneration = tokens[2]; return new Tuple<>(RemoteStoreUtils.invertLong(minGeneration), RemoteStoreUtils.invertLong(maxGeneration)); - } catch (NumberFormatException e) { + } catch (Exception e) { logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e); return null; } } + public static Tuple getMinMaxPrimaryTermFromFilename(String filename) { + String[] tokens = filename.split(METADATA_SEPARATOR); + if (tokens.length < 7) { + // For versions < 2.17, we don't have min primary term. + return null; + } + assert Version.CURRENT.onOrAfter(Version.V_2_17_0); + try { + // instead of direct index, we go backwards to avoid running into same separator in nodeId + String minPrimaryTerm = tokens[tokens.length - 2]; + String maxPrimaryTerm = tokens[1]; + return new Tuple<>(Long.parseLong(minPrimaryTerm), RemoteStoreUtils.invertLong(maxPrimaryTerm)); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception while getting min and max primary term from: {}", filename), e); + return null; + } + } + + public static long getPrimaryTermFromFileName(String filename) { + String[] tokens = filename.split(METADATA_SEPARATOR); + try { + return RemoteStoreUtils.invertLong(tokens[1]); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception while getting max primary term from: {}", filename), e); + return -1; + } + } + @Override public int hashCode() { return Objects.hash(primaryTerm, generation); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index d07c84be37d41..a014eec89fd94 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -116,6 +116,7 @@ import org.opensearch.index.remote.RemoteStorePathStrategy.BasePathInput; import org.opensearch.index.remote.RemoteStorePathStrategy.SnapshotShardPathInput; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.remote.RemoteTranslogTransferTracker; import org.opensearch.index.snapshots.IndexShardRestoreFailedException; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; @@ -132,6 +133,10 @@ import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; +import org.opensearch.index.translog.RemoteFsTimestampAwareTranslog; +import org.opensearch.index.translog.RemoteFsTranslog; +import org.opensearch.index.translog.transfer.FileTransferTracker; +import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; @@ -2217,25 +2222,35 @@ private void cleanRemoteStoreDirectoryIfNeeded( } IndexMetadata prevIndexMetadata = this.getSnapshotIndexMetaData(oldRepoData, snapshotId, indexId); if (prevIndexMetadata != null && !isIndexPresent(clusterService, prevIndexMetadata.getIndexUUID())) { - String remoteStoreRepository = IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.get( + String remoteStoreRepository = IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.get( prevIndexMetadata.getSettings() ); assert (remoteStoreRepository != null); + String remoteTranslogRepositoryName = IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.get( + prevIndexMetadata.getSettings() + ); + assert (remoteTranslogRepositoryName != null); + Repository remoteTranslogRepository = remoteSegmentStoreDirectoryFactory.getRepositoriesService() + .get() + .repository(remoteTranslogRepositoryName); + RemoteStorePathStrategy remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy( prevIndexMetadata ); for (int shardId = 0; shardId < prevIndexMetadata.getNumberOfShards(); shardId++) { + ShardId shard = new ShardId(Index.UNKNOWN_INDEX_NAME, prevIndexMetadata.getIndexUUID(), shardId); remoteDirectoryCleanupAsync( remoteSegmentStoreDirectoryFactory, threadPool, remoteStoreRepository, prevIndexMetadata.getIndexUUID(), - new ShardId(Index.UNKNOWN_INDEX_NAME, prevIndexMetadata.getIndexUUID(), shardId), + shard, ThreadPool.Names.REMOTE_PURGE, remoteStorePathStrategy ); + remoteTranslogCleanupAsync(remoteTranslogRepository, shard, remoteStorePathStrategy, prevIndexMetadata); } } } catch (Exception e) { @@ -2255,6 +2270,33 @@ private void cleanRemoteStoreDirectoryIfNeeded( } + private void remoteTranslogCleanupAsync( + Repository remoteTranslogRepository, + ShardId shardId, + RemoteStorePathStrategy remoteStorePathStrategy, + IndexMetadata prevIndexMetadata + ) { + assert remoteTranslogRepository instanceof BlobStoreRepository; + boolean indexMetadataEnabled = RemoteStoreUtils.determineTranslogMetadataEnabled(prevIndexMetadata); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000); + FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); + TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager( + (BlobStoreRepository) remoteTranslogRepository, + threadPool, + shardId, + fileTransferTracker, + remoteTranslogTransferTracker, + remoteStorePathStrategy, + remoteStoreSettings, + indexMetadataEnabled + ); + try { + RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager); + } catch (IOException e) { + logger.error("Exception while cleaning up remote translog for shard: " + shardId, e); + } + } + /** * Finds and returns a list of shard paths that match the given index ID. * diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java index 2a34dd3580948..be30de97ee830 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java @@ -637,7 +637,7 @@ private Tuple, Set> testGetPinnedTimestampLockedFilesW String metadataPrefix = "metadata__1__2__3__4__5__"; Map metadataFiles = new HashMap<>(); for (Long metadataFileTimestamp : metadataFileTimestamps) { - metadataFiles.put(metadataFileTimestamp, metadataPrefix + RemoteStoreUtils.invertLong(metadataFileTimestamp)); + metadataFiles.put(metadataFileTimestamp, metadataPrefix + RemoteStoreUtils.invertLong(metadataFileTimestamp) + "__1"); } return new Tuple<>( metadataFiles, @@ -662,7 +662,7 @@ private Tuple, Set> testGetPinnedTimestampLockedFilesW String primaryTerm = RemoteStoreUtils.invertLong(metadataFileTimestampPrimaryTerm.getValue()); String metadataPrefix = "metadata__" + primaryTerm + "__2__3__4__5__"; long metadataFileTimestamp = metadataFileTimestampPrimaryTerm.getKey(); - metadataFiles.put(metadataFileTimestamp, metadataPrefix + RemoteStoreUtils.invertLong(metadataFileTimestamp)); + metadataFiles.put(metadataFileTimestamp, metadataPrefix + RemoteStoreUtils.invertLong(metadataFileTimestamp) + "__1"); } return new Tuple<>( metadataFiles, diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 336d4bafd4b66..ecd6620dbea15 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -1170,9 +1170,9 @@ public void testInitializeToSpecificTimestampNoMdMatchingTimestamp() throws IOEx public void testInitializeToSpecificTimestampMatchingMdFile() throws IOException { String metadataPrefix = "metadata__1__2__3__4__5__"; List metadataFiles = new ArrayList<>(); - metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(1000)); - metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000)); - metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000)); + metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(1000) + "__1"); + metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000) + "__1"); + metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000) + "__1"); Map metadata = new HashMap<>(); metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512::" + Version.LATEST.major); @@ -1184,7 +1184,7 @@ public void testInitializeToSpecificTimestampMatchingMdFile() throws IOException Integer.MAX_VALUE ) ).thenReturn(metadataFiles); - when(remoteMetadataDirectory.getBlobStream(metadataPrefix + RemoteStoreUtils.invertLong(1000))).thenReturn( + when(remoteMetadataDirectory.getBlobStream(metadataPrefix + RemoteStoreUtils.invertLong(1000) + "__1")).thenReturn( createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint(), segmentInfos) ); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index f66d8f4a894b0..4c9da7e95dfa7 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -8,6 +8,8 @@ package org.opensearch.index.translog; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.RepositoryMetadata; @@ -53,6 +55,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -63,7 +66,9 @@ import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -211,16 +216,44 @@ public void onFailure(Exception e) { // Node id containing separator String nodeIdWithSeparator = - "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node__1__9223372036438563958__1"; + "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node__1__9223372036438563958__2__1"; Tuple minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(nodeIdWithSeparator); Long minGen = Long.MAX_VALUE - 9223372036438563958L; assertEquals(minGen, minMaxGen.v1()); // Malformed md filename - String malformedMdFileName = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node1__xyz__1"; + String malformedMdFileName = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node1__xyz__3__1"; assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(malformedMdFileName)); } + public void testGetMinMaxPrimaryTermFromFilename() throws Exception { + // New format metadata file + String newFormatMetadataFile = + "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1"; + Tuple minMaxPrimaryterm = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(newFormatMetadataFile); + Long minPrimaryTerm = 2L; + Long maxPrimaryTerm = 7L; + assertEquals(minPrimaryTerm, minMaxPrimaryterm.v1()); + assertEquals(maxPrimaryTerm, minMaxPrimaryterm.v2()); + + // Old format metadata file + String oldFormatMdFilename = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__1"; + assertNull(TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(oldFormatMdFilename)); + + // Node id containing separator + String nodeIdWithSeparator = + "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node__1__9223372036438563958__2__1"; + minMaxPrimaryterm = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(nodeIdWithSeparator); + minPrimaryTerm = 2L; + maxPrimaryTerm = 7L; + assertEquals(minPrimaryTerm, minMaxPrimaryterm.v1()); + assertEquals(maxPrimaryTerm, minMaxPrimaryterm.v2()); + + // Malformed md filename + String malformedMdFileName = "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__xyz__3qwe__1"; + assertNull(TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(malformedMdFileName)); + } + public void testIndexDeletionWithNoPinnedTimestampNoRecentMdFiles() throws Exception { RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); ArrayList ops = new ArrayList<>(); @@ -605,11 +638,11 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro List metadataFilesNotToBeDeleted = new ArrayList<>(); List metadataFilesToBeDeleted = List.of( // 4 to 7 - "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__9223372036854775803__1", + "metadata__9223372036854775806__9223372036854775800__9223370311919910398__31__9223372036854775803__1__1", // 17 to 37 - "metadata__9223372036438563903__9223372036854775770__9223370311919910398__31__9223372036854775790__1", + "metadata__9223372036854775806__9223372036854775770__9223370311919910398__31__9223372036854775790__1__1", // 27 to 42 - "metadata__9223372036438563903__9223372036854775765__9223370311919910403__31__9223372036854775780__1" + "metadata__9223372036854775806__9223372036854775765__9223370311919910403__31__9223372036854775780__1__1" ); Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, @@ -619,6 +652,7 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro Set md1Generations = LongStream.rangeClosed(4, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 37).boxed().collect(Collectors.toSet()); Set md3Generations = LongStream.rangeClosed(27, 42).boxed().collect(Collectors.toSet()); + assertTrue(generations.containsAll(md1Generations)); assertTrue(generations.containsAll(md2Generations)); assertTrue(generations.containsAll(md3Generations)); @@ -632,19 +666,19 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro public void testGetGenerationsToBeDeleted() throws IOException { List metadataFilesNotToBeDeleted = List.of( // 1 to 4 - "metadata__9223372036438563903__9223372036854775803__9223370311919910398__31__9223372036854775806__1", + "metadata__9223372036854775806__9223372036854775803__9223370311919910398__31__9223372036854775806__1__1", // 26 to 30 - "metadata__9223372036438563903__9223372036854775777__9223370311919910398__31__9223372036854775781__1", + "metadata__9223372036854775806__9223372036854775777__9223370311919910398__31__9223372036854775781__1__1", // 42 to 100 - "metadata__9223372036438563903__9223372036854775707__9223370311919910403__31__9223372036854775765__1" + "metadata__9223372036854775806__9223372036854775707__9223370311919910403__31__9223372036854775765__1__1" ); List metadataFilesToBeDeleted = List.of( // 4 to 7 - "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__9223372036854775803__1", + "metadata__9223372036854775806__9223372036854775800__9223370311919910398__31__9223372036854775803__1__1", // 17 to 37 - "metadata__9223372036438563903__9223372036854775770__9223370311919910398__31__9223372036854775790__1", + "metadata__9223372036854775806__9223372036854775770__9223370311919910398__31__9223372036854775790__1__1", // 27 to 42 - "metadata__9223372036438563903__9223372036854775765__9223370311919910403__31__9223372036854775780__1" + "metadata__9223372036854775806__9223372036854775765__9223370311919910403__31__9223372036854775780__1__1" ); Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, @@ -654,6 +688,7 @@ public void testGetGenerationsToBeDeleted() throws IOException { Set md1Generations = LongStream.rangeClosed(5, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 25).boxed().collect(Collectors.toSet()); Set md3Generations = LongStream.rangeClosed(31, 41).boxed().collect(Collectors.toSet()); + assertTrue(generations.containsAll(md1Generations)); assertTrue(generations.containsAll(md2Generations)); assertTrue(generations.containsAll(md3Generations)); @@ -784,49 +819,49 @@ public void testGetMinMaxTranslogGenerationFromMetadataFile() throws IOException assertEquals( new Tuple<>(701L, 1008L), translog.getMinMaxTranslogGenerationFromMetadataFile( - "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__9223372036854775106__1", + "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__9223372036854775106__1__1", translogTransferManager ) ); assertEquals( new Tuple<>(4L, 7L), translog.getMinMaxTranslogGenerationFromMetadataFile( - "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__9223372036854775803__1", + "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__9223372036854775803__2__1", translogTransferManager ) ); assertEquals( new Tuple<>(106L, 106L), translog.getMinMaxTranslogGenerationFromMetadataFile( - "metadata__9223372036438563903__9223372036854775701__9223370311919910403__31__9223372036854775701__1", + "metadata__9223372036438563903__9223372036854775701__9223370311919910403__31__9223372036854775701__3__1", translogTransferManager ) ); assertEquals( new Tuple<>(4573L, 99964L), translog.getMinMaxTranslogGenerationFromMetadataFile( - "metadata__9223372036438563903__9223372036854675843__9223370311919910408__31__9223372036854771234__1", + "metadata__9223372036438563903__9223372036854675843__9223370311919910408__31__9223372036854771234__4__1", translogTransferManager ) ); assertEquals( new Tuple<>(1L, 4L), translog.getMinMaxTranslogGenerationFromMetadataFile( - "metadata__9223372036438563903__9223372036854775803__9223370311919910413__31__9223372036854775806__1", + "metadata__9223372036438563903__9223372036854775803__9223370311919910413__31__9223372036854775806__5__1", translogTransferManager ) ); assertEquals( new Tuple<>(2474L, 3462L), translog.getMinMaxTranslogGenerationFromMetadataFile( - "metadata__9223372036438563903__9223372036854772345__9223370311919910429__31__9223372036854773333__1", + "metadata__9223372036438563903__9223372036854772345__9223370311919910429__31__9223372036854773333__6__1", translogTransferManager ) ); assertEquals( new Tuple<>(5807L, 7917L), translog.getMinMaxTranslogGenerationFromMetadataFile( - "metadata__9223372036438563903__9223372036854767890__9223370311919910434__31__9223372036854770000__1", + "metadata__9223372036438563903__9223372036854767890__9223370311919910434__31__9223372036854770000__7__1", translogTransferManager ) ); @@ -860,4 +895,93 @@ public void testGetMinMaxTranslogGenerationFromMetadataFile() throws IOException verify(translogTransferManager).readMetadata("metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__1"); verify(translogTransferManager).readMetadata("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1"); } + + public void testDeleteStaleRemotePrimaryTerms() throws IOException { + TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class); + + List metadataFiles = List.of( + // PT 4 to 9 + "metadata__9223372036854775798__9223372036854774799__9223370311919910393__node1__9223372036438563958__4__1", + // PT 2 to 7 + "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1", + // PT 2 to 6 + "metadata__9223372036854775801__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1" + ); + + Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslogTests.class); + when(translogTransferManager.listPrimaryTermsInRemote()).thenReturn(Set.of(1L, 2L, 3L, 4L)); + AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); + RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms( + metadataFiles, + translogTransferManager, + new HashMap<>(), + minPrimaryTermInRemote, + staticLogger + ); + verify(translogTransferManager).deletePrimaryTermsAsync(2L); + assertEquals(2, minPrimaryTermInRemote.get()); + + RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms( + metadataFiles, + translogTransferManager, + new HashMap<>(), + minPrimaryTermInRemote, + staticLogger + ); + // This means there are no new invocations of deletePrimaryTermAsync + verify(translogTransferManager, times(1)).deletePrimaryTermsAsync(anyLong()); + } + + public void testDeleteStaleRemotePrimaryTermsNoPrimaryTermInRemote() throws IOException { + TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class); + + List metadataFiles = List.of( + // PT 4 to 9 + "metadata__9223372036854775798__9223372036854774799__9223370311919910393__node1__9223372036438563958__4__1", + // PT 2 to 7 + "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1", + // PT 2 to 6 + "metadata__9223372036854775801__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1" + ); + + Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslogTests.class); + when(translogTransferManager.listPrimaryTermsInRemote()).thenReturn(Set.of()); + AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); + RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms( + metadataFiles, + translogTransferManager, + new HashMap<>(), + minPrimaryTermInRemote, + staticLogger + ); + verify(translogTransferManager, times(0)).deletePrimaryTermsAsync(anyLong()); + assertEquals(Long.MAX_VALUE, minPrimaryTermInRemote.get()); + } + + public void testDeleteStaleRemotePrimaryTermsPrimaryTermInRemoteIsBigger() throws IOException { + TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class); + + List metadataFiles = List.of( + // PT 4 to 9 + "metadata__9223372036854775798__9223372036854774799__9223370311919910393__node1__9223372036438563958__4__1", + // PT 2 to 7 + "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1", + // PT 2 to 6 + "metadata__9223372036854775801__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1" + ); + + Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslogTests.class); + when(translogTransferManager.listPrimaryTermsInRemote()).thenReturn(Set.of(2L, 3L, 4L)); + AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); + RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms( + metadataFiles, + translogTransferManager, + new HashMap<>(), + minPrimaryTermInRemote, + staticLogger + ); + verify(translogTransferManager, times(0)).deletePrimaryTermsAsync(anyLong()); + assertEquals(2, minPrimaryTermInRemote.get()); + } + } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 8605043ddd5b5..ed0d6b7d50706 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -628,7 +628,7 @@ public void testMetadataConflict() throws InterruptedException { String mdFilename = tm.getFileName(); long count = mdFilename.chars().filter(ch -> ch == METADATA_SEPARATOR.charAt(0)).count(); // There should not be any `_` in mdFile name as it is used a separator . - assertEquals(12, count); + assertEquals(14, count); Thread.sleep(1); TranslogTransferMetadata tm2 = new TranslogTransferMetadata(1, 1, 1, 2, "node--2"); String mdFilename2 = tm2.getFileName();