From c62626d25430bea94123d6176caef56ae94deb3f Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 3 Sep 2024 22:09:17 +0530 Subject: [PATCH 1/3] SnapshotInfo pinned timestamp version change to V_2_17_0 (#15616) Signed-off-by: Anshu Agarwal Co-authored-by: Anshu Agarwal --- .../src/main/java/org/opensearch/snapshots/SnapshotInfo.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index bc33928acf3da..08433dc6f2e0b 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -473,7 +473,7 @@ public SnapshotInfo(final StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_9_0)) { remoteStoreIndexShallowCopy = in.readOptionalBoolean(); } - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_17_0)) { pinnedTimestamp = in.readVLong(); } } @@ -940,7 +940,7 @@ public void writeTo(final StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalBoolean(remoteStoreIndexShallowCopy); } - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_17_0)) { out.writeVLong(pinnedTimestamp); } } From deeb2ded71f05652900ca2915623fd3d0c81c97f Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 3 Sep 2024 23:26:35 +0530 Subject: [PATCH 2/3] Abstract out RemoteFsTranslogWithPinnedTimestamps (#15579) * Abstract out RemoteFsTranslog with pinned timestamps Signed-off-by: Sachin Kale --- ...emoteBlobStoreInternalTranslogFactory.java | 42 +- .../RemoteFsTimestampAwareTranslog.java | 377 ++++++++++++++++++ .../index/translog/RemoteFsTranslog.java | 309 ++------------ ... RemoteFsTimestampAwareTranslogTests.java} | 75 +++- .../index/translog/RemoteFsTranslogTests.java | 38 +- 5 files changed, 531 insertions(+), 310 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java rename server/src/test/java/org/opensearch/index/translog/{RemoteFsTranslogWithPinnedTimestampTests.java => RemoteFsTimestampAwareTranslogTests.java} (93%) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index 4599aa32325c1..5dc2ad076d21c 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -69,19 +69,35 @@ public Translog newTranslog( assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository); - return new RemoteFsTranslog( - config, - translogUUID, - deletionPolicy, - globalCheckpointSupplier, - primaryTermSupplier, - persistedSequenceNumberConsumer, - blobStoreRepository, - threadPool, - startedPrimarySupplier, - remoteTranslogTransferTracker, - remoteStoreSettings - ); + if (RemoteStoreSettings.isPinnedTimestampsEnabled()) { + return new RemoteFsTimestampAwareTranslog( + config, + translogUUID, + deletionPolicy, + globalCheckpointSupplier, + primaryTermSupplier, + persistedSequenceNumberConsumer, + blobStoreRepository, + threadPool, + startedPrimarySupplier, + remoteTranslogTransferTracker, + remoteStoreSettings + ); + } else { + return new RemoteFsTranslog( + config, + translogUUID, + deletionPolicy, + globalCheckpointSupplier, + primaryTermSupplier, + persistedSequenceNumberConsumer, + blobStoreRepository, + threadPool, + startedPrimarySupplier, + remoteTranslogTransferTracker, + remoteStoreSettings + ); + } } public Repository getRepository() { diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java new file mode 100644 index 0000000000000..0b134b3bddbec --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -0,0 +1,377 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.logging.Loggers; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.translog.transfer.TranslogTransferManager; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.function.BooleanSupplier; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; + +/** + * A Translog implementation which syncs local FS with a remote store + * The current impl uploads translog , ckp and metadata to remote store + * for every sync, post syncing to disk. Post that, a new generation is + * created. This implementation is also aware of pinned timestamp and makes + * sure data against pinned timestamp is retained. + * + * @opensearch.internal + */ +public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog { + + private final Logger logger; + private final Map metadataFilePinnedTimestampMap; + // For metadata files, with no min generation in the name, we cache generation data to avoid multiple reads. + private final Map> oldFormatMetadataFileGenerationMap; + + public RemoteFsTimestampAwareTranslog( + TranslogConfig config, + String translogUUID, + TranslogDeletionPolicy deletionPolicy, + LongSupplier globalCheckpointSupplier, + LongSupplier primaryTermSupplier, + LongConsumer persistedSequenceNumberConsumer, + BlobStoreRepository blobStoreRepository, + ThreadPool threadPool, + BooleanSupplier startedPrimarySupplier, + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + RemoteStoreSettings remoteStoreSettings + ) throws IOException { + super( + config, + translogUUID, + deletionPolicy, + globalCheckpointSupplier, + primaryTermSupplier, + persistedSequenceNumberConsumer, + blobStoreRepository, + threadPool, + startedPrimarySupplier, + remoteTranslogTransferTracker, + remoteStoreSettings + ); + logger = Loggers.getLogger(getClass(), shardId); + this.metadataFilePinnedTimestampMap = new HashMap<>(); + this.oldFormatMetadataFileGenerationMap = new HashMap<>(); + } + + @Override + protected void onDelete() { + ClusterService.assertClusterOrClusterManagerStateThread(); + // clean up all remote translog files + try { + trimUnreferencedReaders(true, false); + } catch (IOException e) { + logger.error("Exception while deleting translog files from remote store", e); + } + } + + @Override + public void trimUnreferencedReaders() throws IOException { + trimUnreferencedReaders(false, true); + } + + // Visible for testing + protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) throws IOException { + if (trimLocal) { + // clean up local translog files and updates readers + super.trimUnreferencedReaders(); + } + + // Update file tracker to reflect local translog state + Optional minLiveGeneration = readers.stream().map(BaseTranslogReader::getGeneration).min(Long::compareTo); + if (minLiveGeneration.isPresent()) { + List staleFilesInTracker = new ArrayList<>(); + for (String file : fileTransferTracker.allUploaded()) { + if (file.endsWith(TRANSLOG_FILE_SUFFIX)) { + long generation = Translog.parseIdFromFileName(file); + if (generation < minLiveGeneration.get()) { + staleFilesInTracker.add(file); + staleFilesInTracker.add(Translog.getCommitCheckpointFileName(generation)); + } + } + fileTransferTracker.delete(staleFilesInTracker); + } + } + + // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote + // store. + if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) { + return; + } + + // This is to fail fast and avoid listing md files un-necessarily. + if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { + logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); + return; + } + + // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. + // Remote generations involves 2 async operations - 1) Delete translog generation files 2) Delete metadata files + // We try to acquire 2 permits and if we can not, we return from here itself. + if (remoteGenerationDeletionPermits.tryAcquire(REMOTE_DELETION_PERMITS) == false) { + return; + } + + ActionListener> listMetadataFilesListener = new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + + try { + if (metadataFiles.size() <= 1) { + logger.debug("No stale translog metadata files found"); + remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); + return; + } + + // Check last fetch status of pinned timestamps. If stale, return. + if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { + logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); + remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); + return; + } + + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles); + + // If index is not deleted, make sure to keep latest metadata file + if (indexDeleted == false) { + metadataFilesToBeDeleted.remove(metadataFiles.get(0)); + } + + if (metadataFilesToBeDeleted.isEmpty()) { + logger.debug("No metadata files to delete"); + remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); + return; + } + + logger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); + // For all the files that we are keeping, fetch min and max generations + List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); + metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); + + logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); + Set generationsToBeDeleted = getGenerationsToBeDeleted( + metadataFilesNotToBeDeleted, + metadataFilesToBeDeleted, + indexDeleted + ); + + logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted); + if (generationsToBeDeleted.isEmpty() == false) { + // Delete stale generations + translogTransferManager.deleteGenerationAsync( + primaryTermSupplier.getAsLong(), + generationsToBeDeleted, + remoteGenerationDeletionPermits::release + ); + + // Delete stale metadata files + translogTransferManager.deleteMetadataFilesAsync( + metadataFilesToBeDeleted, + remoteGenerationDeletionPermits::release + ); + + // Update cache to keep only those metadata files that are not getting deleted + oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted); + + // Delete stale primary terms + deleteStaleRemotePrimaryTerms(metadataFiles); + } else { + remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); + } + } catch (Exception e) { + remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); + } + } + + @Override + public void onFailure(Exception e) { + remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); + logger.error("Exception while listing translog metadata files", e); + } + }; + translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + } + + // Visible for testing + protected Set getGenerationsToBeDeleted( + List metadataFilesNotToBeDeleted, + List metadataFilesToBeDeleted, + boolean indexDeleted + ) throws IOException { + long maxGenerationToBeDeleted = Long.MAX_VALUE; + + if (indexDeleted == false) { + maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); + } + + Set generationsFromMetadataFilesToBeDeleted = new HashSet<>(); + for (String mdFile : metadataFilesToBeDeleted) { + Tuple minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager); + generationsFromMetadataFilesToBeDeleted.addAll( + LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList()) + ); + } + + Map> metadataFileNotToBeDeletedGenerationMap = getGenerationForMetadataFiles(metadataFilesNotToBeDeleted); + TreeSet> pinnedGenerations = getOrderedPinnedMetadataGenerations(metadataFileNotToBeDeletedGenerationMap); + Set generationsToBeDeleted = new HashSet<>(); + for (long generation : generationsFromMetadataFilesToBeDeleted) { + // Check if the generation is not referred by metadata file matching pinned timestamps + if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) { + generationsToBeDeleted.add(generation); + } + } + return generationsToBeDeleted; + } + + // Visible for testing + protected List getMetadataFilesToBeDeleted(List metadataFiles) { + Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); + + // Keep files since last successful run of scheduler + List metadataFilesToBeDeleted = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge( + metadataFiles, + file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]), + pinnedTimestampsState.v1() + ); + + logger.trace( + "metadataFiles.size = {}, metadataFilesToBeDeleted based on age based filtering = {}", + metadataFiles.size(), + metadataFilesToBeDeleted.size() + ); + + // Get md files matching pinned timestamps + Set implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles( + metadataFilesToBeDeleted, + pinnedTimestampsState.v2(), + metadataFilePinnedTimestampMap, + file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]), + TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen + ); + + // Filter out metadata files matching pinned timestamps + metadataFilesToBeDeleted.removeAll(implicitLockedFiles); + + logger.trace( + "implicitLockedFiles.size = {}, metadataFilesToBeDeleted based on pinned timestamp filtering = {}", + implicitLockedFiles.size(), + metadataFilesToBeDeleted.size() + ); + + return metadataFilesToBeDeleted; + } + + // Visible for testing + protected boolean isGenerationPinned(long generation, TreeSet> pinnedGenerations) { + Tuple ceilingGenerationRange = pinnedGenerations.ceiling(new Tuple<>(generation, generation)); + if (ceilingGenerationRange != null && generation >= ceilingGenerationRange.v1() && generation <= ceilingGenerationRange.v2()) { + return true; + } + Tuple floorGenerationRange = pinnedGenerations.floor(new Tuple<>(generation, generation)); + if (floorGenerationRange != null && generation >= floorGenerationRange.v1() && generation <= floorGenerationRange.v2()) { + return true; + } + return false; + } + + private TreeSet> getOrderedPinnedMetadataGenerations(Map> metadataFileGenerationMap) { + TreeSet> pinnedGenerations = new TreeSet<>((o1, o2) -> { + if (Objects.equals(o1.v1(), o2.v1()) == false) { + return o1.v1().compareTo(o2.v1()); + } else { + return o1.v2().compareTo(o2.v2()); + } + }); + pinnedGenerations.addAll(metadataFileGenerationMap.values()); + return pinnedGenerations; + } + + // Visible for testing + protected Map> getGenerationForMetadataFiles(List metadataFiles) throws IOException { + Map> metadataFileGenerationMap = new HashMap<>(); + for (String metadataFile : metadataFiles) { + metadataFileGenerationMap.put(metadataFile, getMinMaxTranslogGenerationFromMetadataFile(metadataFile, translogTransferManager)); + } + return metadataFileGenerationMap; + } + + // Visible for testing + protected Tuple getMinMaxTranslogGenerationFromMetadataFile( + String metadataFile, + TranslogTransferManager translogTransferManager + ) throws IOException { + Tuple minMaxGenerationFromFileName = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(metadataFile); + if (minMaxGenerationFromFileName != null) { + return minMaxGenerationFromFileName; + } else { + if (oldFormatMetadataFileGenerationMap.containsKey(metadataFile)) { + return oldFormatMetadataFileGenerationMap.get(metadataFile); + } else { + TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile); + Tuple minMaxGenTuple = new Tuple<>(metadata.getMinTranslogGeneration(), metadata.getGeneration()); + oldFormatMetadataFileGenerationMap.put(metadataFile, minMaxGenTuple); + return minMaxGenTuple; + } + } + } + + /** + * This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures + * implicitly that minimum primary term in latest translog metadata in remote store is the current primary term. + *
+ * This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator. + */ + private void deleteStaleRemotePrimaryTerms(List metadataFiles) { + // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there + // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part + // of older primary term. + if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { + if (metadataFiles.isEmpty()) { + logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms"); + return; + } + Optional minPrimaryTerm = metadataFiles.stream() + .map(file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[1])) + .min(Long::compareTo); + // First we delete all stale primary terms folders from remote store + long minimumReferencedPrimaryTerm = minPrimaryTerm.get() - 1; + translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 16ebac0a44139..812852d107682 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -11,19 +11,15 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SetOnce; -import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; -import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.util.FileSystemUtils; import org.opensearch.index.remote.RemoteStorePathStrategy; -import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.remote.RemoteTranslogTransferTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -34,7 +30,6 @@ import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.indices.RemoteStoreSettings; -import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; @@ -44,16 +39,11 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -61,13 +51,10 @@ import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; -import java.util.stream.Collectors; -import java.util.stream.LongStream; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; -import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; /** * A Translog implementation which syncs local FS with a remote store @@ -80,36 +67,31 @@ public class RemoteFsTranslog extends Translog { private final Logger logger; - private final TranslogTransferManager translogTransferManager; - // This tracker keeps track of local tranlog files that are uploaded to remote store. - // Once tlog files are deleted from local, we remove them from tracker even if the files still exist in remote translog. - private final FileTransferTracker fileTransferTracker; - private final BooleanSupplier startedPrimarySupplier; + protected final TranslogTransferManager translogTransferManager; + protected final FileTransferTracker fileTransferTracker; + protected final BooleanSupplier startedPrimarySupplier; private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private volatile long maxRemoteTranslogGenerationUploaded; private volatile long minSeqNoToKeep; // min generation referred by last uploaded translog - private volatile long minRemoteGenReferenced; + protected volatile long minRemoteGenReferenced; // clean up translog folder uploaded by previous primaries once - private final SetOnce olderPrimaryCleaned = new SetOnce<>(); + protected final SetOnce olderPrimaryCleaned = new SetOnce<>(); - private static final int REMOTE_DELETION_PERMITS = 2; + protected static final int REMOTE_DELETION_PERMITS = 2; private static final int DOWNLOAD_RETRIES = 2; // Semaphore used to allow only single remote generation to happen at a time - private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); + protected final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); // These permits exist to allow any inflight background triggered upload. private static final int SYNC_PERMIT = 1; private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT); - private final AtomicBoolean pauseSync = new AtomicBoolean(false); + protected final AtomicBoolean pauseSync = new AtomicBoolean(false); private final boolean isTranslogMetadataEnabled; - private final Map metadataFilePinnedTimestampMap; - // For metadata files, with no min generation in the name, we cache generation data to avoid multiple reads. - private final Map> oldFormatMetadataFileGenerationMap; public RemoteFsTranslog( TranslogConfig config, @@ -130,8 +112,6 @@ public RemoteFsTranslog( this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); isTranslogMetadataEnabled = indexSettings().isTranslogMetadataEnabled(); - this.metadataFilePinnedTimestampMap = new HashMap<>(); - this.oldFormatMetadataFileGenerationMap = new HashMap<>(); this.translogTransferManager = buildTranslogTransferManager( blobStoreRepository, threadPool, @@ -569,31 +549,8 @@ protected Releasable drainSync() { @Override public void trimUnreferencedReaders() throws IOException { - trimUnreferencedReaders(false, true); - } - - // Visible for testing - protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) throws IOException { - if (trimLocal) { - // clean up local translog files and updates readers - super.trimUnreferencedReaders(); - } - - // Update file tracker to reflect local translog state - Optional minLiveGeneration = readers.stream().map(BaseTranslogReader::getGeneration).min(Long::compareTo); - if (minLiveGeneration.isPresent()) { - List staleFilesInTracker = new ArrayList<>(); - for (String file : fileTransferTracker.allUploaded()) { - if (file.endsWith(TRANSLOG_FILE_SUFFIX)) { - long generation = Translog.parseIdFromFileName(file); - if (generation < minLiveGeneration.get()) { - staleFilesInTracker.add(file); - staleFilesInTracker.add(Translog.getCommitCheckpointFileName(generation)); - } - } - fileTransferTracker.delete(staleFilesInTracker); - } - } + // clean up local translog files and updates readers + super.trimUnreferencedReaders(); // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. @@ -601,12 +558,6 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) return; } - // This is to fail fast and avoid listing md files un-necessarily. - if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); - return; - } - // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. // Remote generations involves 2 async operations - 1) Delete translog generation files 2) Delete metadata files // We try to acquire 2 permits and if we can not, we return from here itself. @@ -614,209 +565,34 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) return; } - ActionListener> listMetadataFilesListener = new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); - - try { - if (metadataFiles.size() <= 1) { - logger.debug("No stale translog metadata files found"); - remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); - return; - } - - // Check last fetch status of pinned timestamps. If stale, return. - if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); - remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); - return; - } - - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles); - - // If index is not deleted, make sure to keep latest metadata file - if (indexDeleted == false) { - metadataFilesToBeDeleted.remove(metadataFiles.get(0)); - } - - if (metadataFilesToBeDeleted.isEmpty()) { - logger.debug("No metadata files to delete"); - remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); - return; - } - - logger.debug("metadataFilesToBeDeleted = {}", metadataFilesToBeDeleted); - // For all the files that we are keeping, fetch min and max generations - List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); - metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); - - logger.debug("metadataFilesNotToBeDeleted = {}", metadataFilesNotToBeDeleted); - Set generationsToBeDeleted = getGenerationsToBeDeleted( - metadataFilesNotToBeDeleted, - metadataFilesToBeDeleted, - indexDeleted - ); - - logger.debug("generationsToBeDeleted = {}", generationsToBeDeleted); - if (generationsToBeDeleted.isEmpty() == false) { - // Delete stale generations - translogTransferManager.deleteGenerationAsync( - primaryTermSupplier.getAsLong(), - generationsToBeDeleted, - remoteGenerationDeletionPermits::release - ); - - // Delete stale metadata files - translogTransferManager.deleteMetadataFilesAsync( - metadataFilesToBeDeleted, - remoteGenerationDeletionPermits::release - ); - - // Update cache to keep only those metadata files that are not getting deleted - oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted); - - // Delete stale primary terms - deleteStaleRemotePrimaryTerms(metadataFiles); - } else { - remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); - } - } catch (Exception e) { - remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); - } - } - - @Override - public void onFailure(Exception e) { - remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); - logger.error("Exception while listing translog metadata files", e); + // cleans up remote translog files not referenced in latest uploaded metadata. + // This enables us to restore translog from the metadata in case of failover or relocation. + Set generationsToDelete = new HashSet<>(); + for (long generation = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); generation >= 0; generation--) { + if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) { + break; } - }; - translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); - } - - // Visible for testing - protected Set getGenerationsToBeDeleted( - List metadataFilesNotToBeDeleted, - List metadataFilesToBeDeleted, - boolean indexDeleted - ) throws IOException { - long maxGenerationToBeDeleted = Long.MAX_VALUE; - - if (indexDeleted == false) { - maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); + generationsToDelete.add(generation); } - - Set generationsFromMetadataFilesToBeDeleted = new HashSet<>(); - for (String mdFile : metadataFilesToBeDeleted) { - Tuple minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager); - generationsFromMetadataFilesToBeDeleted.addAll( - LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList()) - ); - } - - Map> metadataFileNotToBeDeletedGenerationMap = getGenerationForMetadataFiles(metadataFilesNotToBeDeleted); - TreeSet> pinnedGenerations = getOrderedPinnedMetadataGenerations(metadataFileNotToBeDeletedGenerationMap); - Set generationsToBeDeleted = new HashSet<>(); - for (long generation : generationsFromMetadataFilesToBeDeleted) { - // Check if the generation is not referred by metadata file matching pinned timestamps - if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) { - generationsToBeDeleted.add(generation); - } + if (generationsToDelete.isEmpty() == false) { + deleteRemoteGeneration(generationsToDelete); + translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release); + deleteStaleRemotePrimaryTerms(); + } else { + remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); } - return generationsToBeDeleted; } - // Visible for testing - protected List getMetadataFilesToBeDeleted(List metadataFiles) { - Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); - - // Keep files since last successful run of scheduler - List metadataFilesToBeDeleted = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge( - metadataFiles, - file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]), - pinnedTimestampsState.v1() - ); - - logger.trace( - "metadataFiles.size = {}, metadataFilesToBeDeleted based on age based filtering = {}", - metadataFiles.size(), - metadataFilesToBeDeleted.size() - ); - - // Get md files matching pinned timestamps - Set implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles( - metadataFilesToBeDeleted, - pinnedTimestampsState.v2(), - metadataFilePinnedTimestampMap, - file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]), - TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen - ); - - // Filter out metadata files matching pinned timestamps - metadataFilesToBeDeleted.removeAll(implicitLockedFiles); - - logger.trace( - "implicitLockedFiles.size = {}, metadataFilesToBeDeleted based on pinned timestamp filtering = {}", - implicitLockedFiles.size(), - metadataFilesToBeDeleted.size() + /** + * Deletes remote translog and metadata files asynchronously corresponding to the generations. + * @param generations generations to be deleted. + */ + private void deleteRemoteGeneration(Set generations) { + translogTransferManager.deleteGenerationAsync( + primaryTermSupplier.getAsLong(), + generations, + remoteGenerationDeletionPermits::release ); - - return metadataFilesToBeDeleted; - } - - // Visible for testing - protected boolean isGenerationPinned(long generation, TreeSet> pinnedGenerations) { - Tuple ceilingGenerationRange = pinnedGenerations.ceiling(new Tuple<>(generation, generation)); - if (ceilingGenerationRange != null && generation >= ceilingGenerationRange.v1() && generation <= ceilingGenerationRange.v2()) { - return true; - } - Tuple floorGenerationRange = pinnedGenerations.floor(new Tuple<>(generation, generation)); - if (floorGenerationRange != null && generation >= floorGenerationRange.v1() && generation <= floorGenerationRange.v2()) { - return true; - } - return false; - } - - private TreeSet> getOrderedPinnedMetadataGenerations(Map> metadataFileGenerationMap) { - TreeSet> pinnedGenerations = new TreeSet<>((o1, o2) -> { - if (Objects.equals(o1.v1(), o2.v1()) == false) { - return o1.v1().compareTo(o2.v1()); - } else { - return o1.v2().compareTo(o2.v2()); - } - }); - pinnedGenerations.addAll(metadataFileGenerationMap.values()); - return pinnedGenerations; - } - - // Visible for testing - protected Map> getGenerationForMetadataFiles(List metadataFiles) throws IOException { - Map> metadataFileGenerationMap = new HashMap<>(); - for (String metadataFile : metadataFiles) { - metadataFileGenerationMap.put(metadataFile, getMinMaxTranslogGenerationFromMetadataFile(metadataFile, translogTransferManager)); - } - return metadataFileGenerationMap; - } - - // Visible for testing - protected Tuple getMinMaxTranslogGenerationFromMetadataFile( - String metadataFile, - TranslogTransferManager translogTransferManager - ) throws IOException { - Tuple minMaxGenerationFromFileName = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(metadataFile); - if (minMaxGenerationFromFileName != null) { - return minMaxGenerationFromFileName; - } else { - if (oldFormatMetadataFileGenerationMap.containsKey(metadataFile)) { - return oldFormatMetadataFileGenerationMap.get(metadataFile); - } else { - TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile); - Tuple minMaxGenTuple = new Tuple<>(metadata.getMinTranslogGeneration(), metadata.getGeneration()); - oldFormatMetadataFileGenerationMap.put(metadataFile, minMaxGenTuple); - return minMaxGenTuple; - } - } } /** @@ -825,20 +601,17 @@ protected Tuple getMinMaxTranslogGenerationFromMetadataFile( *
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator. */ - private void deleteStaleRemotePrimaryTerms(List metadataFiles) { + private void deleteStaleRemotePrimaryTerms() { // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part // of older primary term. if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { - if (metadataFiles.isEmpty()) { - logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms"); + if (readers.isEmpty()) { + logger.trace("Translog reader list is empty, returning from deleteStaleRemotePrimaryTerms"); return; } - Optional minPrimaryTerm = metadataFiles.stream() - .map(file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[1])) - .min(Long::compareTo); // First we delete all stale primary terms folders from remote store - long minimumReferencedPrimaryTerm = minPrimaryTerm.get() - 1; + long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get(); translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm); } } @@ -874,15 +647,7 @@ public static void cleanup( protected void onDelete() { ClusterService.assertClusterOrClusterManagerStateThread(); // clean up all remote translog files - if (RemoteStoreSettings.isPinnedTimestampsEnabled()) { - try { - trimUnreferencedReaders(true, false); - } catch (IOException e) { - logger.error("Exception while deleting translog files from remote store", e); - } - } else { - translogTransferManager.delete(); - } + translogTransferManager.delete(); } // Visible for testing diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogWithPinnedTimestampTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java similarity index 93% rename from server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogWithPinnedTimestampTests.java rename to server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 386dde4dffc48..1f82dd9d7e641 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogWithPinnedTimestampTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -29,6 +29,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; +import org.opensearch.index.translog.transfer.TranslogUploadFailedException; import org.opensearch.indices.DefaultRemoteStoreSettings; import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.node.Node; @@ -42,6 +43,7 @@ import org.junit.Before; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -60,14 +62,13 @@ import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED; -import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("ExtrasFS") -public class RemoteFsTranslogWithPinnedTimestampTests extends RemoteFsTranslogTests { +public class RemoteFsTimestampAwareTranslogTests extends RemoteFsTranslogTests { Runnable updatePinnedTimstampTask; BlobContainer blobContainer; @@ -125,6 +126,48 @@ public void setUp() throws Exception { remoteStorePinnedTimestampServiceSpy.start(); } + @Override + protected RemoteFsTranslog createTranslogInstance( + TranslogConfig translogConfig, + String translogUUID, + TranslogDeletionPolicy deletionPolicy + ) throws IOException { + return new RemoteFsTimestampAwareTranslog( + translogConfig, + translogUUID, + deletionPolicy, + () -> globalCheckpoint.get(), + primaryTerm::get, + getPersistedSeqNoConsumer(), + repository, + threadPool, + primaryMode::get, + new RemoteTranslogTransferTracker(shardId, 10), + DefaultRemoteStoreSettings.INSTANCE + ); + } + + @Override + public void testSyncUpAlwaysFailure() throws IOException { + int translogOperations = randomIntBetween(1, 20); + int count = 0; + fail.failAlways(); + for (int op = 0; op < translogOperations; op++) { + translog.add( + new Translog.Index(String.valueOf(op), count, primaryTerm.get(), Integer.toString(count).getBytes(StandardCharsets.UTF_8)) + ); + try { + translog.sync(); + fail("io exception expected"); + } catch (TranslogUploadFailedException e) { + assertTrue("at least one operation pending", translog.syncNeeded()); + } + } + assertTrue(translog.isOpen()); + fail.failNever(); + translog.sync(); + } + public void testGetMinMaxTranslogGenerationFromFilename() throws Exception { RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); ArrayList ops = new ArrayList<>(); @@ -198,7 +241,7 @@ public void testIndexDeletionWithNoPinnedTimestampNoRecentMdFiles() throws Excep assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); updatePinnedTimstampTask.run(); - translog.trimUnreferencedReaders(true, false); + ((RemoteFsTimestampAwareTranslog) translog).trimUnreferencedReaders(true, false); assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); @@ -221,7 +264,7 @@ public void testIndexDeletionWithNoPinnedTimestampButRecentFiles() throws Except addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); updatePinnedTimstampTask.run(); - translog.trimUnreferencedReaders(true, false); + ((RemoteFsTimestampAwareTranslog) translog).trimUnreferencedReaders(true, false); assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertBusy(() -> { @@ -567,7 +610,11 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro // 27 to 42 "metadata__9223372036438563903__9223372036854775765__9223370311919910403__31__9223372036854775780__1" ); - Set generations = translog.getGenerationsToBeDeleted(metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, true); + Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( + metadataFilesNotToBeDeleted, + metadataFilesToBeDeleted, + true + ); Set md1Generations = LongStream.rangeClosed(4, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 37).boxed().collect(Collectors.toSet()); Set md3Generations = LongStream.rangeClosed(27, 42).boxed().collect(Collectors.toSet()); @@ -598,7 +645,11 @@ public void testGetGenerationsToBeDeleted() throws IOException { // 27 to 42 "metadata__9223372036438563903__9223372036854775765__9223370311919910403__31__9223372036854775780__1" ); - Set generations = translog.getGenerationsToBeDeleted(metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, true); + Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( + metadataFilesNotToBeDeleted, + metadataFilesToBeDeleted, + true + ); Set md1Generations = LongStream.rangeClosed(5, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 25).boxed().collect(Collectors.toSet()); Set md3Generations = LongStream.rangeClosed(31, 41).boxed().collect(Collectors.toSet()); @@ -621,7 +672,7 @@ public void testGetMetadataFilesToBeDeletedNoExclusion() { "metadata__9223372036438563903__9223372036854775701__9223370311919910403__31__9223372036854775701__1" ); - assertEquals(metadataFiles, translog.getMetadataFilesToBeDeleted(metadataFiles)); + assertEquals(metadataFiles, ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles)); } public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeOnly() { @@ -637,7 +688,7 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeOnly() { "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = translog.getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); assertEquals(1, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); } @@ -659,7 +710,7 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnPinningOnly() throws "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = translog.getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); assertEquals(2, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(1)); @@ -682,7 +733,7 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeAndPinning() throw "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = translog.getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); assertEquals(1, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(0)); } @@ -707,6 +758,8 @@ public void testIsGenerationPinned() { pinnedGenerations.add(new Tuple<>(142L, 180L)); pinnedGenerations.add(new Tuple<>(4L, 9L)); + RemoteFsTimestampAwareTranslog translog = (RemoteFsTimestampAwareTranslog) this.translog; + assertFalse(translog.isGenerationPinned(3, pinnedGenerations)); assertFalse(translog.isGenerationPinned(10, pinnedGenerations)); assertFalse(translog.isGenerationPinned(141, pinnedGenerations)); @@ -724,6 +777,8 @@ public void testIsGenerationPinned() { public void testGetMinMaxTranslogGenerationFromMetadataFile() throws IOException { TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class); + RemoteFsTimestampAwareTranslog translog = (RemoteFsTimestampAwareTranslog) this.translog; + // Fetch generations directly from the filename assertEquals( new Tuple<>(701L, 1008L), diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 163340e8ec7d5..339d876274557 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -118,11 +118,11 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { protected final ShardId shardId = new ShardId("index", "_na_", 1); protected RemoteFsTranslog translog; - private AtomicLong globalCheckpoint; + protected AtomicLong globalCheckpoint; protected Path translogDir; // A default primary term is used by translog instances created in this test. protected final AtomicLong primaryTerm = new AtomicLong(); - private final AtomicBoolean primaryMode = new AtomicBoolean(true); + protected final AtomicBoolean primaryMode = new AtomicBoolean(true); private final AtomicReference persistedSeqNoConsumer = new AtomicReference<>(); protected ThreadPool threadPool; protected final static String METADATA_DIR = "metadata"; @@ -136,7 +136,7 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { TestTranslog.SlowDownWriteSwitch slowDown; - private LongConsumer getPersistedSeqNoConsumer() { + protected LongConsumer getPersistedSeqNoConsumer() { return seqNo -> { final LongConsumer consumer = persistedSeqNoConsumer.get(); if (consumer != null) { @@ -167,7 +167,7 @@ public void tearDown() throws Exception { } } - private RemoteFsTranslog create(Path path) throws IOException { + protected RemoteFsTranslog create(Path path) throws IOException { final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); return create(path, createRepository(), translogUUID, 0); } @@ -179,6 +179,14 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); threadPool = new TestThreadPool(getClass().getName()); blobStoreTransferService = new BlobStoreTransferService(repository.blobStore(), threadPool); + return createTranslogInstance(translogConfig, translogUUID, deletionPolicy); + } + + protected RemoteFsTranslog createTranslogInstance( + TranslogConfig translogConfig, + String translogUUID, + TranslogDeletionPolicy deletionPolicy + ) throws IOException { return new RemoteFsTranslog( translogConfig, translogUUID, @@ -671,13 +679,13 @@ public void testSimpleOperationsUpload() throws Exception { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - assertEquals(2, translog.allUploaded().size()); + assertEquals(4, translog.allUploaded().size()); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); - assertEquals(4, translog.allUploaded().size()); + assertEquals(6, translog.allUploaded().size()); translog.rollGeneration(); - assertEquals(4, translog.allUploaded().size()); + assertEquals(6, translog.allUploaded().size()); Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); assertEquals(2, mdFiles.size()); @@ -736,7 +744,7 @@ public void testSimpleOperationsUpload() throws Exception { assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); assertBusy(() -> { - assertEquals(2, translog.allUploaded().size()); + assertEquals(4, translog.allUploaded().size()); assertEquals( 4, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() @@ -755,7 +763,7 @@ public void testSimpleOperationsUpload() throws Exception { assertEquals(1, translog.readers.size()); assertEquals(1, translog.stats().estimatedNumberOfOperations()); assertBusy(() -> { - assertEquals(2, translog.allUploaded().size()); + assertEquals(4, translog.allUploaded().size()); assertEquals( 4, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() @@ -774,7 +782,7 @@ public void testMetadataFileDeletion() throws Exception { assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); } - assertBusy(() -> assertEquals(2, translog.allUploaded().size())); + assertBusy(() -> assertEquals(4, translog.allUploaded().size())); assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int moreDocs = randomIntBetween(3, 10); logger.info("numDocs={} moreDocs={}", numDocs, moreDocs); @@ -872,7 +880,7 @@ public void testDrainSync() throws Exception { assertBusy(() -> assertEquals(0, latch.getCount())); assertEquals(0, translog.availablePermits()); slowDown.setSleepSeconds(0); - assertEquals(4, translog.allUploaded().size()); + assertEquals(6, translog.allUploaded().size()); assertEquals(2, translog.readers.size()); Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); @@ -881,7 +889,7 @@ public void testDrainSync() throws Exception { translog.trimUnreferencedReaders(); assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); - assertEquals(2, translog.allUploaded().size()); + assertEquals(6, translog.allUploaded().size()); assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR))); // Case 4 - After drainSync, if an upload is an attempted, we do not upload to remote store. @@ -891,21 +899,21 @@ public void testDrainSync() throws Exception { new Translog.Index(String.valueOf(2), 2, primaryTerm.get(), new byte[] { 1 }) ); assertEquals(1, translog.readers.size()); - assertEquals(2, translog.allUploaded().size()); + assertEquals(6, translog.allUploaded().size()); assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR))); // Refill the permits back Releasables.close(releasable); addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(3), 3, primaryTerm.get(), new byte[] { 1 })); assertEquals(2, translog.readers.size()); - assertEquals(4, translog.allUploaded().size()); + assertEquals(8, translog.allUploaded().size()); assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); translog.setMinSeqNoToKeep(3); translog.trimUnreferencedReaders(); assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); - assertBusy(() -> assertEquals(2, translog.allUploaded().size())); + assertBusy(() -> assertEquals(4, translog.allUploaded().size())); assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); } From 3fc0139ca68a1ff843ec1492c3cd52c2c4c67f02 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 3 Sep 2024 23:42:26 +0530 Subject: [PATCH 3/3] Optimise snapshot deletion to speed up snapshot deletion and creation (#15568) --------- Signed-off-by: Ashish Singh --- CHANGELOG.md | 1 + .../blobstore/BlobStoreRepository.java | 92 ++++++++++++++++--- .../org/opensearch/threadpool/ThreadPool.java | 7 ++ .../threadpool/ScalingThreadPoolTests.java | 1 + 4 files changed, 86 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93a98de2c5af8..4df028f12d2fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))) - Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409)) - Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557)) +- Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568)) - [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218)) ### Dependencies diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index d193ab3c14154..aff48f717eb44 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -66,6 +66,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.Numbers; import org.opensearch.common.Priority; +import org.opensearch.common.Randomness; import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; @@ -831,7 +832,7 @@ boolean getPrefixModeVerification() { * maintains single lazy instance of {@link BlobContainer} */ protected BlobContainer blobContainer() { - // assertSnapshotOrGenericThread(); + assertSnapshotOrGenericThread(); BlobContainer blobContainer = this.blobContainer.get(); if (blobContainer == null) { @@ -1204,6 +1205,10 @@ private void asyncCleanupUnlinkedShardLevelBlobs( ActionListener listener ) { final List> filesToDelete = resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults); + long startTimeNs = System.nanoTime(); + Randomness.shuffle(filesToDelete); + logger.debug("[{}] shuffled the filesToDelete with timeElapsedNs={}", metadata.name(), (System.nanoTime() - startTimeNs)); + if (filesToDelete.isEmpty()) { listener.onResponse(null); return; @@ -1221,8 +1226,8 @@ private void asyncCleanupUnlinkedShardLevelBlobs( staleFilesToDeleteInBatch.size() ); - // Start as many workers as fit into the snapshot pool at once at the most - final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size()); + // Start as many workers as fit into the snapshot_deletion pool at once at the most + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size()); for (int i = 0; i < workers; ++i) { executeStaleShardDelete(staleFilesToDeleteInBatch, remoteStoreLockManagerFactory, groupedListener); } @@ -1326,7 +1331,7 @@ private void executeStaleShardDelete( if (filesToDelete == null) { return; } - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { + threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> { try { // filtering files for which remote store lock release and cleanup succeeded, // remaining files for which it failed will be retried in next snapshot delete run. @@ -1390,7 +1395,7 @@ private void writeUpdatedShardMetaDataAndComputeDeletes( ActionListener> onAllShardsCompleted ) { - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION); final List indices = oldRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds); if (indices.isEmpty()) { @@ -1578,7 +1583,7 @@ private void cleanupStaleBlobs( listener.onResponse(deleteResult); }, listener::onFailure), 2); - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION); final List staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet()); if (staleRootBlobs.isEmpty()) { groupedListener.onResponse(DeleteResult.ZERO); @@ -1781,7 +1786,7 @@ void cleanupStaleIndices( // Start as many workers as fit into the snapshot pool at once at the most final int workers = Math.min( - threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), + threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), foundIndices.size() - survivingIndexIds.size() ); for (int i = 0; i < workers; ++i) { @@ -1833,7 +1838,7 @@ private void executeOneStaleIndexDelete( return; } final String indexSnId = indexEntry.getKey(); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { + threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.supply(listener, () -> { try { logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); List matchingShardPaths = findMatchingShardPaths(indexSnId, snapshotShardPaths); @@ -2097,8 +2102,7 @@ public void finalizeSnapshot( stateTransformer, repositoryUpdatePriority, ActionListener.wrap(newRepoData -> { - cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); - listener.onResponse(newRepoData); + cleanupOldShardGens(existingRepositoryData, updatedRepositoryData, newRepoData, listener); }, onUpdateFailure) ); }, onUpdateFailure), 2 + indices.size()); @@ -2254,7 +2258,12 @@ private void logShardPathsOperationWarning(IndexId indexId, SnapshotId snapshotI } // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data - private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) { + private void cleanupOldShardGens( + RepositoryData existingRepositoryData, + RepositoryData updatedRepositoryData, + RepositoryData newRepositoryData, + ActionListener listener + ) { final List toDelete = new ArrayList<>(); updatedRepositoryData.shardGenerations() .obsoleteShardGenerations(existingRepositoryData.shardGenerations()) @@ -2263,10 +2272,62 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito (shardId, oldGen) -> toDelete.add(shardPath(indexId, shardId).buildAsString() + INDEX_FILE_PREFIX + oldGen) ) ); + if (toDelete.isEmpty()) { + listener.onResponse(newRepositoryData); + return; + } try { - deleteFromContainer(rootBlobContainer(), toDelete); + AtomicInteger counter = new AtomicInteger(); + Collection> subList = toDelete.stream() + .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch)) + .values(); + final BlockingQueue> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList); + logger.info( + "[{}] cleanupOldShardGens toDeleteSize={} groupSize={}", + metadata.name(), + toDelete.size(), + staleFilesToDeleteInBatch.size() + ); + final GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.wrap(r -> { + logger.info("[{}] completed cleanupOldShardGens", metadata.name()); + listener.onResponse(newRepositoryData); + }, ex -> { + logger.error(new ParameterizedMessage("[{}] exception in cleanupOldShardGens", metadata.name()), ex); + listener.onResponse(newRepositoryData); + }), staleFilesToDeleteInBatch.size()); + + // Start as many workers as fit into the snapshot pool at once at the most + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size()); + for (int i = 0; i < workers; ++i) { + executeOldShardGensCleanup(staleFilesToDeleteInBatch, groupedListener); + } } catch (Exception e) { - logger.warn("Failed to clean up old shard generation blobs", e); + logger.warn(new ParameterizedMessage(" [{}] Failed to clean up old shard generation blobs", metadata.name()), e); + listener.onResponse(newRepositoryData); + } + } + + private void executeOldShardGensCleanup(BlockingQueue> staleFilesToDeleteInBatch, GroupedActionListener listener) + throws InterruptedException { + List filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS); + if (filesToDelete != null) { + threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> { + try { + deleteFromContainer(rootBlobContainer(), filesToDelete); + l.onResponse(null); + } catch (Exception e) { + logger.warn( + () -> new ParameterizedMessage( + "[{}] Failed to delete following blobs during cleanupOldFiles : {}", + metadata.name(), + filesToDelete + ), + e + ); + l.onFailure(e); + } + executeOldShardGensCleanup(staleFilesToDeleteInBatch, listener); + })); } } @@ -2383,10 +2444,11 @@ public long getRemoteDownloadThrottleTimeInNanos() { } protected void assertSnapshotOrGenericThread() { - assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']') + assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT_DELETION + ']') + || Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']') || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread [" + Thread.currentThread() - + "] to be the snapshot or generic thread."; + + "] to be the snapshot_deletion or snapshot or generic thread."; } @Override diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 056ef0fac0153..81220ab171b34 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -105,6 +105,7 @@ public static class Names { public static final String REFRESH = "refresh"; public static final String WARMER = "warmer"; public static final String SNAPSHOT = "snapshot"; + public static final String SNAPSHOT_DELETION = "snapshot_deletion"; public static final String FORCE_MERGE = "force_merge"; public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; public static final String FETCH_SHARD_STORE = "fetch_shard_store"; @@ -176,6 +177,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.REFRESH, ThreadPoolType.SCALING); map.put(Names.WARMER, ThreadPoolType.SCALING); map.put(Names.SNAPSHOT, ThreadPoolType.SCALING); + map.put(Names.SNAPSHOT_DELETION, ThreadPoolType.SCALING); map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); @@ -234,6 +236,7 @@ public ThreadPool( final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors); final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors); final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512); + final int snapshotDeletionPoolMax = boundedBy(4 * allocatedProcessors, 64, 256); builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000)); @@ -251,6 +254,10 @@ public ThreadPool( builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); + builders.put( + Names.SNAPSHOT_DELETION, + new ScalingExecutorBuilder(Names.SNAPSHOT_DELETION, 1, snapshotDeletionPoolMax, TimeValue.timeValueMinutes(5)) + ); builders.put( Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)) diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index d8f04a11fe494..4f59f9688fb7e 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -148,6 +148,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen); sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive); sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfAllocatedProcessorsMaxFive); + sizes.put(ThreadPool.Names.SNAPSHOT_DELETION, n -> ThreadPool.boundedBy(4 * n, 64, 256)); sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessors);