Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.11] [Remote Store] Fix stats reporting for multistream downloads. #10424

Merged
merged 1 commit into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.DirectoryFileTransferTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
Expand Down Expand Up @@ -4938,9 +4939,10 @@ private void downloadSegments(
final Runnable onFileSync
) throws IOException {
final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();
final DirectoryFileTransferTracker fileTransferTracker = store.getDirectoryFileTransferTracker();
for (String segment : toDownloadSegments) {
final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, segmentListener);
sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, fileTransferTracker, segmentListener);
segmentListener.actionGet();
onFileSync.run();
if (targetRemoteDirectory != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,30 +487,51 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen
* @param source The source file name
* @param destinationDirectory The destination directory (if multipart is not supported)
* @param destinationPath The destination path (if multipart is supported)
* @param fileTransferTracker Tracker used for file transfer stats
* @param fileCompletionListener The listener to notify of completion
*/
public void copyTo(String source, Directory destinationDirectory, Path destinationPath, ActionListener<String> fileCompletionListener) {
public void copyTo(
String source,
Directory destinationDirectory,
Path destinationPath,
DirectoryFileTransferTracker fileTransferTracker,
ActionListener<String> fileCompletionListener
) {
final String blobName = getExistingRemoteFilename(source);
if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) {
long length = 0L;
try {
length = fileLength(source);
} catch (IOException ex) {
logger.error("Unable to fetch segment length for stats tracking", ex);
}
final long fileLength = length;
final long startTime = System.currentTimeMillis();
fileTransferTracker.addTransferredBytesStarted(fileLength);
final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer();
final Path destinationFilePath = destinationPath.resolve(source);
final ActionListener<String> completionListener = ActionListener.wrap(response -> {
fileTransferTracker.addTransferredBytesSucceeded(fileLength, startTime);
fileCompletionListener.onResponse(response);
}, e -> {
fileTransferTracker.addTransferredBytesFailed(fileLength, startTime);
fileCompletionListener.onFailure(e);
});
final ReadContextListener readContextListener = new ReadContextListener(
blobName,
destinationFilePath,
fileCompletionListener,
completionListener,
threadPool,
remoteDataDirectory.getDownloadRateLimiter(),
recoverySettings.getMaxConcurrentRemoteStoreStreams()
);
blobContainer.readBlobAsync(blobName, readContextListener);
} else {
// Fallback to older mechanism of downloading the file
try {
ActionListener.completeWith(fileCompletionListener, () -> {
destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT);
fileCompletionListener.onResponse(source);
} catch (IOException e) {
fileCompletionListener.onFailure(e);
}
return source;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.DirectoryFileTransferTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -121,7 +122,8 @@ public void getSegmentFiles(
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
toDownloadSegments.add(fileMetadata);
}
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, listener);
final DirectoryFileTransferTracker fileTransferTracker = indexShard.store().getDirectoryFileTransferTracker();
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, fileTransferTracker, listener);
logger.debug("Downloaded segment files from remote store {}", toDownloadSegments);
} finally {
indexShard.store().decRef();
Expand All @@ -138,12 +140,13 @@ private void downloadSegments(
RemoteSegmentStoreDirectory remoteStoreDirectory,
List<StoreFileMetadata> toDownloadSegments,
ShardPath shardPath,
DirectoryFileTransferTracker fileTransferTracker,
ActionListener<GetSegmentFilesResponse> completionListener
) {
final Path indexPath = shardPath == null ? null : shardPath.resolveIndex();
for (StoreFileMetadata storeFileMetadata : toDownloadSegments) {
final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, segmentListener);
remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, fileTransferTracker, segmentListener);
segmentListener.actionGet();
}
completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,15 @@ public void onResponse(String unused) {
public void onFailure(Exception e) {}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
DirectoryFileTransferTracker directoryFileTransferTracker = new DirectoryFileTransferTracker();
long sourceFileLengthInBytes = remoteSegmentStoreDirectory.fileLength(filename);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, directoryFileTransferTracker, completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS));
verify(blobContainer, times(1)).readBlobAsync(contains(filename), any());
verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any());

// Verify stats are updated to DirectoryFileTransferTracker
assertEquals(sourceFileLengthInBytes, directoryFileTransferTracker.getTransferredBytesSucceeded());
}

public void testCopyFilesTo() throws Exception {
Expand All @@ -619,7 +624,7 @@ public void onResponse(String unused) {
public void onFailure(Exception e) {}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand All @@ -643,7 +648,7 @@ public void onResponse(String unused) {
@Override
public void onFailure(Exception e) {}
};
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand All @@ -670,7 +675,7 @@ public void onFailure(Exception e) {
}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand Down