diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 8692876412ea9..e29283724ebf8 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -81,6 +81,7 @@ import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; @@ -602,7 +603,8 @@ public IndexService newIndexService( IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, BiFunction translogFactorySupplier, Supplier clusterDefaultRefreshIntervalSupplier, - Supplier clusterRemoteTranslogBufferIntervalSupplier + Supplier clusterRemoteTranslogBufferIntervalSupplier, + RecoverySettings recoverySettings ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -660,7 +662,8 @@ public IndexService newIndexService( recoveryStateFactory, translogFactorySupplier, clusterDefaultRefreshIntervalSupplier, - clusterRemoteTranslogBufferIntervalSupplier + clusterRemoteTranslogBufferIntervalSupplier, + recoverySettings ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index af23145be9f89..a53dbe246fa44 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -89,13 +89,13 @@ import org.opensearch.index.shard.ShardNotInPrimaryModeException; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.similarity.SimilarityService; -import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; @@ -179,6 +179,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final BiFunction translogFactorySupplier; private final Supplier clusterDefaultRefreshIntervalSupplier; private final Supplier clusterRemoteTranslogBufferIntervalSupplier; + private final RecoverySettings recoverySettings; public IndexService( IndexSettings indexSettings, @@ -213,7 +214,8 @@ public IndexService( IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, BiFunction translogFactorySupplier, Supplier clusterDefaultRefreshIntervalSupplier, - Supplier clusterRemoteTranslogBufferIntervalSupplier + Supplier clusterRemoteTranslogBufferIntervalSupplier, + RecoverySettings recoverySettings ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -290,6 +292,7 @@ public IndexService( this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); this.translogFactorySupplier = translogFactorySupplier; this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier; + this.recoverySettings = recoverySettings; updateFsyncTaskIfNecessary(); } @@ -522,7 +525,7 @@ public synchronized IndexShard createShard( remoteStoreStatsTrackerFactory, clusterRemoteTranslogBufferIntervalSupplier, nodeEnv.nodeId(), - (RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory + recoverySettings ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 251f9a5ae01c0..2643af501bbc9 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -62,7 +62,6 @@ import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.PendingReplicationActions; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.metadata.DataStream; @@ -160,9 +159,8 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.index.similarity.SimilarityService; -import org.opensearch.index.store.DirectoryFileTransferTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; -import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; +import org.opensearch.index.store.RemoteStoreFileDownloader; import org.opensearch.index.store.Store; import org.opensearch.index.store.Store.MetadataSnapshot; import org.opensearch.index.store.StoreFileMetadata; @@ -184,6 +182,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryListener; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -201,7 +200,6 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -342,7 +340,7 @@ Runnable getGlobalCheckpointSyncer() { private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; private final List internalRefreshListener = new ArrayList<>(); - private final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory; + private final RemoteStoreFileDownloader fileDownloader; public IndexShard( final ShardRouting shardRouting, @@ -371,10 +369,7 @@ public IndexShard( final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, final Supplier clusterRemoteTranslogBufferIntervalSupplier, final String nodeId, - // Wiring a directory factory here breaks some intended abstractions, but this remote directory - // factory is used not as a Lucene directory but instead to copy files from a remote store when - // restoring a shallow snapshot. - @Nullable final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory + final RecoverySettings recoverySettings ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -470,7 +465,7 @@ public boolean shouldCache(Query query) { ? false : mapperService.documentMapper().mappers().containsTimeStampField(); this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; - this.remoteSegmentStoreDirectoryFactory = remoteSegmentStoreDirectoryFactory; + this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); } public ThreadPool getThreadPool() { @@ -568,6 +563,10 @@ public String getNodeId() { return translogConfig.getNodeId(); } + public RemoteStoreFileDownloader getFileDownloader() { + return fileDownloader; + } + @Override public void updateShardState( final ShardRouting newRouting, @@ -2706,7 +2705,7 @@ public void restoreFromRemoteStore(ActionListener listener) { public void restoreFromSnapshotAndRemoteStore( Repository repository, - RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, + RepositoriesService repositoriesService, ActionListener listener ) { try { @@ -2714,7 +2713,7 @@ public void restoreFromSnapshotAndRemoteStore( assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + recoveryState.getRecoverySource(); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, remoteSegmentStoreDirectoryFactory, listener); + storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool); } catch (Exception e) { listener.onFailure(e); } @@ -3554,7 +3553,7 @@ public void startRecovery( "from snapshot and remote store", recoveryState, recoveryListener, - l -> restoreFromSnapshotAndRemoteStore(repositoriesService.repository(repo), remoteSegmentStoreDirectoryFactory, l) + l -> restoreFromSnapshotAndRemoteStore(repositoriesService.repository(repo), repositoriesService, l) ); // indicesService.indexService(shardRouting.shardId().getIndex()).addMetadataListener(); } else { @@ -4912,7 +4911,7 @@ private String copySegmentFiles( if (toDownloadSegments.isEmpty() == false) { try { - downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync); + fileDownloader.download(sourceRemoteDirectory, storeDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync); } catch (Exception e) { throw new IOException("Error occurred when downloading segments from remote store", e); } @@ -4925,26 +4924,6 @@ private String copySegmentFiles( return segmentNFile; } - private void downloadSegments( - Directory storeDirectory, - RemoteSegmentStoreDirectory sourceRemoteDirectory, - RemoteSegmentStoreDirectory targetRemoteDirectory, - Set toDownloadSegments, - final Runnable onFileSync - ) throws IOException { - final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex(); - final DirectoryFileTransferTracker fileTransferTracker = store.getDirectoryFileTransferTracker(); - for (String segment : toDownloadSegments) { - final PlainActionFuture segmentListener = PlainActionFuture.newFuture(); - sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, fileTransferTracker, segmentListener); - segmentListener.actionGet(); - onFileSync.run(); - if (targetRemoteDirectory != null) { - targetRemoteDirectory.copyFrom(storeDirectory, segment, segment, IOContext.DEFAULT); - } - } - } - private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) { try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) { if (checksum == CodecUtil.retrieveChecksum(indexInput)) { diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 762aab51469d0..c0211e1257c8e 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -70,7 +70,9 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.channels.FileChannel; @@ -360,8 +362,9 @@ void recoverFromRepository(final IndexShard indexShard, Repository repository, A void recoverFromSnapshotAndRemoteStore( final IndexShard indexShard, Repository repository, - RemoteSegmentStoreDirectoryFactory directoryFactory, - ActionListener listener + RepositoriesService repositoriesService, + ActionListener listener, + ThreadPool threadPool ) { try { if (canRecover(indexShard)) { @@ -389,6 +392,10 @@ void recoverFromSnapshotAndRemoteStore( remoteStoreRepository = shallowCopyShardMetadata.getRemoteStoreRepository(); } + RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory( + () -> repositoriesService, + threadPool + ); RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory( remoteStoreRepository, indexUUID, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 70a88f6159764..345583bbbd1be 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -336,10 +336,6 @@ public boolean copyFrom( return false; } - protected UnaryOperator getDownloadRateLimiter() { - return downloadRateLimiter; - } - private void uploadBlob( Directory from, String src, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index c0b302c68c0ea..be1f2341236ab 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -24,8 +24,6 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.Version; import org.opensearch.common.UUIDs; -import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; -import org.opensearch.common.blobstore.stream.read.listener.ReadContextListener; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.logging.Loggers; @@ -38,7 +36,6 @@ import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; -import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.threadpool.ThreadPool; @@ -46,7 +43,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -93,8 +89,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final ThreadPool threadPool; - private final RecoverySettings recoverySettings; - /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. @@ -127,15 +121,13 @@ public RemoteSegmentStoreDirectory( RemoteDirectory remoteMetadataDirectory, RemoteStoreLockManager mdLockManager, ThreadPool threadPool, - ShardId shardId, - RecoverySettings recoverySettings + ShardId shardId ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; this.threadPool = threadPool; - this.recoverySettings = recoverySettings; this.logger = Loggers.getLogger(getClass(), shardId); init(); } @@ -473,70 +465,6 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen } } - /** - * Copies an existing {@code source} file from this directory to a non-existent file (also - * named {@code source}) in either {@code destinationDirectory} or {@code destinationPath}. - * If the blob container backing this directory supports multipart downloads, the {@code source} - * file will be downloaded (potentially in multiple concurrent parts) directly to - * {@code destinationPath}. This method will return immediately and {@code fileCompletionListener} - * will be notified upon completion. - *

- * If multipart downloads are not supported, then {@code source} file will be copied to a file named - * {@code source} in a single part to {@code destinationDirectory}. The download will happen on the - * calling thread and {@code fileCompletionListener} will be notified synchronously before this - * method returns. - * - * @param source The source file name - * @param destinationDirectory The destination directory (if multipart is not supported) - * @param destinationPath The destination path (if multipart is supported) - * @param fileTransferTracker Tracker used for file transfer stats - * @param fileCompletionListener The listener to notify of completion - */ - public void copyTo( - String source, - Directory destinationDirectory, - Path destinationPath, - DirectoryFileTransferTracker fileTransferTracker, - ActionListener fileCompletionListener - ) { - final String blobName = getExistingRemoteFilename(source); - if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { - long length = 0L; - try { - length = fileLength(source); - } catch (IOException ex) { - logger.error("Unable to fetch segment length for stats tracking", ex); - } - final long fileLength = length; - final long startTime = System.currentTimeMillis(); - fileTransferTracker.addTransferredBytesStarted(fileLength); - final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer(); - final Path destinationFilePath = destinationPath.resolve(source); - final ActionListener completionListener = ActionListener.wrap(response -> { - fileTransferTracker.addTransferredBytesSucceeded(fileLength, startTime); - fileCompletionListener.onResponse(response); - }, e -> { - fileTransferTracker.addTransferredBytesFailed(fileLength, startTime); - fileCompletionListener.onFailure(e); - }); - final ReadContextListener readContextListener = new ReadContextListener( - blobName, - destinationFilePath, - completionListener, - threadPool, - remoteDataDirectory.getDownloadRateLimiter(), - recoverySettings.getMaxConcurrentRemoteStoreStreams() - ); - blobContainer.readBlobAsync(blobName, readContextListener); - } else { - // Fallback to older mechanism of downloading the file - ActionListener.completeWith(fileCompletionListener, () -> { - destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT); - return source; - }); - } - } - /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} * diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index cc55380894ecd..a5e89ec6a8327 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -15,7 +15,6 @@ import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; -import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -35,18 +34,12 @@ public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.Dire private static final String SEGMENTS = "segments"; private final Supplier repositoriesService; - private final RecoverySettings recoverySettings; private final ThreadPool threadPool; - public RemoteSegmentStoreDirectoryFactory( - Supplier repositoriesService, - ThreadPool threadPool, - RecoverySettings recoverySettings - ) { + public RemoteSegmentStoreDirectoryFactory(Supplier repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; this.threadPool = threadPool; - this.recoverySettings = recoverySettings; } @Override @@ -78,7 +71,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s String.valueOf(shardId.id()) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId, recoverySettings); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java new file mode 100644 index 0000000000000..4fc721f2b96b5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java @@ -0,0 +1,147 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.util.concurrent.UncategorizedExecutionException; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; + +/** + * Helper class to downloads files from a {@link RemoteSegmentStoreDirectory} + * instance to a local {@link Directory} instance in parallel depending on thread + * pool size and recovery settings. + */ +@InternalApi +public final class RemoteStoreFileDownloader { + private final Logger logger; + private final ThreadPool threadPool; + private final RecoverySettings recoverySettings; + + public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, RecoverySettings recoverySettings) { + this.logger = Loggers.getLogger(RemoteStoreFileDownloader.class, shardId); + this.threadPool = threadPool; + this.recoverySettings = recoverySettings; + } + + /** + * Copies the given segments from the remote segment store to the given + * local directory. + * @param source The remote directory to copy segment files from + * @param destination The local directory to copy segment files to + * @param toDownloadSegments The list of segment files to download + */ + public void download(Directory source, Directory destination, Collection toDownloadSegments) throws IOException { + downloadInternal(source, destination, null, toDownloadSegments, () -> {}); + } + + /** + * Copies the given segments from the remote segment store to the given + * local directory, while also copying the segments _to_ another remote directory. + * @param source The remote directory to copy segment files from + * @param destination The local directory to copy segment files to + * @param secondDestination The second remote directory that segment files are + * copied to after being copied to the local directory + * @param toDownloadSegments The list of segment files to download + * @param onFileCompletion A generic runnable that is invoked after each file download. + * Must be thread safe as this may be invoked concurrently from + * different threads. + */ + public void download( + Directory source, + Directory destination, + Directory secondDestination, + Collection toDownloadSegments, + Runnable onFileCompletion + ) throws IOException { + downloadInternal(source, destination, secondDestination, toDownloadSegments, onFileCompletion); + } + + private void downloadInternal( + Directory source, + Directory destination, + @Nullable Directory secondDestination, + Collection toDownloadSegments, + Runnable onFileCompletion + ) throws IOException { + final Queue queue = new ConcurrentLinkedQueue<>(toDownloadSegments); + // Choose the minimum of: + // - number of files to download + // - max thread pool size + // - "indices.recovery.max_concurrent_remote_store_streams" setting + final int threads = Math.min( + toDownloadSegments.size(), + Math.min(threadPool.info(ThreadPool.Names.REMOTE_RECOVERY).getMax(), recoverySettings.getMaxConcurrentRemoteStoreStreams()) + ); + logger.trace("Starting download of {} files with {} threads", queue.size(), threads); + final PlainActionFuture> listener = PlainActionFuture.newFuture(); + final ActionListener allFilesListener = new GroupedActionListener<>(listener, threads); + for (int i = 0; i < threads; i++) { + copyOneFile(source, destination, secondDestination, queue, onFileCompletion, allFilesListener); + } + try { + listener.actionGet(); + } catch (UncategorizedExecutionException e) { + // Any IOException will be double-wrapped so dig it out and throw it + if (e.getCause() instanceof ExecutionException) { + if (e.getCause().getCause() instanceof IOException) { + throw (IOException) e.getCause().getCause(); + } + } + throw e; + } + } + + private void copyOneFile( + Directory source, + Directory destination, + @Nullable Directory secondDestination, + Queue queue, + Runnable onFileCompletion, + ActionListener listener + ) { + final String file = queue.poll(); + if (file == null) { + // Queue is empty, so notify listener we are done + listener.onResponse(null); + } else { + threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY).submit(() -> { + logger.trace("Downloading file {}", file); + try { + destination.copyFrom(source, file, file, IOContext.DEFAULT); + onFileCompletion.run(); + if (secondDestination != null) { + secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT); + } + } catch (Exception e) { + // Clear the queue to stop any future processing, report the failure, then return + queue.clear(); + listener.onFailure(e); + return; + } + copyOneFile(source, destination, secondDestination, queue, onFileCompletion, listener); + }); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index a72142e65c5e8..e27a597007b62 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -144,6 +144,7 @@ import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryListener; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; @@ -335,6 +336,7 @@ public class IndicesService extends AbstractLifecycleComponent private final CountDownLatch closeLatch = new CountDownLatch(1); private volatile boolean idFieldDataEnabled; private volatile boolean allowExpensiveQueries; + private final RecoverySettings recoverySettings; @Nullable private final OpenSearchThreadPoolExecutor danglingIndicesThreadPoolExecutor; @@ -380,7 +382,8 @@ public IndicesService( Supplier repositoriesServiceSupplier, FileCacheCleaner fileCacheCleaner, SearchRequestStats searchRequestStats, - @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + RecoverySettings recoverySettings ) { this.settings = settings; this.threadPool = threadPool; @@ -477,6 +480,7 @@ protected void closeInternal() { this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setClusterRemoteTranslogBufferInterval); + this.recoverySettings = recoverySettings; } /** @@ -874,7 +878,8 @@ private synchronized IndexService createIndexService( remoteDirectoryFactory, translogFactorySupplier, this::getClusterDefaultRefreshInterval, - this::getClusterRemoteTranslogBufferInterval + this::getClusterRemoteTranslogBufferInterval, + this.recoverySettings ); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index ed9755bf824ea..44dfb2f4cb00a 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -85,11 +85,11 @@ public class RecoverySettings { ); /** - * Controls the maximum number of streams that can be started concurrently when downloading from the remote store. + * Controls the maximum number of streams that can be started concurrently per recovery when downloading from the remote store. */ public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING = Setting.intSetting( "indices.recovery.max_concurrent_remote_store_streams", - 20, + 10, 1, Property.Dynamic, Property.NodeScope diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index ddbcb86269aa9..d2000a56401f5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -14,20 +14,16 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.util.Version; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; -import org.opensearch.index.shard.ShardPath; -import org.opensearch.index.store.DirectoryFileTransferTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -109,49 +105,31 @@ public void getSegmentFiles( logger.debug("Downloading segment files from remote store {}", filesToFetch); RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile(); - List toDownloadSegments = new ArrayList<>(); Collection directoryFiles = List.of(indexShard.store().directory().listAll()); if (remoteSegmentMetadata != null) { try { indexShard.store().incRef(); indexShard.remoteStore().incRef(); final Directory storeDirectory = indexShard.store().directory(); - final ShardPath shardPath = indexShard.shardPath(); + final List toDownloadSegmentNames = new ArrayList<>(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; - toDownloadSegments.add(fileMetadata); + toDownloadSegmentNames.add(file); } - final DirectoryFileTransferTracker fileTransferTracker = indexShard.store().getDirectoryFileTransferTracker(); - downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, fileTransferTracker, listener); - logger.debug("Downloaded segment files from remote store {}", toDownloadSegments); + indexShard.getFileDownloader().download(remoteDirectory, storeDirectory, toDownloadSegmentNames); + logger.debug("Downloaded segment files from remote store {}", filesToFetch); } finally { indexShard.store().decRef(); indexShard.remoteStore().decRef(); } } + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } catch (Exception e) { listener.onFailure(e); } } - private void downloadSegments( - Directory storeDirectory, - RemoteSegmentStoreDirectory remoteStoreDirectory, - List toDownloadSegments, - ShardPath shardPath, - DirectoryFileTransferTracker fileTransferTracker, - ActionListener completionListener - ) { - final Path indexPath = shardPath == null ? null : shardPath.resolveIndex(); - for (StoreFileMetadata storeFileMetadata : toDownloadSegments) { - final PlainActionFuture segmentListener = PlainActionFuture.newFuture(); - remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, fileTransferTracker, segmentListener); - segmentListener.actionGet(); - } - completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments)); - } - @Override public String getDescription() { return "RemoteStoreReplicationSource"; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index f44c8c8bea7f1..5b3b064a47c66 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -771,8 +771,7 @@ protected Node( final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( repositoriesServiceReference::get, - threadPool, - recoverySettings + threadPool ); final SearchRequestStats searchRequestStats = new SearchRequestStats(); @@ -803,7 +802,8 @@ protected Node( repositoriesServiceReference::get, fileCacheCleaner, searchRequestStats, - remoteStoreStatsTrackerFactory + remoteStoreStatsTrackerFactory, + recoverySettings ); final AliasValidator aliasValidator = new AliasValidator(); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index bf85e19493203..8e6649c366eeb 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -1144,8 +1144,7 @@ private void executeStaleShardDelete( // see https://github.com/opensearch-project/OpenSearch/issues/8469 new RemoteSegmentStoreDirectoryFactory( remoteStoreLockManagerFactory.getRepositoriesService(), - threadPool, - recoverySettings + threadPool ).newDirectory( remoteStoreRepoForIndex, indexUUID, @@ -1615,8 +1614,7 @@ private void executeOneStaleIndexDelete( // see https://github.com/opensearch-project/OpenSearch/issues/8469 new RemoteSegmentStoreDirectoryFactory( remoteStoreLockManagerFactory.getRepositoriesService(), - threadPool, - recoverySettings + threadPool ).newDirectory( remoteStoreRepoForIndex, indexUUID, diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index bbd73bcf97aab..97bc822be7d51 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -258,10 +258,11 @@ private IndexService newIndexService(IndexModule module) throws IOException { writableRegistry(), () -> false, null, - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool, DefaultRecoverySettings.INSTANCE), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), translogFactorySupplier, () -> IndexSettings.DEFAULT_REFRESH_INTERVAL, - () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL + () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, + DefaultRecoverySettings.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 941f2f48e71af..5a13f57db2c87 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -33,7 +33,6 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils; import org.opensearch.index.store.Store; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; -import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; @@ -156,8 +155,7 @@ public void testRemoteDirectoryInitThrowsException() throws IOException { remoteMetadataDirectory, mock(RemoteStoreLockManager.class), mock(ThreadPool.class), - shardId, - DefaultRecoverySettings.INSTANCE + shardId ); FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index 78c7fe64cebd9..cad5e47531cc6 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -20,7 +20,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; -import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -58,11 +57,7 @@ public void setup() { repositoriesService = mock(RepositoriesService.class); threadPool = mock(ThreadPool.class); when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( - repositoriesServiceSupplier, - threadPool, - DefaultRecoverySettings.INSTANCE - ); + remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier, threadPool); } public void testNewDirectory() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 3e45699ef36eb..36cfd84ff960a 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -25,9 +25,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; -import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; -import org.opensearch.common.io.InputStreamContainer; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; @@ -42,7 +40,6 @@ import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; -import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.junit.After; @@ -51,18 +48,15 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.UnaryOperator; import org.mockito.Mockito; @@ -72,8 +66,6 @@ import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; import static org.hamcrest.CoreMatchers.is; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -151,14 +143,12 @@ public void setup() throws IOException { remoteMetadataDirectory, mdLockManager, threadPool, - indexShard.shardId(), - DefaultRecoverySettings.INSTANCE + indexShard.shardId() ); try (Store store = indexShard.store()) { segmentInfos = store.readLastCommittedSegmentsInfo(); } - when(remoteDataDirectory.getDownloadRateLimiter()).thenReturn(UnaryOperator.identity()); when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY)).thenReturn(executorService); } @@ -568,118 +558,6 @@ public void onFailure(Exception e) {} storeDirectory.close(); } - public void testCopyFilesToMultipart() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); - - Mockito.doAnswer(invocation -> { - ActionListener completionListener = invocation.getArgument(1); - final CompletableFuture future = new CompletableFuture<>(); - future.complete(new InputStreamContainer(new ByteArrayInputStream(new byte[] { 42 }), 0, 1)); - completionListener.onResponse(new ReadContext(1, List.of(() -> future), "")); - return null; - }).when(blobContainer).readBlobAsync(any(), any()); - - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - downloadLatch.countDown(); - } - - @Override - public void onFailure(Exception e) {} - }; - Path path = createTempDir(); - DirectoryFileTransferTracker directoryFileTransferTracker = new DirectoryFileTransferTracker(); - long sourceFileLengthInBytes = remoteSegmentStoreDirectory.fileLength(filename); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, directoryFileTransferTracker, completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS)); - verify(blobContainer, times(1)).readBlobAsync(contains(filename), any()); - verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any()); - - // Verify stats are updated to DirectoryFileTransferTracker - assertEquals(sourceFileLengthInBytes, directoryFileTransferTracker.getTransferredBytesSucceeded()); - } - - public void testCopyFilesTo() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - downloadLatch.countDown(); - } - - @Override - public void onFailure(Exception e) {} - }; - Path path = createTempDir(); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); - verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); - } - - public void testCopyFilesToEmptyPath() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); - - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - downloadLatch.countDown(); - } - - @Override - public void onFailure(Exception e) {} - }; - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, new DirectoryFileTransferTracker(), completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); - verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); - } - - public void testCopyFilesToException() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - Mockito.doThrow(new IOException()) - .when(storeDirectory) - .copyFrom(any(Directory.class), anyString(), anyString(), any(IOContext.class)); - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - - } - - @Override - public void onFailure(Exception e) { - downloadLatch.countDown(); - } - }; - Path path = createTempDir(); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); - verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); - } - public void testCopyFilesFromMultipartIOException() throws Exception { String filename = "_100.si"; AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); @@ -689,8 +567,7 @@ public void testCopyFilesFromMultipartIOException() throws Exception { remoteMetadataDirectory, mdLockManager, threadPool, - indexShard.shardId(), - DefaultRecoverySettings.INSTANCE + indexShard.shardId() ); populateMetadata(); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java new file mode 100644 index 0000000000000..588d9e8bb13a2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.NIOFSDirectory; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class RemoteStoreFileDownloaderTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + private Directory source; + private Directory destination; + private Directory secondDestination; + private RemoteStoreFileDownloader fileDownloader; + private Map files = new HashMap<>(); + + @Before + public void setup() throws IOException { + final int streamLimit = randomIntBetween(1, 20); + final RecoverySettings recoverySettings = new RecoverySettings( + Settings.builder().put("indices.recovery.max_concurrent_remote_store_streams", streamLimit).build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + threadPool = new TestThreadPool(getTestName()); + source = new NIOFSDirectory(createTempDir()); + destination = new NIOFSDirectory(createTempDir()); + secondDestination = new NIOFSDirectory(createTempDir()); + for (int i = 0; i < 10; i++) { + final String filename = "file_" + i; + final int content = randomInt(); + try (IndexOutput output = source.createOutput(filename, IOContext.DEFAULT)) { + output.writeInt(content); + } + files.put(filename, content); + } + fileDownloader = new RemoteStoreFileDownloader( + ShardId.fromString("[RemoteStoreFileDownloaderTests][0]"), + threadPool, + recoverySettings + ); + } + + @After + public void stopThreadPool() throws Exception { + threadPool.shutdown(); + assertTrue(threadPool.awaitTermination(5, TimeUnit.SECONDS)); + } + + public void testDownload() throws IOException { + fileDownloader.download(source, destination, files.keySet()); + assertContent(files, destination); + } + + public void testDownloadWithSecondDestination() throws IOException { + fileDownloader.download(source, destination, secondDestination, files.keySet(), () -> {}); + assertContent(files, destination); + assertContent(files, secondDestination); + } + + public void testDownloadWithFileCompletionHandler() throws IOException { + final AtomicInteger counter = new AtomicInteger(0); + fileDownloader.download(source, destination, null, files.keySet(), counter::incrementAndGet); + assertContent(files, destination); + assertEquals(files.size(), counter.get()); + } + + public void testDownloadNonExistentFile() { + assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, Set.of("not real"))); + } + + public void testDownloadExtraNonExistentFile() { + List filesWithExtra = new ArrayList<>(files.keySet()); + filesWithExtra.add("not real"); + assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, filesWithExtra)); + } + + private static void assertContent(Map expected, Directory destination) throws IOException { + // Note that Lucene will randomly write extra files (see org.apache.lucene.tests.mockfile.ExtraFS) + // so we just need to check that all the expected files are present but not that _only_ the expected + // files are present + final Set actualFiles = Set.of(destination.listAll()); + for (String file : expected.keySet()) { + assertTrue(actualFiles.contains(file)); + try (IndexInput input = destination.openInput(file, IOContext.DEFAULT)) { + assertEquals(expected.get(file), Integer.valueOf(input.readInt())); + assertThrows(EOFException.class, input::readByte); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 80731b378f369..97c5d23831965 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2067,11 +2067,12 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool, DefaultRecoverySettings.INSTANCE), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, fileCacheCleaner, null, - new RemoteStoreStatsTrackerFactory(clusterService, settings) + new RemoteStoreStatsTrackerFactory(clusterService, settings), + DefaultRecoverySettings.INSTANCE ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 186c1c7e78f6b..ffa37817e8a03 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -702,7 +702,7 @@ protected IndexShard newShard( remoteStoreStatsTrackerFactory, () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, "dummy-node", - null + DefaultRecoverySettings.INSTANCE ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { @@ -789,14 +789,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) ); - return new RemoteSegmentStoreDirectory( - dataDirectory, - metadataDirectory, - remoteStoreLockManager, - threadPool, - shardId, - DefaultRecoverySettings.INSTANCE - ); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, shardId); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException {