From 7586b7d1d98a97786983bf36e052b7f76a232531 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat <2025sandeepkumawat@gmail.com> Date: Tue, 14 May 2024 17:37:14 +0530 Subject: [PATCH] address commens and add ut's Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com> --- .../metadata/MetadataCreateIndexService.java | 4 +- .../common/blobstore/BlobStore.java | 4 +- .../common/settings/ClusterSettings.java | 2 +- .../org/opensearch/index/IndexSettings.java | 8 +- .../RemoteMigrationIndexMetadataUpdater.java | 2 +- .../RemoteStoreCustomMetadataResolver.java | 4 +- .../index/remote/RemoteStoreEnums.java | 2 +- .../index/remote/RemoteStoreUtils.java | 14 +- .../opensearch/index/shard/IndexShard.java | 4 +- .../index/translog/RemoteFsTranslog.java | 27 +- .../transfer/BlobStoreTransferService.java | 29 +- .../index/translog/transfer/FileSnapshot.java | 10 + .../translog/transfer/TransferService.java | 3 +- .../translog/transfer/TransferSnapshot.java | 8 +- .../TranslogCheckpointTransferSnapshot.java | 14 +- .../transfer/TranslogTransferManager.java | 60 ++-- .../indices/RemoteStoreSettings.java | 18 +- ...oteMigrationIndexMetadataUpdaterTests.java | 2 +- ...emoteStoreCustomMetadataResolverTests.java | 14 +- ...oreTransferServiceMockRepositoryTests.java | 21 +- .../BlobStoreTransferServiceTests.java | 39 +++ .../TranslogTransferManagerTests.java | 265 +++++++++++++----- 22 files changed, 353 insertions(+), 201 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index dcdf33f7216aa..fc53e86d1ec8d 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -590,8 +590,8 @@ public void addRemoteStoreCustomMetadata(IndexMetadata.Builder tmpImdBuilder, bo Map remoteCustomData = new HashMap<>(); // Determine if the ckp would be stored as translog metadata - boolean isCkpAsTranslogMetadata = remoteStoreCustomMetadataResolver.isCkpAsTranslogMetadata(); - remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(isCkpAsTranslogMetadata)); + boolean isTranslogMetadataEnabled = remoteStoreCustomMetadataResolver.isTranslogMetadataEnabled(); + remoteCustomData.put(RemoteStoreEnums.TRANSLOG_METADATA, Boolean.toString(isTranslogMetadataEnabled)); // Determine the path type for use using the remoteStorePathResolver. RemoteStorePathStrategy newPathStrategy = remoteStoreCustomMetadataResolver.getPathStrategy(); diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java index b3c09c2bc81eb..406ccc6aa4a18 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java @@ -72,9 +72,9 @@ default Map> extendedStats() { default void reload(RepositoryMetadata repositoryMetadata) {} /** - * Returns a boolean indicating if blobStore support object metadata upload + * Returns a boolean indicating if blobStore has object metadata support enabled */ - default boolean isBlobMetadataSupported() { + default boolean isBlobMetadataEnabled() { return false; } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 78c8adbd379d3..dc69d397d6402 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -740,7 +740,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS, - RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA + RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA ) ) ); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index b1b805fbe6b7a..8ca8faa262a90 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -763,7 +763,7 @@ public static IndexMergePolicy fromString(String text) { private final boolean widenIndexSortType; private final boolean assignedOnRemoteNode; private final RemoteStorePathStrategy remoteStorePathStrategy; - private final boolean ckpAsTranslogMetadata; + private final boolean isTranslogMetadataEnabled; /** * The maximum age of a retention lease before it is considered expired. @@ -990,7 +990,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings()); remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata); - ckpAsTranslogMetadata = RemoteStoreUtils.determineCkpAsTranslogMetadata(indexMetadata); + isTranslogMetadataEnabled = RemoteStoreUtils.determineisTranslogMetadataEnabled(indexMetadata); setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING)); setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING)); @@ -1915,7 +1915,7 @@ public RemoteStorePathStrategy getRemoteStorePathStrategy() { return remoteStorePathStrategy; } - public boolean isCkpAsTranslogMetadata() { - return ckpAsTranslogMetadata; + public boolean isTranslogMetadataEnabled() { + return isTranslogMetadataEnabled; } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java index 5a03ad8eb14dd..b9fff0d8ae197 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java @@ -171,7 +171,7 @@ public static boolean indexHasRemoteCustomMetadata(IndexMetadata indexMetadata) Map customMetadata = indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY); return Objects.nonNull(customMetadata) && Objects.nonNull(customMetadata.get(PathType.NAME)) - && Objects.nonNull(customMetadata.get(RemoteStoreEnums.CKP_AS_METADATA)); + && Objects.nonNull(customMetadata.get(RemoteStoreEnums.TRANSLOG_METADATA)); } public static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, String segmentRepository, String translogRepository) { diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java index d481a9cc5421c..83025a2a11b60 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java @@ -42,8 +42,8 @@ public RemoteStorePathStrategy getPathStrategy() { return new RemoteStorePathStrategy(pathType, pathHashAlgorithm); } - public boolean isCkpAsTranslogMetadata() { - return Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 && remoteStoreSettings.isCkpAsTranslogMetadata(); + public boolean isTranslogMetadataEnabled() { + return Version.V_2_15_0.compareTo(minNodeVersionSupplier.get()) <= 0 && remoteStoreSettings.isTranslogMetadataEnabled(); } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java index 8da780c1f477f..1d5c7a217f1e7 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java @@ -36,7 +36,7 @@ @ExperimentalApi public class RemoteStoreEnums { - public static final String CKP_AS_METADATA = "ckp-as-metadata"; + public static final String TRANSLOG_METADATA = "translog-metadata"; /** * Categories of the data in Remote store. diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 3b307e72dafa5..7df9214be18f3 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -32,7 +32,7 @@ import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; -import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA; /** * Utils for remote store @@ -185,11 +185,11 @@ public static RemoteStorePathStrategy determineRemoteStorePathStrategy(IndexMeta /** * Determines whether translog ckp upload as metadata allowed or not */ - public static boolean determineCkpAsTranslogMetadata(IndexMetadata indexMetadata) { + public static boolean determineisTranslogMetadataEnabled(IndexMetadata indexMetadata) { Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - assert remoteCustomData == null || remoteCustomData.containsKey(RemoteStoreEnums.CKP_AS_METADATA); - if (remoteCustomData != null && remoteCustomData.containsKey(RemoteStoreEnums.CKP_AS_METADATA)) { - return Boolean.parseBoolean(remoteCustomData.get(RemoteStoreEnums.CKP_AS_METADATA)); + assert remoteCustomData == null || remoteCustomData.containsKey(RemoteStoreEnums.TRANSLOG_METADATA); + if (remoteCustomData != null && remoteCustomData.containsKey(RemoteStoreEnums.TRANSLOG_METADATA)) { + return Boolean.parseBoolean(remoteCustomData.get(RemoteStoreEnums.TRANSLOG_METADATA)); } return false; } @@ -209,8 +209,8 @@ public static Map determineRemoteStoreCustomMetadataDuringMigrat Version minNodeVersion = discoveryNodes.getMinNodeVersion(); boolean ckpAsMetadata = Version.CURRENT.compareTo(minNodeVersion) <= 0 - && CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.get(clusterSettings); - remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(ckpAsMetadata)); + && CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA.get(clusterSettings); + remoteCustomData.put(RemoteStoreEnums.TRANSLOG_METADATA, Boolean.toString(ckpAsMetadata)); RemoteStoreEnums.PathType pathType = Version.CURRENT.compareTo(minNodeVersion) <= 0 ? CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.get(clusterSettings) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 72439651a63db..f154e6f15cd2c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4982,7 +4982,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException { getThreadPool(), indexSettings.getRemoteStorePathStrategy(), remoteStoreSettings, - indexSettings().isCkpAsTranslogMetadata() + indexSettings().isTranslogMetadataEnabled() ); } @@ -5009,7 +5009,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { remoteStoreSettings, logger, shouldSeedRemoteStore(), - indexSettings().isCkpAsTranslogMetadata() + indexSettings().isTranslogMetadataEnabled() ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 71eee063d64ce..e294efea3854f 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -91,7 +91,7 @@ public class RemoteFsTranslog extends Translog { private static final int SYNC_PERMIT = 1; private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT); private final AtomicBoolean pauseSync = new AtomicBoolean(false); - boolean ckpAsTranslogMetadata; + boolean isTranslogMetadataEnabled; public RemoteFsTranslog( TranslogConfig config, @@ -111,8 +111,7 @@ public RemoteFsTranslog( this.startedPrimarySupplier = startedPrimarySupplier; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); - ckpAsTranslogMetadata = isCkpAsTranslogMetadata(indexSettings().isCkpAsTranslogMetadata(), blobStoreRepository); - ; + isTranslogMetadataEnabled = isTranslogMetadataEnabled(indexSettings().isTranslogMetadataEnabled(), blobStoreRepository); this.translogTransferManager = buildTranslogTransferManager( blobStoreRepository, threadPool, @@ -121,7 +120,7 @@ public RemoteFsTranslog( remoteTranslogTransferTracker, indexSettings().getRemoteStorePathStrategy(), remoteStoreSettings, - ckpAsTranslogMetadata + isTranslogMetadataEnabled ); try { download(translogTransferManager, location, logger, config.shouldSeedRemote()); @@ -160,8 +159,8 @@ public RemoteFsTranslog( } } - private static boolean isCkpAsTranslogMetadata(boolean ckpAsTranslogMetadata, BlobStoreRepository blobStoreRepository) { - return blobStoreRepository.blobStore().isBlobMetadataSupported() && ckpAsTranslogMetadata; + private static boolean isTranslogMetadataEnabled(boolean isTranslogMetadataEnabled, BlobStoreRepository blobStoreRepository) { + return blobStoreRepository.blobStore().isBlobMetadataEnabled() && isTranslogMetadataEnabled; } // visible for testing @@ -178,7 +177,7 @@ public static void download( RemoteStoreSettings remoteStoreSettings, Logger logger, boolean seedRemote, - boolean ckpAsTranslogMetadata + boolean isTranslogMetadataEnabled ) throws IOException { assert repository instanceof BlobStoreRepository : String.format( Locale.ROOT, @@ -188,7 +187,7 @@ public static void download( BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; // We use a dummy stats tracker to ensure the flow doesn't break. // TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567 - ckpAsTranslogMetadata = isCkpAsTranslogMetadata(ckpAsTranslogMetadata, blobStoreRepository); + isTranslogMetadataEnabled = isTranslogMetadataEnabled(isTranslogMetadataEnabled, blobStoreRepository); RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000); FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); TranslogTransferManager translogTransferManager = buildTranslogTransferManager( @@ -199,7 +198,7 @@ public static void download( remoteTranslogTransferTracker, pathStrategy, remoteStoreSettings, - ckpAsTranslogMetadata + isTranslogMetadataEnabled ); RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote); logger.trace(remoteTranslogTransferTracker.toString()); @@ -305,7 +304,7 @@ public static TranslogTransferManager buildTranslogTransferManager( RemoteTranslogTransferTracker tracker, RemoteStorePathStrategy pathStrategy, RemoteStoreSettings remoteStoreSettings, - boolean ckpAsTranslogMetadata + boolean isTranslogMetadataEnabled ) { assert Objects.nonNull(pathStrategy); String indexUUID = shardId.getIndex().getUUID(); @@ -335,7 +334,7 @@ public static TranslogTransferManager buildTranslogTransferManager( fileTransferTracker, tracker, remoteStoreSettings, - ckpAsTranslogMetadata + isTranslogMetadataEnabled ); } @@ -614,13 +613,13 @@ public static void cleanup( ThreadPool threadPool, RemoteStorePathStrategy pathStrategy, RemoteStoreSettings remoteStoreSettings, - boolean ckpAsTranslogMetadata + boolean isTranslogMetadataEnabled ) throws IOException { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; // We use a dummy stats tracker to ensure the flow doesn't break. // TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567 - ckpAsTranslogMetadata = isCkpAsTranslogMetadata(ckpAsTranslogMetadata, blobStoreRepository); + isTranslogMetadataEnabled = isTranslogMetadataEnabled(isTranslogMetadataEnabled, blobStoreRepository); RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000); FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); TranslogTransferManager translogTransferManager = buildTranslogTransferManager( @@ -631,7 +630,7 @@ public static void cleanup( remoteTranslogTransferTracker, pathStrategy, remoteStoreSettings, - ckpAsTranslogMetadata + isTranslogMetadataEnabled ); // clean up all remote translog files translogTransferManager.deleteTranslogFiles(); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 7c81b6bebff6c..4d6d6c394e829 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -95,29 +95,34 @@ public void uploadBlobs( Set fileSnapshots, final Map blobPaths, ActionListener listener, - WritePriority writePriority, - final Map transferFileMetadata + WritePriority writePriority ) { fileSnapshots.forEach(fileSnapshot -> { BlobPath blobPath = blobPaths.get(fileSnapshot.getPrimaryTerm()); - InputStream fileMetadata = transferFileMetadata.get(fileSnapshot); if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) { uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority); } else { - uploadBlob(fileSnapshot, fileMetadata, listener, blobPath, writePriority); + uploadBlob(fileSnapshot, listener, blobPath, writePriority); } }); } - public Map buildTransferFileMetadata(InputStream fileMetadata) throws IOException { + // this function creates metadata of checkpoint file data to be associated with translog file. + static Map buildTransferFileMetadata(InputStream metadataInputStream) throws IOException { Map metadata = new HashMap<>(); - try (fileMetadata; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { - byte[] buffer = new byte[4096]; + try (metadataInputStream) { + byte[] buffer = new byte[128]; int bytesRead; + int totalBytesRead = 0; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - while ((bytesRead = fileMetadata.read(buffer)) != -1) { + while ((bytesRead = metadataInputStream.read(buffer)) != -1) { byteArrayOutputStream.write(buffer, 0, bytesRead); + totalBytesRead += bytesRead; + if (totalBytesRead > 1024) { + throw new AssertionError("Input stream exceeds 1KB limit"); + } } byte[] bytes = byteArrayOutputStream.toByteArray(); @@ -129,7 +134,6 @@ public Map buildTransferFileMetadata(InputStream fileMetadata) t private void uploadBlob( TransferFileSnapshot fileSnapshot, - InputStream fileMetadata, ActionListener listener, BlobPath blobPath, WritePriority writePriority @@ -138,9 +142,10 @@ private void uploadBlob( try { ChannelFactory channelFactory = FileChannel::open; Map metadata = null; - if (fileMetadata != null) { - metadata = buildTransferFileMetadata(fileMetadata); + if (fileSnapshot.getMetadataFileInputStream() != null) { + metadata = buildTransferFileMetadata(fileSnapshot.getMetadataFileInputStream()); } + long contentLength; try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) { contentLength = channel.size(); @@ -198,7 +203,7 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I @ExperimentalApi @Override public FetchBlobResult downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { - assert blobStore.isBlobMetadataSupported(); + assert blobStore.isBlobMetadataEnabled(); return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index dcec94edd694f..86f042af0584b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -108,6 +108,8 @@ public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; private Long checksum; + @Nullable + private InputStream metadataFileInputStream; public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException { super(path); @@ -128,6 +130,14 @@ public long getPrimaryTerm() { return primaryTerm; } + public void setMetadataFileInputStream(InputStream inputStream) { + this.metadataFileInputStream = inputStream; + } + + public InputStream getMetadataFileInputStream() { + return metadataFileInputStream; + } + @Override public int hashCode() { return Objects.hash(primaryTerm, super.hashCode()); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 3fb246705bba9..0894ebf500ebd 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -54,8 +54,7 @@ void uploadBlobs( Set fileSnapshots, final Map blobPaths, ActionListener listener, - WritePriority writePriority, - final Map transferFileMetadata + WritePriority writePriority ) throws Exception; /** diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java index 0f2aa1ab05c92..6dcdc8f8cf44a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java @@ -12,7 +12,7 @@ import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; -import java.util.Map; +import java.io.IOException; import java.util.Set; /** @@ -42,8 +42,8 @@ public interface TransferSnapshot { TranslogTransferMetadata getTranslogTransferMetadata(); /** - * The map of translog to checkpoint file snapshot of this {@link TransferSnapshot} - * @return the map of translog and checkpoint file snapshot + * The snapshot of the translog generational files having checkpoint file inputStream as metadata + * @return the set of translog files having checkpoint file inputStream as metadata. */ - Map getTranslogCheckpointSnapshotMap(); + Set getTranslogFileSnapshotWithMetadata() throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index def68efb09ec2..ae007c0c33e1e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -15,11 +15,9 @@ import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -66,10 +64,14 @@ public Set getTranslogFileSnapshots() { return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet()); } - public Map getTranslogCheckpointSnapshotMap() { - Map tlogCkpSnapshots = new HashMap<>(); - translogCheckpointFileInfoTupleSet.forEach(tuple -> tlogCkpSnapshots.put(tuple.v1(), tuple.v2())); - return tlogCkpSnapshots; + @Override + public Set getTranslogFileSnapshotWithMetadata() throws IOException { + for (Tuple tuple : translogCheckpointFileInfoTupleSet) { + TransferFileSnapshot translogFileSnapshot = tuple.v1(); + TransferFileSnapshot checkpointFileSnapshot = tuple.v2(); + translogFileSnapshot.setMetadataFileInputStream(checkpointFileSnapshot.inputStream()); + } + return getTranslogFileSnapshots(); } @Override diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 464b28462e4a6..7af21ba63a929 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -65,7 +65,8 @@ public class TranslogTransferManager { private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private final RemoteStoreSettings remoteStoreSettings; private static final int METADATA_FILES_TO_FETCH = 10; - private final boolean ckpAsTranslogMetadata; + // boolean flag indicates if checkpoint file should upload/download with translog file object metadata + private final boolean isTranslogMetadataEnabled; final static String CHECKPOINT_FILE_DATA_KEY = "ckp-data"; private final Logger logger; @@ -84,7 +85,7 @@ public TranslogTransferManager( FileTransferTracker fileTransferTracker, RemoteTranslogTransferTracker remoteTranslogTransferTracker, RemoteStoreSettings remoteStoreSettings, - boolean ckpAsTranslogMetadata + boolean isTranslogMetadataEnabled ) { this.shardId = shardId; this.transferService = transferService; @@ -94,7 +95,7 @@ public TranslogTransferManager( this.logger = Loggers.getLogger(getClass(), shardId); this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; this.remoteStoreSettings = remoteStoreSettings; - this.ckpAsTranslogMetadata = ckpAsTranslogMetadata; + this.isTranslogMetadataEnabled = isTranslogMetadataEnabled; } public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() { @@ -116,17 +117,8 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis(); try { - Map tlogCkpSnapshotMap = transferSnapshot.getTranslogCheckpointSnapshotMap(); - Map transferFileMetadata = new HashMap<>(); - if (ckpAsTranslogMetadata) { - toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); - toUpload.forEach(fileSnapshot -> { - try { - transferFileMetadata.put(fileSnapshot, tlogCkpSnapshotMap.get(fileSnapshot).inputStream()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + if (isTranslogMetadataEnabled) { + toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshotWithMetadata())); } else { toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); @@ -168,7 +160,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans // TODO: Ideally each file's upload start time should be when it is actually picked for upload // https://github.com/opensearch-project/OpenSearch/issues/9729 fileTransferTracker.recordFileTransferStartTime(uploadStartTime); - transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH, transferFileMetadata); + transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH); try { if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) { @@ -257,14 +249,13 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca ); String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); String translogFilename = Translog.getFilename(Long.parseLong(generation)); - if (ckpAsTranslogMetadata == false) { - // Download Checkpoint file from remote to local FS + if (isTranslogMetadataEnabled == false) { + // Download Checkpoint file, translog file from remote to local FS downloadToFS(ckpFileName, location, primaryTerm); - // Download translog file from remote to local FS downloadToFS(translogFilename, location, primaryTerm); } else { // Download translog.tlog file with object metadata from remote to local FS - Map metadata = downloadTranslogToFSAndGetMetadata(translogFilename, location, primaryTerm, generation); + Map metadata = downloadTranslogToFSAndGetMetadata(translogFilename, location, primaryTerm); try { assert metadata != null && !metadata.isEmpty() && metadata.containsKey(CHECKPOINT_FILE_DATA_KEY); recoverCkpFileUsingMetadata(metadata, location, generation, translogFilename); @@ -275,8 +266,7 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca return true; } - private Map downloadTranslogToFSAndGetMetadata(String fileName, Path location, String primaryTerm, String generation) - throws IOException { + private Map downloadTranslogToFSAndGetMetadata(String fileName, Path location, String primaryTerm) throws IOException { Path filePath = location.resolve(fileName); // Here, we always override the existing file if present. // We need to change this logic when we introduce incremental download @@ -286,11 +276,11 @@ private Map downloadTranslogToFSAndGetMetadata(String fileName, long bytesToRead = 0, downloadStartTime = System.nanoTime(); Map metadata; - FetchBlobResult inputStreamWithMetadata = transferService.downloadBlobWithMetadata( - remoteDataTransferPath.add(primaryTerm), - fileName - ); try { + FetchBlobResult inputStreamWithMetadata = transferService.downloadBlobWithMetadata( + remoteDataTransferPath.add(primaryTerm), + fileName + ); InputStream inputStream = inputStreamWithMetadata.getInputStream(); metadata = inputStreamWithMetadata.getMetadata(); @@ -323,26 +313,18 @@ private void recoverCkpFileUsingMetadata(Map metadata, Path loca if (ckpDataBase64 == null) { logger.error("Error processing metadata for translog file: {}", fileName); throw new IllegalStateException( - "Checkpoint file data (key - ckp-data) is expected but not found in metadata for file: " + fileName + "Checkpoint file data key " + CHECKPOINT_FILE_DATA_KEY + " is expected but not found in metadata for file: " + fileName ); } byte[] ckpFileBytes = Base64.getDecoder().decode(ckpDataBase64); Files.write(filePath, ckpFileBytes); } - public void deleteFileIfExists(Path filePath) throws IOException { - if (Files.exists(filePath)) { - Files.delete(filePath); - } - } - private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException { Path filePath = location.resolve(fileName); // Here, we always override the existing file if present. // We need to change this logic when we introduce incremental download - if (Files.exists(filePath)) { - Files.delete(filePath); - } + deleteFileIfExists(filePath); boolean downloadStatus = false; long bytesToRead = 0, downloadStartTime = System.nanoTime(); @@ -362,6 +344,12 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th fileTransferTracker.add(fileName, true); } + private void deleteFileIfExists(Path filePath) throws IOException { + if (Files.exists(filePath)) { + Files.delete(filePath); + } + } + public TranslogTransferMetadata readMetadata() throws IOException { SetOnce metadataSetOnce = new SetOnce<>(); SetOnce exceptionSetOnce = new SetOnce<>(); @@ -482,7 +470,7 @@ public void deleteGenerationAsync(long primaryTerm, Set generations, Runna // Add .ckp and .tlog file to translog file list which is located in basePath/ String ckpFileName = Translog.getCommitCheckpointFileName(generation); String translogFileName = Translog.getFilename(generation); - if (ckpAsTranslogMetadata == false) { + if (isTranslogMetadataEnabled == false) { translogFiles.addAll(List.of(ckpFileName, translogFileName)); } else { translogFiles.add(translogFileName); diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 44e6428506f12..6016d6ded361a 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -85,8 +85,8 @@ public class RemoteStoreSettings { * repositories that supports metadata read and write with metadata and is applicable for only remote store enabled clusters. */ @ExperimentalApi - public static final Setting CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA = Setting.boolSetting( - "cluster.remote_store.index.translog.ckp_as_metadata", + public static final Setting CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA = Setting.boolSetting( + "cluster.remote_store.index.translog.translog_metadata", true, Property.NodeScope, Property.Dynamic @@ -123,7 +123,7 @@ public class RemoteStoreSettings { private volatile RemoteStoreEnums.PathType pathType; private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm; private volatile int maxRemoteTranslogReaders; - private volatile boolean ckpAsTranslogMetadata; + private volatile boolean isTranslogMetadataEnabled; public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); @@ -147,8 +147,8 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { pathType = clusterSettings.get(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING); clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, this::setPathType); - ckpAsTranslogMetadata = clusterSettings.get(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA, this::setCkpAsTranslogMetadata); + isTranslogMetadataEnabled = clusterSettings.get(CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA, this::setisTranslogMetadataEnabled); pathHashAlgorithm = clusterSettings.get(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING); clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, this::setPathHashAlgorithm); @@ -195,12 +195,12 @@ private void setPathType(RemoteStoreEnums.PathType pathType) { this.pathType = pathType; } - private void setCkpAsTranslogMetadata(boolean ckpAsTranslogMetadata) { - this.ckpAsTranslogMetadata = ckpAsTranslogMetadata; + private void setisTranslogMetadataEnabled(boolean isTranslogMetadataEnabled) { + this.isTranslogMetadataEnabled = isTranslogMetadataEnabled; } - public boolean isCkpAsTranslogMetadata() { - return ckpAsTranslogMetadata; + public boolean isTranslogMetadataEnabled() { + return isTranslogMetadataEnabled; } private void setPathHashAlgorithm(RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm) { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java index ad1b2fa72c389..f87701468b1e6 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java @@ -303,7 +303,7 @@ public static Metadata createIndexMetadataWithRemoteStoreSettings(String indexNa "dummy", RemoteStoreEnums.PathHashAlgorithm.NAME, "dummy", - RemoteStoreEnums.CKP_AS_METADATA, + RemoteStoreEnums.TRANSLOG_METADATA, "dummy" ) ) diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java index 8b7fe5cc6b385..690466f8c2a01 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java @@ -18,7 +18,7 @@ import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; -import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA; public class RemoteStoreCustomMetadataResolverTests extends OpenSearchTestCase { @@ -148,27 +148,27 @@ public void testGetPathStrategyStrategyWithDynamicUpdate() { } public void testTranslogCkpAsMetadataAllowedTrueWithMinVersionNewer() { - Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.getKey(), true).build(); + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA.getKey(), true).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.CURRENT); - assertTrue(resolver.isCkpAsTranslogMetadata()); + assertTrue(resolver.isTranslogMetadataEnabled()); } public void testTranslogCkpAsMetadataAllowedFalseWithMinVersionNewer() { - Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.getKey(), false).build(); + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA.getKey(), false).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.CURRENT); - assertFalse(resolver.isCkpAsTranslogMetadata()); + assertFalse(resolver.isTranslogMetadataEnabled()); } public void testTranslogCkpAsMetadataAllowedMinVersionOlder() { - Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.getKey(), randomBoolean()).build(); + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA.getKey(), randomBoolean()).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertFalse(resolver.isCkpAsTranslogMetadata()); + assertFalse(resolver.isTranslogMetadataEnabled()); } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java index 20e1e516b2104..a806eea381297 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java @@ -19,9 +19,7 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -45,7 +43,6 @@ public class BlobStoreTransferServiceMockRepositoryTests extends OpenSearchTestC private ThreadPool threadPool; private BlobStore blobStore; - byte[] ckpBytes = "ckp-data".getBytes(StandardCharsets.UTF_8); @Override public void setUp() throws Exception { @@ -91,11 +88,7 @@ public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { public void onFailure(Exception e) { exceptionRef.set(e); } - }, latch), WritePriority.HIGH, new HashMap<>() { - { - put(transferFileSnapshot, new ByteArrayInputStream(ckpBytes)); - } - }); + }, latch), WritePriority.HIGH); assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); verify(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); @@ -136,11 +129,7 @@ public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { public void onFailure(Exception e) { exceptionRef.set(e); } - }, latch), WritePriority.HIGH, new HashMap<>() { - { - put(transferFileSnapshot, new ByteArrayInputStream(ckpBytes)); - } - }); + }, latch), WritePriority.HIGH); assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); verify(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); @@ -185,11 +174,7 @@ public void onFailure(Exception e) { { put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); } - }, listener, WritePriority.HIGH, new HashMap<>() { - { - put(transferFileSnapshot, new ByteArrayInputStream(ckpBytes)); - } - }); + }, listener, WritePriority.HIGH); assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); verify(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index e4f5a454b15f6..e0b3ad92c5712 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -26,14 +26,21 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Base64; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY; + public class BlobStoreTransferServiceTests extends OpenSearchTestCase { private ThreadPool threadPool; @@ -144,4 +151,36 @@ private Environment createEnvironment() { .build() ); } + + public void testBuildTransferFileMetadata_EmptyInputStream() throws IOException { + InputStream emptyInputStream = new ByteArrayInputStream(new byte[0]); + Map metadata = BlobStoreTransferService.buildTransferFileMetadata(emptyInputStream); + assertTrue(metadata.containsKey(CHECKPOINT_FILE_DATA_KEY)); + assertEquals("", metadata.get(CHECKPOINT_FILE_DATA_KEY)); + } + + public void testBuildTransferFileMetadata_NonEmptyInputStream() throws IOException { + String inputData = "This is a test input stream."; + InputStream inputStream = new ByteArrayInputStream(inputData.getBytes(StandardCharsets.UTF_8)); + Map metadata = BlobStoreTransferService.buildTransferFileMetadata(inputStream); + assertTrue(metadata.containsKey(CHECKPOINT_FILE_DATA_KEY)); + String expectedBase64String = Base64.getEncoder().encodeToString(inputData.getBytes(StandardCharsets.UTF_8)); + assertEquals(expectedBase64String, metadata.get(CHECKPOINT_FILE_DATA_KEY)); + } + + public void testBuildTransferFileMetadata_InputStreamExceedsLimit() { + byte[] largeData = new byte[1025]; // 1025 bytes, exceeding the 1KB limit + InputStream largeInputStream = new ByteArrayInputStream(largeData); + assertThrows(AssertionError.class, () -> BlobStoreTransferService.buildTransferFileMetadata(largeInputStream)); + } + + public void testBuildTransferFileMetadata_SmallInputStreamOptimization() throws IOException { + String inputData = "Small input"; + InputStream inputStream = new ByteArrayInputStream(inputData.getBytes(StandardCharsets.UTF_8)); + Map metadata = BlobStoreTransferService.buildTransferFileMetadata(inputStream); + assertTrue(metadata.containsKey(CHECKPOINT_FILE_DATA_KEY)); + String expectedBase64String = Base64.getEncoder().encodeToString(inputData.getBytes(StandardCharsets.UTF_8)); + assertEquals(expectedBase64String, metadata.get(CHECKPOINT_FILE_DATA_KEY)); + } + } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 8108496100b94..7a8211c3ef7ea 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -15,6 +15,7 @@ import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.FetchBlobResult; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.common.collect.Tuple; @@ -41,7 +42,9 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -54,15 +57,17 @@ import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; +import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY; import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.anySet; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -82,7 +87,7 @@ public class TranslogTransferManagerTests extends OpenSearchTestCase { FileTransferTracker tracker; TranslogTransferManager translogTransferManager; long delayForBlobDownload; - boolean ckpAsTranslogMetadata; + boolean isTranslogMetadataEnabled; @Override public void setUp() throws Exception { @@ -99,7 +104,7 @@ public void setUp() throws Exception { tlogBytes = "Hello Translog".getBytes(StandardCharsets.UTF_8); ckpBytes = "Hello Checkpoint".getBytes(StandardCharsets.UTF_8); tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0), remoteTranslogTransferTracker); - ckpAsTranslogMetadata = false; + isTranslogMetadataEnabled = false; translogTransferManager = new TranslogTransferManager( shardId, transferService, @@ -108,7 +113,7 @@ public void setUp() throws Exception { tracker, remoteTranslogTransferTracker, DefaultRemoteStoreSettings.INSTANCE, - ckpAsTranslogMetadata + isTranslogMetadataEnabled ); delayForBlobDownload = 1; @@ -147,7 +152,7 @@ public void testTransferSnapshot() throws Exception { Set transferFileSnapshots = (Set) invocationOnMock.getArguments()[0]; transferFileSnapshots.forEach(listener::onResponse); return null; - }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class), anyMap()); + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); FileTransferTracker fileTransferTracker = new FileTransferTracker( new ShardId("index", "indexUUid", 0), @@ -175,7 +180,7 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { fileTransferTracker, remoteTranslogTransferTracker, DefaultRemoteStoreSettings.INSTANCE, - ckpAsTranslogMetadata + isTranslogMetadataEnabled ); assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -213,7 +218,7 @@ public void testTransferSnapshotOnUploadTimeout() throws Exception { Thread t = new Thread(runnable); t.start(); return null; - }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class), anyMap()); + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); FileTransferTracker fileTransferTracker = new FileTransferTracker( new ShardId("index", "indexUUid", 0), remoteTranslogTransferTracker @@ -228,7 +233,7 @@ public void testTransferSnapshotOnUploadTimeout() throws Exception { fileTransferTracker, remoteTranslogTransferTracker, remoteStoreSettings, - ckpAsTranslogMetadata + isTranslogMetadataEnabled ); SetOnce exception = new SetOnce<>(); translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -259,7 +264,7 @@ public void testTransferSnapshotOnThreadInterrupt() throws Exception { })); uploadThread.get().start(); return null; - }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class), anyMap()); + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); FileTransferTracker fileTransferTracker = new FileTransferTracker( new ShardId("index", "indexUUid", 0), remoteTranslogTransferTracker @@ -272,7 +277,7 @@ public void testTransferSnapshotOnThreadInterrupt() throws Exception { fileTransferTracker, remoteTranslogTransferTracker, DefaultRemoteStoreSettings.INSTANCE, - ckpAsTranslogMetadata + isTranslogMetadataEnabled ); SetOnce exception = new SetOnce<>(); @@ -304,69 +309,66 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { uploadThread.get().interrupt(); } - private TransferSnapshot createTransferSnapshot() { - return new TransferSnapshot() { - @Override - public Set getCheckpointFileSnapshots() { - try { - return Set.of( - new CheckpointFileSnapshot( - primaryTerm, - generation, - minTranslogGeneration, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX), - null - ), - new CheckpointFileSnapshot( - primaryTerm, - generation, - minTranslogGeneration, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX), - null - ) - ); - } catch (IOException e) { - throw new AssertionError("Failed to create temp file", e); + private TransferSnapshot createTransferSnapshot() throws IOException { + try { + CheckpointFileSnapshot checkpointFileSnapshot1 = new CheckpointFileSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX), + null + ); + CheckpointFileSnapshot checkpointFileSnapshot2 = new CheckpointFileSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX), + null + ); + TranslogFileSnapshot translogFileSnapshot1 = new TranslogFileSnapshot( + primaryTerm, + generation, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX), + null + ); + TranslogFileSnapshot translogFileSnapshot2 = new TranslogFileSnapshot( + primaryTerm, + generation - 1, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX), + null + ); + + return new TransferSnapshot() { + @Override + public Set getCheckpointFileSnapshots() { + return Set.of(checkpointFileSnapshot1, checkpointFileSnapshot2); } - } - @Override - public Set getTranslogFileSnapshots() { - try { - return Set.of( - new TranslogFileSnapshot( - primaryTerm, - generation, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX), - null - ), - new TranslogFileSnapshot( - primaryTerm, - generation - 1, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX), - null - ) - ); - } catch (IOException e) { - throw new AssertionError("Failed to create temp file", e); + @Override + public Set getTranslogFileSnapshots() { + return Set.of(translogFileSnapshot1, translogFileSnapshot2); } - } - @Override - public TranslogTransferMetadata getTranslogTransferMetadata() { - return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5)); - } + @Override + public TranslogTransferMetadata getTranslogTransferMetadata() { + return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5)); + } - @Override - public Map getTranslogCheckpointSnapshotMap() { - return Map.of(); - } + @Override + public Set getTranslogFileSnapshotWithMetadata() throws IOException { + translogFileSnapshot1.setMetadataFileInputStream(checkpointFileSnapshot1.inputStream()); + translogFileSnapshot2.setMetadataFileInputStream(checkpointFileSnapshot2.inputStream()); + return Set.of(translogFileSnapshot1, translogFileSnapshot2); + } - @Override - public String toString() { - return "test-to-string"; - } - }; + @Override + public String toString() { + return "test-to-string"; + } + }; + } catch (Exception e) { + throw new IOException("Failed to create transfer snapshot"); + } } public void testReadMetadataNoFile() throws IOException { @@ -515,7 +517,7 @@ public void testDeleteTranslogSuccess() throws Exception { tracker, remoteTranslogTransferTracker, DefaultRemoteStoreSettings.INSTANCE, - ckpAsTranslogMetadata + isTranslogMetadataEnabled ); String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; tracker.add(translogFile, true); @@ -581,7 +583,7 @@ public void testDeleteTranslogFailure() throws Exception { tracker, remoteTranslogTransferTracker, DefaultRemoteStoreSettings.INSTANCE, - ckpAsTranslogMetadata + isTranslogMetadataEnabled ); String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; tracker.add(translogFile, true); @@ -638,4 +640,127 @@ public void testMetadataConflict() throws InterruptedException { assertThrows(RuntimeException.class, translogTransferManager::readMetadata); } + + // tests for cases when ckp is stored as translog metadata. + public void testTransferSnapshotWithTranslogMetadata() throws Exception { + AtomicInteger fileTransferSucceeded = new AtomicInteger(); + AtomicInteger fileTransferFailed = new AtomicInteger(); + AtomicInteger translogTransferSucceeded = new AtomicInteger(); + AtomicInteger translogTransferFailed = new AtomicInteger(); + + isTranslogMetadataEnabled = true; + translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath.add(TRANSLOG.getName()), + remoteBaseTransferPath.add(METADATA.getName()), + tracker, + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE, + isTranslogMetadataEnabled + ); + + doNothing().when(transferService) + .uploadBlob( + any(TransferFileSnapshot.class), + Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm))), + any(WritePriority.class) + ); + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + Set transferFileSnapshots = (Set) invocationOnMock.getArguments()[0]; + transferFileSnapshots.forEach(transferFileSnapshot -> { + assertNotNull(transferFileSnapshot.getMetadataFileInputStream()); + listener.onResponse(transferFileSnapshot); + }); + return null; + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + + FileTransferTracker fileTransferTracker = new FileTransferTracker( + new ShardId("index", "indexUUid", 0), + remoteTranslogTransferTracker + ) { + @Override + public void onSuccess(TransferFileSnapshot fileSnapshot) { + fileTransferSucceeded.incrementAndGet(); + super.onSuccess(fileSnapshot); + } + + @Override + public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { + fileTransferFailed.incrementAndGet(); + super.onFailure(fileSnapshot, e); + } + + }; + + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath.add(TRANSLOG.getName()), + remoteBaseTransferPath.add(METADATA.getName()), + fileTransferTracker, + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE, + isTranslogMetadataEnabled + ); + + assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { + @Override + public void onUploadComplete(TransferSnapshot transferSnapshot) { + translogTransferSucceeded.incrementAndGet(); + } + + @Override + public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { + translogTransferFailed.incrementAndGet(); + } + })); + assertEquals(2, fileTransferSucceeded.get()); + assertEquals(0, fileTransferFailed.get()); + assertEquals(1, translogTransferSucceeded.get()); + assertEquals(0, translogTransferFailed.get()); + assertEquals(2, fileTransferTracker.allUploaded().size()); + } + + public void testDownloadTranslogWithMetadata() throws IOException { + isTranslogMetadataEnabled = true; + translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath.add(TRANSLOG.getName()), + remoteBaseTransferPath.add(METADATA.getName()), + tracker, + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE, + isTranslogMetadataEnabled + ); + Path location = createTempDir(); + assertFalse(Files.exists(location.resolve("translog-23.tlog"))); + assertFalse(Files.exists(location.resolve("translog-23.ckp"))); + mockDownloadBlobWithMetadataResponse(); + translogTransferManager.downloadTranslog("12", "23", location); + verify(transferService, times(0)).downloadBlob(any(BlobPath.class), eq("translog-23.tlog")); + verify(transferService, times(0)).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); + verify(transferService, times(1)).downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog")); + assertTrue(Files.exists(location.resolve("translog-23.tlog"))); + assertTrue(Files.exists(location.resolve("translog-23.ckp"))); + assertTlogCkpDownloadStatsWithMetadata(); + } + + private void mockDownloadBlobWithMetadataResponse() throws IOException { + Map metadata = new HashMap<>(); + String ckpDataString = Base64.getEncoder().encodeToString(ckpBytes); + metadata.put(CHECKPOINT_FILE_DATA_KEY, ckpDataString); + when(transferService.downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog"))).thenAnswer(invocation -> { + Thread.sleep(delayForBlobDownload); + return new FetchBlobResult(new ByteArrayInputStream(tlogBytes), metadata); + }); + } + + private void assertTlogCkpDownloadStatsWithMetadata() { + assertEquals(tlogBytes.length, remoteTranslogTransferTracker.getDownloadBytesSucceeded()); + // Expect delay for both tlog and ckp file + assertTrue(remoteTranslogTransferTracker.getTotalDownloadTimeInMillis() >= delayForBlobDownload); + } }