From eceb712391c178625a79d437672d36b4c07a4d7b Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Mon, 29 Jan 2024 10:22:16 -0800 Subject: [PATCH] Introduce a unified download manager for remote store operations Signed-off-by: Kunal Kotwani --- .../opensearch/index/shard/IndexShardIT.java | 1 + .../org/opensearch/index/IndexModule.java | 7 +- .../org/opensearch/index/IndexService.java | 9 +- .../transfer/DownloadManager.java} | 112 +++++++++-- .../StatsTrackingDownloadManager.java | 89 +++++++++ .../index/remote/transfer/package-info.java | 12 ++ .../opensearch/index/shard/IndexShard.java | 21 ++- ...emoteBlobStoreInternalTranslogFactory.java | 9 +- .../index/translog/RemoteFsTranslog.java | 25 ++- .../transfer/FileTransferTracker.java | 2 +- .../translog/transfer/TransferService.java | 1 + .../transfer/TranslogTransferManager.java | 79 +++----- .../transfer/TranslogTransferMetadata.java | 6 +- .../opensearch/indices/IndicesService.java | 21 ++- .../RemoteStoreReplicationSource.java | 4 +- .../opensearch/index/IndexModuleTests.java | 8 +- .../transfer/DownloadManagerTests.java} | 177 +++++++++++++----- .../index/translog/RemoteFsTranslogTests.java | 26 ++- .../TranslogTransferManagerTests.java | 87 +++++++-- .../index/shard/IndexShardTestCase.java | 8 +- 20 files changed, 527 insertions(+), 177 deletions(-) rename server/src/main/java/org/opensearch/index/{store/RemoteStoreFileDownloader.java => remote/transfer/DownloadManager.java} (57%) create mode 100644 server/src/main/java/org/opensearch/index/remote/transfer/StatsTrackingDownloadManager.java create mode 100644 server/src/main/java/org/opensearch/index/remote/transfer/package-info.java rename server/src/test/java/org/opensearch/index/{store/RemoteStoreFileDownloaderTests.java => remote/transfer/DownloadManagerTests.java} (50%) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index c394a1f631690..5c9944afb2fde 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -713,6 +713,7 @@ public static final IndexShard newIndexShard( null, () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, nodeId, + null, null ); } diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 6ac10a221d49e..9134a04e5cdc4 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -70,6 +70,7 @@ import org.opensearch.index.engine.EngineConfigFactory; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.remote.transfer.DownloadManager; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.SearchOperationListener; @@ -605,7 +606,8 @@ public IndexService newIndexService( BiFunction translogFactorySupplier, Supplier clusterDefaultRefreshIntervalSupplier, Supplier clusterRemoteTranslogBufferIntervalSupplier, - RecoverySettings recoverySettings + RecoverySettings recoverySettings, + DownloadManager downloadManager ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -664,7 +666,8 @@ public IndexService newIndexService( translogFactorySupplier, clusterDefaultRefreshIntervalSupplier, clusterRemoteTranslogBufferIntervalSupplier, - recoverySettings + recoverySettings, + downloadManager ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 0909e2d5c8ff0..2bae04c0ea65e 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -80,6 +80,7 @@ import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.SearchIndexNameMatcher; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; +import org.opensearch.index.remote.transfer.DownloadManager; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -182,6 +183,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final Supplier clusterDefaultRefreshIntervalSupplier; private final Supplier clusterRemoteTranslogBufferIntervalSupplier; private final RecoverySettings recoverySettings; + private final DownloadManager downloadManager; public IndexService( IndexSettings indexSettings, @@ -217,7 +219,8 @@ public IndexService( BiFunction translogFactorySupplier, Supplier clusterDefaultRefreshIntervalSupplier, Supplier clusterRemoteTranslogBufferIntervalSupplier, - RecoverySettings recoverySettings + RecoverySettings recoverySettings, + DownloadManager downloadManager ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -295,6 +298,7 @@ public IndexService( this.translogFactorySupplier = translogFactorySupplier; this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier; this.recoverySettings = recoverySettings; + this.downloadManager = downloadManager; updateFsyncTaskIfNecessary(); } @@ -528,7 +532,8 @@ public synchronized IndexShard createShard( remoteStoreStatsTrackerFactory, clusterRemoteTranslogBufferIntervalSupplier, nodeEnv.nodeId(), - recoverySettings + recoverySettings, + downloadManager ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java b/server/src/main/java/org/opensearch/index/remote/transfer/DownloadManager.java similarity index 57% rename from server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java rename to server/src/main/java/org/opensearch/index/remote/transfer/DownloadManager.java index 134994c0ae32c..427f76630d976 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java +++ b/server/src/main/java/org/opensearch/index/remote/transfer/DownloadManager.java @@ -6,47 +6,105 @@ * compatible open source license. */ -package org.opensearch.index.store; +package org.opensearch.index.remote.transfer; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; -import org.opensearch.common.logging.Loggers; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.util.CancellableThreads; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.translog.transfer.TranslogTransferManager; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collection; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; /** - * Helper class to downloads files from a {@link RemoteSegmentStoreDirectory} - * instance to a local {@link Directory} instance in parallel depending on thread - * pool size and recovery settings. + * Manages file downloads from the remote store and from a {@link RemoteSegmentStoreDirectory} + * instance to a local {@link Directory} instance. * * @opensearch.api */ -@PublicApi(since = "2.11.0") -public final class RemoteStoreFileDownloader { - private final Logger logger; +@PublicApi(since = "2.13.0") +public class DownloadManager { + private final ThreadPool threadPool; private final RecoverySettings recoverySettings; + private static final Logger logger = LogManager.getLogger(DownloadManager.class); - public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, RecoverySettings recoverySettings) { - this.logger = Loggers.getLogger(RemoteStoreFileDownloader.class, shardId); + public DownloadManager(ThreadPool threadPool, RecoverySettings recoverySettings) { this.threadPool = threadPool; this.recoverySettings = recoverySettings; } + public DownloadManager(DownloadManager other) { + this.threadPool = other.threadPool; + this.recoverySettings = other.recoverySettings; + } + + /** + * Downloads the {@link TranslogTransferMetadata} file from the blob container stored as a part of remote store + * metadata + * @param blobContainer location where the metadata file is stored + * @param fileName name of the metadata file + * @return deserialized metadata object + * @throws IOException exception when reading from the remote file + */ + public TranslogTransferMetadata fetchTranslogTransferMetadata(BlobContainer blobContainer, String fileName) throws IOException { + byte[] data = downloadFileFromRemoteStoreToMemory(blobContainer, fileName); + IndexInput indexInput = new ByteArrayIndexInput("metadata file", data); + return TranslogTransferManager.METADATA_STREAM_WRAPPER.readStream(indexInput); + } + + /** + * Downloads a file stored in the blob container into memory + * @param blobContainer location of the blob + * @param fileName name of the blob to be downloaded + * @return in-memory byte representation of the blob + * @throws IOException exception when reading from the repository file + */ + public byte[] downloadFileFromRemoteStoreToMemory(BlobContainer blobContainer, String fileName) throws IOException { + // TODO: Add a circuit breaker check before putting all the data in heap + try (InputStream inputStream = blobContainer.readBlob(fileName)) { + return inputStream.readAllBytes(); + } + } + + /** + * Downloads a file stored within the blob container onto local disk specified using {@link Path} based location. + * @param blobContainer location of the blob + * @param fileName name of the blob to be downloaded + * @param location location on disk where the blob will be downloaded + * @throws IOException exception when reading from the repository file or writing to disk + */ + public void downloadFileFromRemoteStore(BlobContainer blobContainer, String fileName, Path location) throws IOException { + Path filePath = location.resolve(fileName); + // Here, we always override the existing file if present. + // We need to change this logic when we introduce incremental download + Files.deleteIfExists(filePath); + + try (InputStream inputStream = blobContainer.readBlob(fileName)) { + Files.copy(inputStream, filePath); + } + } + /** * Copies the given segments from the remote segment store to the given * local directory. @@ -55,14 +113,14 @@ public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, Recover * @param toDownloadSegments The list of segment files to download * @param listener Callback listener to be notified upon completion */ - public void downloadAsync( + public void copySegmentsFromRemoteStoreAsync( CancellableThreads cancellableThreads, Directory source, Directory destination, Collection toDownloadSegments, ActionListener listener ) { - downloadInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener); + copySegmentsInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener); } /** @@ -77,7 +135,7 @@ public void downloadAsync( * Must be thread safe as this may be invoked concurrently from * different threads. */ - public void download( + public void copySegmentsFromRemoteStore( Directory source, Directory destination, Directory secondDestination, @@ -86,7 +144,7 @@ public void download( ) throws InterruptedException, IOException { final CancellableThreads cancellableThreads = new CancellableThreads(); final PlainActionFuture listener = PlainActionFuture.newFuture(); - downloadInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener); + copySegmentsInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener); try { listener.get(); } catch (ExecutionException e) { @@ -105,7 +163,7 @@ public void download( } } - private void downloadInternal( + private void copySegmentsInternal( CancellableThreads cancellableThreads, Directory source, Directory destination, @@ -126,11 +184,19 @@ private void downloadInternal( logger.trace("Starting download of {} files with {} threads", queue.size(), threads); final ActionListener allFilesListener = new GroupedActionListener<>(ActionListener.map(listener, r -> null), threads); for (int i = 0; i < threads; i++) { - copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, allFilesListener); + copyFileFromRemoteStoreDirectory( + cancellableThreads, + source, + destination, + secondDestination, + queue, + onFileCompletion, + allFilesListener + ); } } - private void copyOneFile( + private void copyFileFromRemoteStoreDirectory( CancellableThreads cancellableThreads, Directory source, Directory destination, @@ -160,7 +226,15 @@ private void copyOneFile( listener.onFailure(e); return; } - copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, listener); + copyFileFromRemoteStoreDirectory( + cancellableThreads, + source, + destination, + secondDestination, + queue, + onFileCompletion, + listener + ); }); } } diff --git a/server/src/main/java/org/opensearch/index/remote/transfer/StatsTrackingDownloadManager.java b/server/src/main/java/org/opensearch/index/remote/transfer/StatsTrackingDownloadManager.java new file mode 100644 index 0000000000000..58ed9d2ea946a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/transfer/StatsTrackingDownloadManager.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote.transfer; + +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +/** + * A delegating wrapper class of {@link DownloadManager} for keeping track of download stats and metric reporting. + */ +public class StatsTrackingDownloadManager extends DownloadManager { + + private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; + + public StatsTrackingDownloadManager( + ThreadPool threadPool, + RecoverySettings recoverySettings, + RemoteTranslogTransferTracker remoteTranslogTransferTracker + ) { + super(threadPool, recoverySettings); + this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; + } + + public StatsTrackingDownloadManager(DownloadManager downloadManager, RemoteTranslogTransferTracker remoteTranslogTransferTracker) { + super(downloadManager); + this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; + } + + /** + * Tracks the download time and size when fetching an object from the container to memory + * @param blobContainer location of the blob + * @param fileName name of the blob to be downloaded + * @return in-memory byte representation of the file + * @throws IOException exception on reading blob from container + */ + @Override + public byte[] downloadFileFromRemoteStoreToMemory(BlobContainer blobContainer, String fileName) throws IOException { + // TODO: Rewrite stats logic to remove hard-wiring to translog transfer tracker + // (maybe make RemoteTranslogTransferTracker methods interface dependent?) + long downloadStartTime = System.nanoTime(); + byte[] data = null; + try { + data = super.downloadFileFromRemoteStoreToMemory(blobContainer, fileName); + } finally { + remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); + if (data != null) { + remoteTranslogTransferTracker.addDownloadBytesSucceeded(data.length); + } + } + return data; + } + + /** + * Tracks the download time and size when fetching an object from the container to a local file + * @param blobContainer location of the blob + * @param fileName name of the blob to be downloaded + * @param location location on disk where the blob will be downloaded + * @throws IOException exception on reading blob from container or writing to disk + */ + @Override + public void downloadFileFromRemoteStore(BlobContainer blobContainer, String fileName, Path location) throws IOException { + // TODO: Rewrite stats logic to remove hard-wiring to translog transfer tracker + boolean downloadStatus = false; + long bytesToRead = 0, downloadStartTime = System.nanoTime(); + try { + super.downloadFileFromRemoteStore(blobContainer, fileName, location); + bytesToRead = Files.size(location.resolve(fileName)); + downloadStatus = true; + } finally { + remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); + if (downloadStatus) { + remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead); + } + } + + } +} diff --git a/server/src/main/java/org/opensearch/index/remote/transfer/package-info.java b/server/src/main/java/org/opensearch/index/remote/transfer/package-info.java new file mode 100644 index 0000000000000..053dc268abeaa --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/transfer/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package containing classes to manage downloads from the repository for remote store operations. + */ +package org.opensearch.index.remote.transfer; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5834eabfa9af0..c6c09f23e131f 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -149,6 +149,7 @@ import org.opensearch.index.refresh.RefreshStats; import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; +import org.opensearch.index.remote.transfer.DownloadManager; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.search.stats.ShardSearchStats; import org.opensearch.index.seqno.ReplicationTracker; @@ -161,7 +162,6 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.RemoteSegmentStoreDirectory; -import org.opensearch.index.store.RemoteStoreFileDownloader; import org.opensearch.index.store.Store; import org.opensearch.index.store.Store.MetadataSnapshot; import org.opensearch.index.store.StoreFileMetadata; @@ -344,7 +344,7 @@ Runnable getGlobalCheckpointSyncer() { private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; private final List internalRefreshListener = new ArrayList<>(); - private final RemoteStoreFileDownloader fileDownloader; + private final DownloadManager downloadManager; private final RecoverySettings recoverySettings; public IndexShard( @@ -374,7 +374,8 @@ public IndexShard( final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, final Supplier clusterRemoteTranslogBufferIntervalSupplier, final String nodeId, - final RecoverySettings recoverySettings + final RecoverySettings recoverySettings, + DownloadManager downloadManager ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -471,7 +472,7 @@ public boolean shouldCache(Query query) { : mapperService.documentMapper().mappers().containsTimeStampField(); this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; this.recoverySettings = recoverySettings; - this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); + this.downloadManager = downloadManager; } public ThreadPool getThreadPool() { @@ -573,8 +574,8 @@ public RecoverySettings getRecoverySettings() { return recoverySettings; } - public RemoteStoreFileDownloader getFileDownloader() { - return fileDownloader; + public DownloadManager getDownloadManager() { + return downloadManager; } @Override @@ -5009,7 +5010,13 @@ private String copySegmentFiles( if (toDownloadSegments.isEmpty() == false) { try { - fileDownloader.download(sourceRemoteDirectory, storeDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync); + downloadManager.copySegmentsFromRemoteStore( + sourceRemoteDirectory, + storeDirectory, + targetRemoteDirectory, + toDownloadSegments, + onFileSync + ); } catch (Exception e) { throw new IOException("Error occurred when downloading segments from remote store", e); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index e100ffaabf13d..03d074eff49fa 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.remote.transfer.DownloadManager; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryMissingException; @@ -33,12 +34,14 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory { private final ThreadPool threadPool; private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; + private final DownloadManager downloadManager; public RemoteBlobStoreInternalTranslogFactory( Supplier repositoriesServiceSupplier, ThreadPool threadPool, String repositoryName, - RemoteTranslogTransferTracker remoteTranslogTransferTracker + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + DownloadManager downloadManager ) { Repository repository; try { @@ -49,6 +52,7 @@ public RemoteBlobStoreInternalTranslogFactory( this.repository = repository; this.threadPool = threadPool; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; + this.downloadManager = downloadManager; } @Override @@ -74,7 +78,8 @@ public Translog newTranslog( blobStoreRepository, threadPool, startedPrimarySupplier, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 7b969a37e4aa6..8cf99f482cd55 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -19,6 +19,8 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.util.FileSystemUtils; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.remote.transfer.DownloadManager; +import org.opensearch.index.remote.transfer.StatsTrackingDownloadManager; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.index.translog.transfer.FileTransferTracker; import org.opensearch.index.translog.transfer.TransferSnapshot; @@ -94,19 +96,21 @@ public RemoteFsTranslog( BlobStoreRepository blobStoreRepository, ThreadPool threadPool, BooleanSupplier startedPrimarySupplier, - RemoteTranslogTransferTracker remoteTranslogTransferTracker + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + DownloadManager downloadManager ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); logger = Loggers.getLogger(getClass(), shardId); this.startedPrimarySupplier = startedPrimarySupplier; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; - fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); + this.fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); this.translogTransferManager = buildTranslogTransferManager( blobStoreRepository, threadPool, shardId, fileTransferTracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); try { download(translogTransferManager, location, logger); @@ -162,12 +166,14 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t // TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567 RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000); FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); + DownloadManager downloadManager = new StatsTrackingDownloadManager(threadPool, null, remoteTranslogTransferTracker); TranslogTransferManager translogTransferManager = buildTranslogTransferManager( blobStoreRepository, threadPool, shardId, fileTransferTracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); RemoteFsTranslog.download(translogTransferManager, location, logger); logger.trace(remoteTranslogTransferTracker.toString()); @@ -244,14 +250,17 @@ public static TranslogTransferManager buildTranslogTransferManager( ThreadPool threadPool, ShardId shardId, FileTransferTracker fileTransferTracker, - RemoteTranslogTransferTracker remoteTranslogTransferTracker + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + DownloadManager downloadManager ) { return new TranslogTransferManager( shardId, new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), + blobStoreRepository.blobStore(), blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG), fileTransferTracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + new StatsTrackingDownloadManager(downloadManager, remoteTranslogTransferTracker) ); } @@ -531,12 +540,14 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th // TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567 RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000); FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); + DownloadManager downloadManager = new StatsTrackingDownloadManager(threadPool, null, remoteTranslogTransferTracker); TranslogTransferManager translogTransferManager = buildTranslogTransferManager( blobStoreRepository, threadPool, shardId, fileTransferTracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); // clean up all remote translog files translogTransferManager.deleteTranslogFiles(); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index 9c2304f809f46..5d70d456ad00b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -70,7 +70,7 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) { add(fileSnapshot.getName(), TransferState.SUCCESS); } - void add(String file, boolean success) { + public void add(String file, boolean success) { TransferState targetState = success ? TransferState.SUCCESS : TransferState.FAILED; add(file, targetState); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index cfe833dde87eb..78b6cfa709867 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -116,6 +116,7 @@ void uploadBlobs( */ void listFoldersAsync(String threadpoolName, Iterable path, ActionListener> listener); + // TODO: Remove this API. Only used in tests. /** * * @param path the remote path from where download should be made diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 2f6055df87804..f0ff80cc12345 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -10,29 +10,28 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.OutputStreamIndexOutput; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.SetOnce; +import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.logging.Loggers; -import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.remote.transfer.DownloadManager; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; @@ -56,9 +55,11 @@ public class TranslogTransferManager { private final ShardId shardId; private final TransferService transferService; + private final BlobStore blobStore; private final BlobPath remoteDataTransferPath; private final BlobPath remoteMetadataTransferPath; private final BlobPath remoteBaseTransferPath; + private final DownloadManager downloadManager; private final FileTransferTracker fileTransferTracker; private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; @@ -70,7 +71,7 @@ public class TranslogTransferManager { private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; - private static final VersionedCodecStreamWrapper metadataStreamWrapper = new VersionedCodecStreamWrapper<>( + public static final VersionedCodecStreamWrapper METADATA_STREAM_WRAPPER = new VersionedCodecStreamWrapper<>( new TranslogTransferMetadataHandler(), TranslogTransferMetadata.CURRENT_VERSION, TranslogTransferMetadata.METADATA_CODEC @@ -79,18 +80,22 @@ public class TranslogTransferManager { public TranslogTransferManager( ShardId shardId, TransferService transferService, + BlobStore blobStore, BlobPath remoteBaseTransferPath, FileTransferTracker fileTransferTracker, - RemoteTranslogTransferTracker remoteTranslogTransferTracker + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + DownloadManager downloadManager ) { this.shardId = shardId; this.transferService = transferService; + this.blobStore = blobStore; this.remoteBaseTransferPath = remoteBaseTransferPath; this.remoteDataTransferPath = remoteBaseTransferPath.add(DATA_DIR); this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); this.fileTransferTracker = fileTransferTracker; this.logger = Loggers.getLogger(getClass(), shardId); this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; + this.downloadManager = downloadManager; } public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() { @@ -238,39 +243,21 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca generation, location ); + + final BlobContainer blobContainer = blobStore.blobContainer(remoteDataTransferPath.add(primaryTerm)); + // Download Checkpoint file from remote to local FS String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); - downloadToFS(ckpFileName, location, primaryTerm); + downloadManager.downloadFileFromRemoteStore(blobContainer, ckpFileName, location); + // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync + fileTransferTracker.add(ckpFileName, true); + // Download translog file from remote to local FS String translogFilename = Translog.getFilename(Long.parseLong(generation)); - downloadToFS(translogFilename, location, primaryTerm); - return true; - } - - private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException { - Path filePath = location.resolve(fileName); - // Here, we always override the existing file if present. - // We need to change this logic when we introduce incremental download - if (Files.exists(filePath)) { - Files.delete(filePath); - } - - boolean downloadStatus = false; - long bytesToRead = 0, downloadStartTime = System.nanoTime(); - try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) { - // Capture number of bytes for stats before reading - bytesToRead = inputStream.available(); - Files.copy(inputStream, filePath); - downloadStatus = true; - } finally { - remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); - if (downloadStatus) { - remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead); - } - } - + downloadManager.downloadFileFromRemoteStore(blobContainer, translogFilename, location); // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync - fileTransferTracker.add(fileName, true); + fileTransferTracker.add(translogFilename, true); + return true; } public TranslogTransferMetadata readMetadata() throws IOException { @@ -284,24 +271,14 @@ public TranslogTransferMetadata readMetadata() throws IOException { blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList()), TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen ); - String filename = blobMetadataList.get(0).name(); - boolean downloadStatus = false; - long downloadStartTime = System.nanoTime(), bytesToRead = 0; - try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) { - // Capture number of bytes for stats before reading - bytesToRead = inputStream.available(); - IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); - metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput)); - downloadStatus = true; + + final BlobContainer blobContainer = blobStore.blobContainer(remoteMetadataTransferPath); + final String filename = blobMetadataList.get(0).name(); + try { + metadataSetOnce.set(downloadManager.fetchTranslogTransferMetadata(blobContainer, filename)); } catch (IOException e) { logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e); exceptionSetOnce.set(e); - } finally { - remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); - logger.debug("translogMetadataDownloadStatus={}", downloadStatus); - if (downloadStatus) { - remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead); - } } }, e -> { if (e instanceof RuntimeException) { @@ -359,7 +336,7 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) * @param metadata The object to be parsed * @return Byte representation for the given metadata */ - public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOException { + public static byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOException { byte[] metadataBytes; try (BytesStreamOutput output = new BytesStreamOutput()) { @@ -371,7 +348,7 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep TranslogTransferMetadata.BUFFER_SIZE ) ) { - metadataStreamWrapper.writeStream(indexOutput, metadata); + METADATA_STREAM_WRAPPER.writeStream(indexOutput, metadata); } metadataBytes = BytesReference.toBytes(output.bytes()); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 052206d807fa6..71e1479593039 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -39,11 +39,11 @@ public class TranslogTransferMetadata { public static final String METADATA_PREFIX = "metadata"; - static final int BUFFER_SIZE = 4096; + public static final int BUFFER_SIZE = 4096; - static final int CURRENT_VERSION = 1; + public static final int CURRENT_VERSION = 1; - static final String METADATA_CODEC = "md"; + public static final String METADATA_CODEC = "md"; private final long createdAt; diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index db5b93f073b03..fb3a0dc5daa62 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -124,6 +124,7 @@ import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; +import org.opensearch.index.remote.transfer.DownloadManager; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.RetentionLeaseStats; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -352,7 +353,7 @@ public class IndicesService extends AbstractLifecycleComponent private volatile boolean idFieldDataEnabled; private volatile boolean allowExpensiveQueries; private final RecoverySettings recoverySettings; - + private final DownloadManager downloadManager; @Nullable private final OpenSearchThreadPoolExecutor danglingIndicesThreadPoolExecutor; private final Set danglingIndicesToWrite = Sets.newConcurrentHashSet(); @@ -492,8 +493,15 @@ protected void closeInternal() { this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings()); clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); + this.remoteDirectoryFactory = remoteDirectoryFactory; - this.translogFactorySupplier = getTranslogFactorySupplier(repositoriesServiceSupplier, threadPool, remoteStoreStatsTrackerFactory); + this.downloadManager = new DownloadManager(threadPool, recoverySettings); + this.translogFactorySupplier = getTranslogFactorySupplier( + repositoriesServiceSupplier, + threadPool, + remoteStoreStatsTrackerFactory, + downloadManager + ); this.searchRequestStats = searchRequestStats; this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() @@ -523,7 +531,8 @@ private void onRefreshIntervalUpdate(TimeValue clusterDefaultRefreshInterval) { private static BiFunction getTranslogFactorySupplier( Supplier repositoriesServiceSupplier, ThreadPool threadPool, - RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + DownloadManager downloadManager ) { return (indexSettings, shardRouting) -> { if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { @@ -531,7 +540,8 @@ private static BiFunction getTrans repositoriesServiceSupplier, threadPool, indexSettings.getRemoteStoreTranslogRepository(), - remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()) + remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()), + downloadManager ); } return new InternalTranslogFactory(); @@ -900,7 +910,8 @@ private synchronized IndexService createIndexService( translogFactorySupplier, this::getClusterDefaultRefreshInterval, this::getClusterRemoteTranslogBufferInterval, - this.recoverySettings + this.recoverySettings, + this.downloadManager ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index b06b3e0497cf7..63953ad5a9c62 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -120,8 +120,8 @@ public void getSegmentFiles( assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; toDownloadSegmentNames.add(file); } - indexShard.getFileDownloader() - .downloadAsync( + indexShard.getDownloadManager() + .copySegmentsFromRemoteStoreAsync( cancellableThreads, remoteDirectory, new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker), diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 97bc822be7d51..514f1ea0f88af 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -88,6 +88,7 @@ import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.mapper.Uid; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.remote.transfer.DownloadManager; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.SearchOperationListener; @@ -230,13 +231,15 @@ public void tearDown() throws Exception { private IndexService newIndexService(IndexModule module) throws IOException { final SetOnce repositoriesServiceReference = new SetOnce<>(); repositoriesServiceReference.set(repositoriesService); + DownloadManager downloadManager = new DownloadManager(threadPool, DefaultRecoverySettings.INSTANCE); BiFunction translogFactorySupplier = (indexSettings, shardRouting) -> { if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { return new RemoteBlobStoreInternalTranslogFactory( repositoriesServiceReference::get, threadPool, indexSettings.getRemoteStoreTranslogRepository(), - new RemoteTranslogTransferTracker(shardRouting.shardId(), 10) + new RemoteTranslogTransferTracker(shardRouting.shardId(), 10), + downloadManager ); } return new InternalTranslogFactory(); @@ -262,7 +265,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { translogFactorySupplier, () -> IndexSettings.DEFAULT_REFRESH_INTERVAL, () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, - DefaultRecoverySettings.INSTANCE + DefaultRecoverySettings.INSTANCE, + downloadManager ); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java b/server/src/test/java/org/opensearch/index/remote/transfer/DownloadManagerTests.java similarity index 50% rename from server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java rename to server/src/test/java/org/opensearch/index/remote/transfer/DownloadManagerTests.java index 6d8b3fe4d69fb..c0a5548b0ab6b 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java +++ b/server/src/test/java/org/opensearch/index/remote/transfer/DownloadManagerTests.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.index.store; +package org.opensearch.index.remote.transfer; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; @@ -16,12 +16,14 @@ import org.apache.lucene.store.NIOFSDirectory; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.CancellableThreads; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.translog.transfer.TranslogTransferManager; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -29,9 +31,13 @@ import org.junit.After; import org.junit.Before; +import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -42,14 +48,20 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -public class RemoteStoreFileDownloaderTests extends OpenSearchTestCase { +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +public class DownloadManagerTests extends OpenSearchTestCase { private ThreadPool threadPool; private Directory source; private Directory destination; private Directory secondDestination; - private RemoteStoreFileDownloader fileDownloader; - private Map files = new HashMap<>(); + private BlobContainer blobContainer; + private DownloadManager downloadManager; + private Path path; + private final Map files = new HashMap<>(); @Before public void setup() throws IOException { @@ -62,6 +74,8 @@ public void setup() throws IOException { source = new NIOFSDirectory(createTempDir()); destination = new NIOFSDirectory(createTempDir()); secondDestination = new NIOFSDirectory(createTempDir()); + blobContainer = mock(BlobContainer.class); + path = createTempDir("DownloadManagerTests"); for (int i = 0; i < 10; i++) { final String filename = "file_" + i; final int content = randomInt(); @@ -70,11 +84,7 @@ public void setup() throws IOException { } files.put(filename, content); } - fileDownloader = new RemoteStoreFileDownloader( - ShardId.fromString("[RemoteStoreFileDownloaderTests][0]"), - threadPool, - recoverySettings - ); + downloadManager = new DownloadManager(threadPool, recoverySettings); } @After @@ -83,59 +93,121 @@ public void stopThreadPool() throws Exception { assertTrue(threadPool.awaitTermination(5, TimeUnit.SECONDS)); } - public void testDownload() throws IOException { + public void testFetchTranslogTransferMetadata() throws Exception { + final String metadataFileName = "translog.md"; + final TranslogTransferMetadata expectedTranslogTransferMetadata = new TranslogTransferMetadata(1, 2, 3, 4, "node-1"); + when(blobContainer.readBlob(eq(metadataFileName))).thenReturn( + new ByteArrayInputStream(TranslogTransferManager.getMetadataBytes(expectedTranslogTransferMetadata)) + ); + TranslogTransferMetadata actualTranslogTransferMetadata = downloadManager.fetchTranslogTransferMetadata( + blobContainer, + metadataFileName + ); + assertEquals(expectedTranslogTransferMetadata, actualTranslogTransferMetadata); + } + + public void testFetchTranslogTransferMetadataException() throws Exception { + final String metadataFileName = "translog.md"; + when(blobContainer.readBlob(eq(metadataFileName))).thenThrow(new IOException("something happened")); + assertThrows(IOException.class, () -> downloadManager.fetchTranslogTransferMetadata(blobContainer, metadataFileName)); + } + + public void testDownloadFileFromRemoteStoreToMemory() throws Exception { + byte[] expectedData = "test".getBytes(StandardCharsets.UTF_16); + when(blobContainer.readBlob(anyString())).thenReturn(new ByteArrayInputStream(expectedData)); + byte[] actualData = downloadManager.downloadFileFromRemoteStoreToMemory(blobContainer, "test-file"); + assertArrayEquals(expectedData, actualData); + } + + public void testDownloadFileFromRemoteStoreToMemoryException() throws Exception { + when(blobContainer.readBlob(anyString())).thenThrow(new IOException("something happened")); + assertThrows(IOException.class, () -> downloadManager.downloadFileFromRemoteStoreToMemory(blobContainer, "test-file")); + } + + public void testDownloadFileFromRemoteStore() throws Exception { + final String fileName = "test-file"; + final byte[] expectedData = "test".getBytes(StandardCharsets.UTF_16); + final Path fileLocation = path.resolve(fileName); + when(blobContainer.readBlob(anyString())).thenReturn(new ByteArrayInputStream(expectedData)); + downloadManager.downloadFileFromRemoteStore(blobContainer, fileName, path); + + assertTrue(Files.exists(fileLocation)); + assertTrue(Files.size(fileLocation) > 0); + } + + public void testDownloadFileFromRemoteStoreException() throws Exception { + final String fileName = "test-file"; + final Path fileLocation = path.resolve(fileName); + when(blobContainer.readBlob(anyString())).thenThrow(new IOException("unexpected error")); + assertThrows(IOException.class, () -> downloadManager.downloadFileFromRemoteStore(blobContainer, fileName, path)); + assertFalse(Files.exists(fileLocation)); + } + + public void testCopySegmentsFromRemoteStoreAsync() throws IOException { final PlainActionFuture l = new PlainActionFuture<>(); - fileDownloader.downloadAsync(new CancellableThreads(), source, destination, files.keySet(), l); + downloadManager.copySegmentsFromRemoteStoreAsync(new CancellableThreads(), source, destination, files.keySet(), l); l.actionGet(); assertContent(files, destination); } - public void testDownloadWithSecondDestination() throws IOException, InterruptedException { - fileDownloader.download(source, destination, secondDestination, files.keySet(), () -> {}); + public void testCopySegmentsFromRemoteStore() throws IOException, InterruptedException { + downloadManager.copySegmentsFromRemoteStore(source, destination, secondDestination, files.keySet(), () -> {}); assertContent(files, destination); assertContent(files, secondDestination); } - public void testDownloadWithFileCompletionHandler() throws IOException, InterruptedException { + public void testCopySegmentsWithFileCompletionHandler() throws IOException, InterruptedException { final AtomicInteger counter = new AtomicInteger(0); - fileDownloader.download(source, destination, null, files.keySet(), counter::incrementAndGet); + downloadManager.copySegmentsFromRemoteStore(source, destination, null, files.keySet(), counter::incrementAndGet); assertContent(files, destination); assertEquals(files.size(), counter.get()); } - public void testDownloadNonExistentFile() throws InterruptedException { + public void testCopyNonExistentFile() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - fileDownloader.downloadAsync(new CancellableThreads(), source, destination, Set.of("not real"), new ActionListener<>() { - @Override - public void onResponse(Void unused) {} + downloadManager.copySegmentsFromRemoteStoreAsync( + new CancellableThreads(), + source, + destination, + Set.of("not real"), + new ActionListener<>() { + @Override + public void onResponse(Void unused) {} - @Override - public void onFailure(Exception e) { - assertEquals(NoSuchFileException.class, e.getClass()); - latch.countDown(); + @Override + public void onFailure(Exception e) { + assertEquals(NoSuchFileException.class, e.getClass()); + latch.countDown(); + } } - }); + ); assertTrue(latch.await(10, TimeUnit.SECONDS)); } - public void testDownloadExtraNonExistentFile() throws InterruptedException { + public void testCopySegmentsExtraNonExistentFile() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final List filesWithExtra = new ArrayList<>(files.keySet()); filesWithExtra.add("not real"); - fileDownloader.downloadAsync(new CancellableThreads(), source, destination, filesWithExtra, new ActionListener<>() { - @Override - public void onResponse(Void unused) {} + downloadManager.copySegmentsFromRemoteStoreAsync( + new CancellableThreads(), + source, + destination, + filesWithExtra, + new ActionListener<>() { + @Override + public void onResponse(Void unused) {} - @Override - public void onFailure(Exception e) { - assertEquals(NoSuchFileException.class, e.getClass()); - latch.countDown(); + @Override + public void onFailure(Exception e) { + assertEquals(NoSuchFileException.class, e.getClass()); + latch.countDown(); + } } - }); + ); assertTrue(latch.await(10, TimeUnit.SECONDS)); } - public void testCancellable() { + public void testCopySegmentsCancellable() { final CancellableThreads cancellableThreads = new CancellableThreads(); final PlainActionFuture blockingListener = new PlainActionFuture<>(); final Directory blockingDestination = new FilterDirectory(destination) { @@ -149,7 +221,7 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) } } }; - fileDownloader.downloadAsync(cancellableThreads, source, blockingDestination, files.keySet(), blockingListener); + downloadManager.copySegmentsFromRemoteStoreAsync(cancellableThreads, source, blockingDestination, files.keySet(), blockingListener); assertThrows( "Expected to timeout due to blocking directory", OpenSearchTimeoutException.class, @@ -163,7 +235,7 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) ); } - public void testBlockingCallCanBeInterrupted() throws Exception { + public void testCopySegmentsBlockingCallCanBeInterrupted() throws Exception { final Directory blockingDestination = new FilterDirectory(destination) { @Override public void copyFrom(Directory from, String src, String dest, IOContext context) { @@ -178,7 +250,7 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) final AtomicReference capturedException = new AtomicReference<>(); final Thread thread = new Thread(() -> { try { - fileDownloader.download(source, blockingDestination, null, files.keySet(), () -> {}); + downloadManager.copySegmentsFromRemoteStore(source, blockingDestination, null, files.keySet(), () -> {}); } catch (Exception e) { capturedException.set(e); } @@ -189,26 +261,35 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) assertEquals(InterruptedException.class, capturedException.get().getClass()); } - public void testIOException() throws IOException, InterruptedException { + public void testCopySegmentsIOException() throws IOException, InterruptedException { final Directory failureDirectory = new FilterDirectory(destination) { @Override public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { throw new IOException("test"); } }; - assertThrows(IOException.class, () -> fileDownloader.download(source, failureDirectory, null, files.keySet(), () -> {})); + assertThrows( + IOException.class, + () -> downloadManager.copySegmentsFromRemoteStore(source, failureDirectory, null, files.keySet(), () -> {}) + ); final CountDownLatch latch = new CountDownLatch(1); - fileDownloader.downloadAsync(new CancellableThreads(), source, failureDirectory, files.keySet(), new ActionListener<>() { - @Override - public void onResponse(Void unused) {} + downloadManager.copySegmentsFromRemoteStoreAsync( + new CancellableThreads(), + source, + failureDirectory, + files.keySet(), + new ActionListener<>() { + @Override + public void onResponse(Void unused) {} - @Override - public void onFailure(Exception e) { - assertEquals(IOException.class, e.getClass()); - latch.countDown(); + @Override + public void onFailure(Exception e) { + assertEquals(IOException.class, e.getClass()); + latch.countDown(); + } } - }); + ); assertTrue(latch.await(10, TimeUnit.SECONDS)); } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index a83e737dc25c1..05ab8bf0faad1 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -43,6 +43,8 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.MissingHistoryOperationsException; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.remote.transfer.DownloadManager; +import org.opensearch.index.remote.transfer.StatsTrackingDownloadManager; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.LocalCheckpointTrackerTests; import org.opensearch.index.seqno.SequenceNumbers; @@ -50,6 +52,7 @@ import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.index.translog.transfer.TranslogUploadFailedException; +import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -124,6 +127,7 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { private final AtomicBoolean primaryMode = new AtomicBoolean(true); private final AtomicReference persistedSeqNoConsumer = new AtomicReference<>(); private ThreadPool threadPool; + private DownloadManager downloadManager; private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; AtomicInteger writeCalls = new AtomicInteger(); @@ -178,6 +182,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); threadPool = new TestThreadPool(getClass().getName()); blobStoreTransferService = new BlobStoreTransferService(repository.blobStore(), threadPool); + RemoteTranslogTransferTracker translogTransferTracker = new RemoteTranslogTransferTracker(shardId, 10); + downloadManager = new StatsTrackingDownloadManager(threadPool, DefaultRecoverySettings.INSTANCE, translogTransferTracker); return new RemoteFsTranslog( translogConfig, translogUUID, @@ -188,7 +194,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin repository, threadPool, primaryMode::get, - new RemoteTranslogTransferTracker(shardId, 10) + translogTransferTracker, + downloadManager ); } @@ -447,6 +454,8 @@ public void testExtraGenToKeep() throws Exception { ); TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); ArrayList ops = new ArrayList<>(); + RemoteTranslogTransferTracker translogTransferTracker = new RemoteTranslogTransferTracker(shardId, 10); + downloadManager = new StatsTrackingDownloadManager(threadPool, DefaultRecoverySettings.INSTANCE, translogTransferTracker); try ( RemoteFsTranslog translog = new RemoteFsTranslog( config, @@ -458,7 +467,8 @@ public void testExtraGenToKeep() throws Exception { repository, threadPool, () -> Boolean.TRUE, - new RemoteTranslogTransferTracker(shardId, 10) + translogTransferTracker, + downloadManager ) { @Override ChannelFactory getChannelFactory() { @@ -1496,6 +1506,9 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { primaryTerm.get() ); + RemoteTranslogTransferTracker translogTransferTracker = new RemoteTranslogTransferTracker(shardId, 10); + downloadManager = new StatsTrackingDownloadManager(threadPool, DefaultRecoverySettings.INSTANCE, translogTransferTracker); + try ( Translog translog = new RemoteFsTranslog( config, @@ -1507,7 +1520,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { repository, threadPool, () -> Boolean.TRUE, - new RemoteTranslogTransferTracker(shardId, 10) + translogTransferTracker, + downloadManager ) { @Override ChannelFactory getChannelFactory() { @@ -1604,6 +1618,9 @@ public void force(boolean metaData) throws IOException { primaryTerm.get() ); + RemoteTranslogTransferTracker translogTransferTracker = new RemoteTranslogTransferTracker(shardId, 10); + downloadManager = new StatsTrackingDownloadManager(threadPool, DefaultRecoverySettings.INSTANCE, translogTransferTracker); + try ( Translog translog = new RemoteFsTranslog( config, @@ -1615,7 +1632,8 @@ public void force(boolean metaData) throws IOException { repository, threadPool, () -> Boolean.TRUE, - new RemoteTranslogTransferTracker(shardId, 10) + translogTransferTracker, + downloadManager ) { @Override ChannelFactory getChannelFactory() { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index e34bc078896f9..af1e60314b320 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -22,11 +22,14 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.remote.transfer.DownloadManager; +import org.opensearch.index.remote.transfer.StatsTrackingDownloadManager; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -64,6 +67,9 @@ public class TranslogTransferManagerTests extends OpenSearchTestCase { private TransferService transferService; + private DownloadManager downloadManager; + private BlobStore blobStore; + private BlobContainer blobContainer; private ShardId shardId; private BlobPath remoteBaseTransferPath; private ThreadPool threadPool; @@ -87,20 +93,36 @@ public void setUp() throws Exception { minTranslogGeneration = randomLongBetween(0, generation); remoteBaseTransferPath = new BlobPath().add("base_path"); transferService = mock(TransferService.class); + blobStore = mock(BlobStore.class); + blobContainer = mock(BlobContainer.class); threadPool = new TestThreadPool(getClass().getName()); remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 20); + downloadManager = new StatsTrackingDownloadManager(threadPool, DefaultRecoverySettings.INSTANCE, remoteTranslogTransferTracker); tlogBytes = "Hello Translog".getBytes(StandardCharsets.UTF_8); ckpBytes = "Hello Checkpoint".getBytes(StandardCharsets.UTF_8); tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0), remoteTranslogTransferTracker); translogTransferManager = new TranslogTransferManager( shardId, transferService, + blobStore, remoteBaseTransferPath, tracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); delayForBlobDownload = 1; + + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + when(blobContainer.readBlob(eq("translog-23.tlog"))).thenAnswer(invocation -> { + Thread.sleep(delayForBlobDownload); + return new ByteArrayInputStream(tlogBytes); + }); + when(blobContainer.readBlob(eq("translog-23.ckp"))).thenAnswer(invocation -> { + Thread.sleep(delayForBlobDownload); + return new ByteArrayInputStream(ckpBytes); + }); + when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenAnswer(invocation -> { Thread.sleep(delayForBlobDownload); return new ByteArrayInputStream(tlogBytes); @@ -159,9 +181,11 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + blobStore, remoteBaseTransferPath, fileTransferTracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -194,9 +218,11 @@ public void testTransferSnapshotOnUploadTimeout() throws Exception { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + blobStore, remoteBaseTransferPath, fileTransferTracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); SetOnce exception = new SetOnce<>(); translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -235,9 +261,11 @@ public void testTransferSnapshotOnThreadInterrupt() throws Exception { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + blobStore, remoteBaseTransferPath, fileTransferTracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); SetOnce exception = new SetOnce<>(); @@ -333,9 +361,11 @@ public void testReadMetadataNoFile() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + blobStore, remoteBaseTransferPath, null, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); @@ -354,9 +384,11 @@ public void testReadMetadataFile() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + blobStore, remoteBaseTransferPath, null, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); TranslogTransferMetadata metadata1 = new TranslogTransferMetadata(1, 1, 1, 2); String mdFilename1 = metadata1.getFileName(); @@ -375,14 +407,14 @@ public void testReadMetadataFile() throws IOException { TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); long delayForMdDownload = 1; - when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename1))).thenAnswer(invocation -> { + when(blobContainer.readBlob(eq(mdFilename1))).thenAnswer(invocation -> { Thread.sleep(delayForMdDownload); - return new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)); + return new ByteArrayInputStream(TranslogTransferManager.getMetadataBytes(metadata)); }); assertEquals(metadata, translogTransferManager.readMetadata()); - assertEquals(translogTransferManager.getMetadataBytes(metadata).length, remoteTranslogTransferTracker.getDownloadBytesSucceeded()); + assertEquals(TranslogTransferManager.getMetadataBytes(metadata).length, remoteTranslogTransferTracker.getDownloadBytesSucceeded()); assertTrue(remoteTranslogTransferTracker.getTotalDownloadTimeInMillis() >= delayForMdDownload); } @@ -390,9 +422,11 @@ public void testReadMetadataReadException() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + blobStore, remoteBaseTransferPath, null, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); String mdFilename = tm.getFileName(); @@ -406,7 +440,10 @@ public void testReadMetadataReadException() throws IOException { }).when(transferService) .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); - when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenThrow(new IOException("Something went wrong")); + when(blobContainer.readBlob(eq(mdFilename))).thenAnswer(invocation -> { + Thread.sleep(delayForBlobDownload); + throw new IOException("Something went wrong"); + }); assertThrows(IOException.class, translogTransferManager::readMetadata); assertNoDownloadStats(true); @@ -426,9 +463,11 @@ public void testReadMetadataListException() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + blobStore, remoteBaseTransferPath, null, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); doAnswer(invocation -> { @@ -461,8 +500,8 @@ public void testDownloadTranslogAlreadyExists() throws IOException { translogTransferManager.downloadTranslog("12", "23", location); - verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.tlog")); - verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); + verify(blobContainer).readBlob(eq("translog-23.tlog")); + verify(blobContainer).readBlob(eq("translog-23.ckp")); assertTrue(Files.exists(location.resolve("translog-23.tlog"))); assertTrue(Files.exists(location.resolve("translog-23.ckp"))); assertTlogCkpDownloadStats(); @@ -476,8 +515,8 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { translogTransferManager.downloadTranslog("12", "23", location); - verify(transferService).downloadBlob(any(BlobPath.class), eq(translogFile)); - verify(transferService).downloadBlob(any(BlobPath.class), eq(checkpointFile)); + verify(blobContainer).readBlob(eq("translog-23.tlog")); + verify(blobContainer).readBlob(eq("translog-23.ckp")); assertTrue(Files.exists(location.resolve(translogFile))); assertTrue(Files.exists(location.resolve(checkpointFile))); @@ -499,9 +538,11 @@ public void testDeleteTranslogSuccess() throws Exception { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, blobStoreTransferService, + blobStore, remoteBaseTransferPath, tracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; tracker.add(translogFile, true); @@ -518,9 +559,11 @@ public void testDeleteStaleTranslogMetadata() { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + blobStore, remoteBaseTransferPath, null, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); String tm1 = new TranslogTransferMetadata(1, 1, 1, 2).getFileName(); String tm2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName(); @@ -569,9 +612,11 @@ public void testDeleteTranslogFailure() throws Exception { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, blobStoreTransferService, + blobStore, remoteBaseTransferPath, tracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; tracker.add(translogFile, true); @@ -612,9 +657,11 @@ public void testMetadataConflict() throws InterruptedException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + blobStore, remoteBaseTransferPath, null, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + downloadManager ); TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2, "node--1"); String mdFilename = tm.getFileName(); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 412d5235fe462..816e2d6217ec7 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -99,6 +99,7 @@ import org.opensearch.index.mapper.SourceToParse; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.remote.transfer.DownloadManager; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -664,6 +665,7 @@ protected IndexShard newShard( } else { remoteStore = null; } + final DownloadManager downloadManager = new DownloadManager(threadPool, DefaultRecoverySettings.INSTANCE); final BiFunction translogFactorySupplier = (settings, shardRouting) -> { if (settings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { @@ -671,7 +673,8 @@ protected IndexShard newShard( () -> mockRepoSvc, threadPool, settings.getRemoteStoreTranslogRepository(), - new RemoteTranslogTransferTracker(shardRouting.shardId(), 20) + new RemoteTranslogTransferTracker(shardRouting.shardId(), 20), + downloadManager ); } return new InternalTranslogFactory(); @@ -703,7 +706,8 @@ protected IndexShard newShard( remoteStoreStatsTrackerFactory, () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, "dummy-node", - DefaultRecoverySettings.INSTANCE + DefaultRecoverySettings.INSTANCE, + downloadManager ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) {