Skip to content

Commit

Permalink
Introduce a unified download manager for remote store operations
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Feb 1, 2024
1 parent 16c5257 commit eceb712
Show file tree
Hide file tree
Showing 20 changed files with 527 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ public static final IndexShard newIndexShard(
null,
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
nodeId,
null,
null
);
}
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.transfer.DownloadManager;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.SearchOperationListener;
Expand Down Expand Up @@ -605,7 +606,8 @@ public IndexService newIndexService(
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
RecoverySettings recoverySettings
RecoverySettings recoverySettings,
DownloadManager downloadManager
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -664,7 +666,8 @@ public IndexService newIndexService(
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
clusterRemoteTranslogBufferIntervalSupplier,
recoverySettings
recoverySettings,
downloadManager
);
success = true;
return indexService;
Expand Down
9 changes: 7 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.SearchIndexNameMatcher;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.remote.transfer.DownloadManager;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -182,6 +183,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;
private final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier;
private final RecoverySettings recoverySettings;
private final DownloadManager downloadManager;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -217,7 +219,8 @@ public IndexService(
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
RecoverySettings recoverySettings
RecoverySettings recoverySettings,
DownloadManager downloadManager
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -295,6 +298,7 @@ public IndexService(
this.translogFactorySupplier = translogFactorySupplier;
this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier;
this.recoverySettings = recoverySettings;
this.downloadManager = downloadManager;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -528,7 +532,8 @@ public synchronized IndexShard createShard(
remoteStoreStatsTrackerFactory,
clusterRemoteTranslogBufferIntervalSupplier,
nodeEnv.nodeId(),
recoverySettings
recoverySettings,
downloadManager
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,105 @@
* compatible open source license.
*/

package org.opensearch.index.store;
package org.opensearch.index.remote.transfer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;

/**
* Helper class to downloads files from a {@link RemoteSegmentStoreDirectory}
* instance to a local {@link Directory} instance in parallel depending on thread
* pool size and recovery settings.
* Manages file downloads from the remote store and from a {@link RemoteSegmentStoreDirectory}
* instance to a local {@link Directory} instance.
*
* @opensearch.api
*/
@PublicApi(since = "2.11.0")
public final class RemoteStoreFileDownloader {
private final Logger logger;
@PublicApi(since = "2.13.0")
public class DownloadManager {

private final ThreadPool threadPool;
private final RecoverySettings recoverySettings;
private static final Logger logger = LogManager.getLogger(DownloadManager.class);

public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, RecoverySettings recoverySettings) {
this.logger = Loggers.getLogger(RemoteStoreFileDownloader.class, shardId);
public DownloadManager(ThreadPool threadPool, RecoverySettings recoverySettings) {
this.threadPool = threadPool;
this.recoverySettings = recoverySettings;
}

public DownloadManager(DownloadManager other) {
this.threadPool = other.threadPool;
this.recoverySettings = other.recoverySettings;
}

/**
* Downloads the {@link TranslogTransferMetadata} file from the blob container stored as a part of remote store
* metadata
* @param blobContainer location where the metadata file is stored
* @param fileName name of the metadata file
* @return deserialized metadata object
* @throws IOException exception when reading from the remote file
*/
public TranslogTransferMetadata fetchTranslogTransferMetadata(BlobContainer blobContainer, String fileName) throws IOException {
byte[] data = downloadFileFromRemoteStoreToMemory(blobContainer, fileName);
IndexInput indexInput = new ByteArrayIndexInput("metadata file", data);
return TranslogTransferManager.METADATA_STREAM_WRAPPER.readStream(indexInput);
}

/**
* Downloads a file stored in the blob container into memory
* @param blobContainer location of the blob
* @param fileName name of the blob to be downloaded
* @return in-memory byte representation of the blob
* @throws IOException exception when reading from the repository file
*/
public byte[] downloadFileFromRemoteStoreToMemory(BlobContainer blobContainer, String fileName) throws IOException {
// TODO: Add a circuit breaker check before putting all the data in heap
try (InputStream inputStream = blobContainer.readBlob(fileName)) {
return inputStream.readAllBytes();
}
}

/**
* Downloads a file stored within the blob container onto local disk specified using {@link Path} based location.
* @param blobContainer location of the blob
* @param fileName name of the blob to be downloaded
* @param location location on disk where the blob will be downloaded
* @throws IOException exception when reading from the repository file or writing to disk
*/
public void downloadFileFromRemoteStore(BlobContainer blobContainer, String fileName, Path location) throws IOException {
Path filePath = location.resolve(fileName);
// Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download
Files.deleteIfExists(filePath);

try (InputStream inputStream = blobContainer.readBlob(fileName)) {
Files.copy(inputStream, filePath);
}
}

/**
* Copies the given segments from the remote segment store to the given
* local directory.
Expand All @@ -55,14 +113,14 @@ public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, Recover
* @param toDownloadSegments The list of segment files to download
* @param listener Callback listener to be notified upon completion
*/
public void downloadAsync(
public void copySegmentsFromRemoteStoreAsync(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
Collection<String> toDownloadSegments,
ActionListener<Void> listener
) {
downloadInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener);
copySegmentsInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener);
}

/**
Expand All @@ -77,7 +135,7 @@ public void downloadAsync(
* Must be thread safe as this may be invoked concurrently from
* different threads.
*/
public void download(
public void copySegmentsFromRemoteStore(
Directory source,
Directory destination,
Directory secondDestination,
Expand All @@ -86,7 +144,7 @@ public void download(
) throws InterruptedException, IOException {
final CancellableThreads cancellableThreads = new CancellableThreads();
final PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
downloadInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener);
copySegmentsInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener);
try {
listener.get();
} catch (ExecutionException e) {
Expand All @@ -105,7 +163,7 @@ public void download(
}
}

private void downloadInternal(
private void copySegmentsInternal(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
Expand All @@ -126,11 +184,19 @@ private void downloadInternal(
logger.trace("Starting download of {} files with {} threads", queue.size(), threads);
final ActionListener<Void> allFilesListener = new GroupedActionListener<>(ActionListener.map(listener, r -> null), threads);
for (int i = 0; i < threads; i++) {
copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, allFilesListener);
copyFileFromRemoteStoreDirectory(
cancellableThreads,
source,
destination,
secondDestination,
queue,
onFileCompletion,
allFilesListener
);
}
}

private void copyOneFile(
private void copyFileFromRemoteStoreDirectory(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
Expand Down Expand Up @@ -160,7 +226,15 @@ private void copyOneFile(
listener.onFailure(e);
return;
}
copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, listener);
copyFileFromRemoteStoreDirectory(
cancellableThreads,
source,
destination,
secondDestination,
queue,
onFileCompletion,
listener
);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.remote.transfer;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
* A delegating wrapper class of {@link DownloadManager} for keeping track of download stats and metric reporting.
*/
public class StatsTrackingDownloadManager extends DownloadManager {

private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

public StatsTrackingDownloadManager(
ThreadPool threadPool,
RecoverySettings recoverySettings,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
) {
super(threadPool, recoverySettings);
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
}

public StatsTrackingDownloadManager(DownloadManager downloadManager, RemoteTranslogTransferTracker remoteTranslogTransferTracker) {
super(downloadManager);
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
}

/**
* Tracks the download time and size when fetching an object from the container to memory
* @param blobContainer location of the blob
* @param fileName name of the blob to be downloaded
* @return in-memory byte representation of the file
* @throws IOException exception on reading blob from container
*/
@Override
public byte[] downloadFileFromRemoteStoreToMemory(BlobContainer blobContainer, String fileName) throws IOException {
// TODO: Rewrite stats logic to remove hard-wiring to translog transfer tracker
// (maybe make RemoteTranslogTransferTracker methods interface dependent?)
long downloadStartTime = System.nanoTime();
byte[] data = null;
try {
data = super.downloadFileFromRemoteStoreToMemory(blobContainer, fileName);
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
if (data != null) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(data.length);
}
}
return data;
}

/**
* Tracks the download time and size when fetching an object from the container to a local file
* @param blobContainer location of the blob
* @param fileName name of the blob to be downloaded
* @param location location on disk where the blob will be downloaded
* @throws IOException exception on reading blob from container or writing to disk
*/
@Override
public void downloadFileFromRemoteStore(BlobContainer blobContainer, String fileName, Path location) throws IOException {
// TODO: Rewrite stats logic to remove hard-wiring to translog transfer tracker
boolean downloadStatus = false;
long bytesToRead = 0, downloadStartTime = System.nanoTime();
try {
super.downloadFileFromRemoteStore(blobContainer, fileName, location);
bytesToRead = Files.size(location.resolve(fileName));
downloadStatus = true;
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Package containing classes to manage downloads from the repository for remote store operations.
*/
package org.opensearch.index.remote.transfer;
Loading

0 comments on commit eceb712

Please sign in to comment.