From 31ab384fa89d02c6481d5c4212e8d9b87f436692 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 9 Feb 2024 18:25:57 +0000 Subject: [PATCH 1/8] implement logic of fetching blocks from multiple chunks of snapshot file. Signed-off-by: Rishikesh1159 --- .../snapshots/SearchableSnapshotIT.java | 2 +- .../file/OnDemandBlockSnapshotIndexInput.java | 11 ++-- .../store/remote/utils/BlobFetchRequest.java | 15 ++++++ .../store/remote/utils/TransferManager.java | 50 ++++++++++++++++--- 4 files changed, 61 insertions(+), 17 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 9a92ddc81852a..58d56705d7e9c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -78,7 +78,7 @@ protected Settings.Builder randomRepositorySettings() { private Settings.Builder chunkedRepositorySettings() { final Settings.Builder settings = Settings.builder(); settings.put("location", randomRepoPath()).put("compress", randomBoolean()); - settings.put("chunk_size", 2 << 23, ByteSizeUnit.BYTES); + settings.put("chunk_size", randomBoolean() ? (2 << 23) : 1000, ByteSizeUnit.BYTES); return settings; } diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index 7166e9aa482e3..f7c6bd2c05857 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -139,18 +139,13 @@ protected IndexInput fetchBlock(int blockId) throws IOException { // If the snapshot file is chunked, we must account for this by // choosing the appropriate file part and updating the position // accordingly. - final int part = (int) (blockStart / partSize); - final long partStart = part * partSize; - - final long position = blockStart - partStart; - final long length = blockEnd - blockStart; BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder() - .position(position) - .length(length) - .blobName(fileInfo.partName(part)) + .position(blockStart) + .length(blockEnd) .directory(directory) .fileName(blockFileName) + .fileInfo(fileInfo) .build(); return transferManager.fetchBlob(blobFetchRequest); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java b/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java index d0508e9c6f4c7..2e45e55d795c0 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java @@ -10,6 +10,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; +import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import java.nio.file.Path; @@ -26,6 +27,8 @@ public class BlobFetchRequest { private final String blobName; + private final BlobStoreIndexShardSnapshot.FileInfo fileInfo; + private final Path filePath; private final Directory directory; @@ -36,6 +39,7 @@ private BlobFetchRequest(Builder builder) { this.position = builder.position; this.length = builder.length; this.blobName = builder.blobName; + this.fileInfo = builder.fileInfo; this.fileName = builder.fileName; this.filePath = builder.directory.getDirectory().resolve(fileName); this.directory = builder.directory; @@ -53,6 +57,10 @@ public String getBlobName() { return blobName; } + public BlobStoreIndexShardSnapshot.FileInfo getFileInfo() { + return fileInfo; + } + public Path getFilePath() { return filePath; } @@ -96,6 +104,8 @@ public static final class Builder { private long position; private long length; private String blobName; + + private BlobStoreIndexShardSnapshot.FileInfo fileInfo; private FSDirectory directory; private String fileName; @@ -119,6 +129,11 @@ public Builder blobName(String blobName) { return this; } + public Builder fileInfo(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { + this.fileInfo = fileInfo; + return this; + } + public Builder directory(FSDirectory directory) { this.directory = directory; return this; diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index c9469283ee921..e061e0c2a4e8e 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -21,11 +21,15 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.SequenceInputStream; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -83,17 +87,47 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo return AccessController.doPrivileged((PrivilegedAction) () -> { try { if (Files.exists(request.getFilePath()) == false) { - try ( + List inputStreamList = new ArrayList<>(); + long partSize = request.getFileInfo().partSize().getBytes(); + long blockStart = request.getPosition(); + long blockEnd = request.getLength(); + int partNum = (int) (blockStart / partSize); + long pos = blockStart; + long diff = (blockEnd - pos); + while (diff > 0) { + long partStart = pos % partSize; + long partEnd; + if ((partStart + diff) > partSize) { + partEnd = partSize; + } else { + partEnd = (partStart + diff); + } + long fetchBytes = partEnd - partStart; InputStream snapshotFileInputStream = blobContainer.readBlob( - request.getBlobName(), - request.getPosition(), - request.getLength() + request.getFileInfo().partName(partNum), + partStart, + fetchBytes ); - OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath()); - OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream) - ) { - snapshotFileInputStream.transferTo(localFileOutputStream); + inputStreamList.add(snapshotFileInputStream); + partNum++; + pos = pos + fetchBytes; + diff = (blockEnd - pos); + + } + OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath()); + OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream); + SequenceInputStream sis = new SequenceInputStream(Collections.enumeration(inputStreamList)); + int ch; + while (true) { + try { + if (!((ch = sis.read()) != -1)) break; + localFileOutputStream.write(ch); + } catch (IOException e) { + throw new RuntimeException(e); + } } + sis.close(); + localFileOutputStream.close(); } final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ); return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput); From c9f27ac99e5bada54123e6e0df5af4166e267146 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 19 Feb 2024 03:04:10 +0000 Subject: [PATCH 2/8] Refactor and address comments. Signed-off-by: Rishikesh1159 --- .../snapshots/SearchableSnapshotIT.java | 36 +++++- .../file/OnDemandBlockSnapshotIndexInput.java | 44 ++++++-- .../store/remote/utils/BlobFetchRequest.java | 15 --- .../store/remote/utils/TransferManager.java | 103 ++++++++---------- .../OnDemandBlockSnapshotIndexInputTests.java | 7 +- .../remote/utils/TransferManagerTests.java | 39 +++---- 6 files changed, 134 insertions(+), 110 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 58d56705d7e9c..a9d369026108a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -75,10 +75,10 @@ protected Settings.Builder randomRepositorySettings() { return settings; } - private Settings.Builder chunkedRepositorySettings() { + private Settings.Builder chunkedRepositorySettings(long chunkSize) { final Settings.Builder settings = Settings.builder(); settings.put("location", randomRepoPath()).put("compress", randomBoolean()); - settings.put("chunk_size", randomBoolean() ? (2 << 23) : 1000, ByteSizeUnit.BYTES); + settings.put("chunk_size", chunkSize, ByteSizeUnit.BYTES); return settings; } @@ -184,10 +184,10 @@ public void testSnapshottingSearchableSnapshots() throws Exception { } /** - * Tests a chunked repository scenario for searchable snapshots by creating an index, + * Tests a default 8mib chunked repository scenario for searchable snapshots by creating an index, * taking a snapshot, restoring it as a searchable snapshot index. */ - public void testCreateSearchableSnapshotWithChunks() throws Exception { + public void testCreateSearchableSnapshotWithDefaultChunks() throws Exception { final int numReplicasIndex = randomIntBetween(1, 4); final String indexName = "test-idx"; final String restoredIndexName = indexName + "-copy"; @@ -195,7 +195,33 @@ public void testCreateSearchableSnapshotWithChunks() throws Exception { final String snapshotName = "test-snap"; final Client client = client(); - Settings.Builder repositorySettings = chunkedRepositorySettings(); + Settings.Builder repositorySettings = chunkedRepositorySettings(2 << 23); + + internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1); + createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName); + createRepositoryWithSettings(repositorySettings, repoName); + takeSnapshot(client, snapshotName, repoName, indexName); + + deleteIndicesAndEnsureGreen(client, indexName); + restoreSnapshotAndEnsureGreen(client, snapshotName, repoName); + assertRemoteSnapshotIndexSettings(client, restoredIndexName); + + assertDocCount(restoredIndexName, 1000L); + } + + /** + * Tests a small 1000 bytes chunked repository scenario for searchable snapshots by creating an index, + * taking a snapshot, restoring it as a searchable snapshot index. + */ + public void testCreateSearchableSnapshotWithSmallChunks() throws Exception { + final int numReplicasIndex = randomIntBetween(1, 4); + final String indexName = "test-idx"; + final String restoredIndexName = indexName + "-copy"; + final String repoName = "test-repo"; + final String snapshotName = "test-snap"; + final Client client = client(); + + Settings.Builder repositorySettings = chunkedRepositorySettings(1000); internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1); createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName); diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index f7c6bd2c05857..4cf537ab30037 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -15,6 +15,8 @@ import org.opensearch.index.store.remote.utils.TransferManager; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files. @@ -139,15 +141,39 @@ protected IndexInput fetchBlock(int blockId) throws IOException { // If the snapshot file is chunked, we must account for this by // choosing the appropriate file part and updating the position // accordingly. - - BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder() - .position(blockStart) - .length(blockEnd) - .directory(directory) - .fileName(blockFileName) - .fileInfo(fileInfo) - .build(); - return transferManager.fetchBlob(blobFetchRequest); + int partNum = (int) (blockStart / partSize); + long pos = blockStart; + long diff = (blockEnd - blockStart); + + // Block may be present on multiple chunks of a file, so we need + // to make multiple blobFetchRequest to fetch an entire block. + // Each fetchBlobRequest can fetch only a single chunk of a file. + List blobFetchRequestList = new ArrayList<>(); + while (diff > 0) { + long partStart = pos % partSize; + long partEnd; + if ((partStart + diff) > partSize) { + partEnd = partSize; + } else { + partEnd = (partStart + diff); + } + long fetchBytes = partEnd - partStart; + BlobFetchRequest.Builder builder = BlobFetchRequest.builder(); + builder.position(partStart) + .length(fetchBytes) + .blobName(fileInfo.partName(partNum)) + .directory(directory) + .fileName(blockFileName); + BlobFetchRequest req = builder.build(); + blobFetchRequestList.add(req); + partNum++; + pos = pos + fetchBytes; + diff = (blockEnd - pos); + } + if (blobFetchRequestList.isEmpty()) { + throw new IOException("block size cannot be zero"); + } + return transferManager.fetchBlob(blobFetchRequestList); } @Override diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java b/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java index 2e45e55d795c0..d0508e9c6f4c7 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java @@ -10,7 +10,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; -import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import java.nio.file.Path; @@ -27,8 +26,6 @@ public class BlobFetchRequest { private final String blobName; - private final BlobStoreIndexShardSnapshot.FileInfo fileInfo; - private final Path filePath; private final Directory directory; @@ -39,7 +36,6 @@ private BlobFetchRequest(Builder builder) { this.position = builder.position; this.length = builder.length; this.blobName = builder.blobName; - this.fileInfo = builder.fileInfo; this.fileName = builder.fileName; this.filePath = builder.directory.getDirectory().resolve(fileName); this.directory = builder.directory; @@ -57,10 +53,6 @@ public String getBlobName() { return blobName; } - public BlobStoreIndexShardSnapshot.FileInfo getFileInfo() { - return fileInfo; - } - public Path getFilePath() { return filePath; } @@ -104,8 +96,6 @@ public static final class Builder { private long position; private long length; private String blobName; - - private BlobStoreIndexShardSnapshot.FileInfo fileInfo; private FSDirectory directory; private String fileName; @@ -129,11 +119,6 @@ public Builder blobName(String blobName) { return this; } - public Builder fileInfo(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - this.fileInfo = fileInfo; - return this; - } - public Builder directory(FSDirectory directory) { this.directory = directory; return this; diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index e061e0c2a4e8e..96f19878d32e7 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -21,19 +21,18 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.SequenceInputStream; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.security.AccessController; import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; +import static java.nio.file.StandardOpenOption.APPEND; + /** * This acts as entry point to fetch {@link BlobFetchRequest} and return actual {@link IndexInput}. Utilizes the BlobContainer interface to * read snapshot files located within a repository. This basically adapts BlobContainer snapshots files into IndexInput @@ -52,17 +51,20 @@ public TransferManager(final BlobContainer blobContainer, final FileCache fileCa } /** - * Given a blobFetchRequest, return it's corresponding IndexInput. - * @param blobFetchRequest to fetch + * Given a blobFetchRequestList, return it's corresponding IndexInput. + * @param blobFetchRequestList to fetch * @return future of IndexInput augmented with internal caching maintenance tasks */ - public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException { - final Path key = blobFetchRequest.getFilePath(); + public IndexInput fetchBlob(List blobFetchRequestList) throws IOException { + + assert blobFetchRequestList.isEmpty() == false; + + final Path key = blobFetchRequestList.get(0).getFilePath(); final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { if (cachedIndexInput == null || cachedIndexInput.isClosed()) { // Doesn't exist or is closed, either way create a new one - return new DelayedCreationCachedIndexInput(fileCache, blobContainer, blobFetchRequest); + return new DelayedCreationCachedIndexInput(fileCache, blobContainer, blobFetchRequestList); } else { // already in the cache and ready to be used (open) return cachedIndexInput; @@ -79,58 +81,41 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio } } - private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobContainer blobContainer, BlobFetchRequest request) { + private static FileCachedIndexInput createIndexInput( + FileCache fileCache, + BlobContainer blobContainer, + List requestList + ) { // We need to do a privileged action here in order to fetch from remote // and write to the local file cache in case this is invoked as a side // effect of a plugin (such as a scripted search) that doesn't have the // necessary permissions. + assert requestList.isEmpty() == false; return AccessController.doPrivileged((PrivilegedAction) () -> { try { - if (Files.exists(request.getFilePath()) == false) { - List inputStreamList = new ArrayList<>(); - long partSize = request.getFileInfo().partSize().getBytes(); - long blockStart = request.getPosition(); - long blockEnd = request.getLength(); - int partNum = (int) (blockStart / partSize); - long pos = blockStart; - long diff = (blockEnd - pos); - while (diff > 0) { - long partStart = pos % partSize; - long partEnd; - if ((partStart + diff) > partSize) { - partEnd = partSize; - } else { - partEnd = (partStart + diff); + boolean needsAppend = false; + if (Files.exists(requestList.get(0).getFilePath()) == false) { + for (BlobFetchRequest request : requestList) { + try ( + InputStream snapshotFileInputStream = blobContainer.readBlob( + request.getBlobName(), + request.getPosition(), + request.getLength() + ); + OutputStream fileOutputStream = needsAppend + ? Files.newOutputStream(request.getFilePath(), APPEND) + : Files.newOutputStream(request.getFilePath()); + OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream) + ) { + snapshotFileInputStream.transferTo(localFileOutputStream); } - long fetchBytes = partEnd - partStart; - InputStream snapshotFileInputStream = blobContainer.readBlob( - request.getFileInfo().partName(partNum), - partStart, - fetchBytes - ); - inputStreamList.add(snapshotFileInputStream); - partNum++; - pos = pos + fetchBytes; - diff = (blockEnd - pos); - + needsAppend = true; } - OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath()); - OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream); - SequenceInputStream sis = new SequenceInputStream(Collections.enumeration(inputStreamList)); - int ch; - while (true) { - try { - if (!((ch = sis.read()) != -1)) break; - localFileOutputStream.write(ch); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - sis.close(); - localFileOutputStream.close(); } - final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ); - return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput); + final IndexInput luceneIndexInput = requestList.get(0) + .getDirectory() + .openInput(requestList.get(0).getFileName(), IOContext.READ); + return new FileCachedIndexInput(fileCache, requestList.get(0).getFilePath(), luceneIndexInput); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -147,15 +132,15 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo private static class DelayedCreationCachedIndexInput implements CachedIndexInput { private final FileCache fileCache; private final BlobContainer blobContainer; - private final BlobFetchRequest request; + private final List requestList; private final CompletableFuture result = new CompletableFuture<>(); private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isClosed = new AtomicBoolean(false); - private DelayedCreationCachedIndexInput(FileCache fileCache, BlobContainer blobContainer, BlobFetchRequest request) { + private DelayedCreationCachedIndexInput(FileCache fileCache, BlobContainer blobContainer, List requestList) { this.fileCache = fileCache; this.blobContainer = blobContainer; - this.request = request; + this.requestList = requestList; } @Override @@ -166,10 +151,10 @@ public IndexInput getIndexInput() throws IOException { if (isStarted.getAndSet(true) == false) { // We're the first one here, need to download the block try { - result.complete(createIndexInput(fileCache, blobContainer, request)); + result.complete(createIndexInput(fileCache, blobContainer, requestList)); } catch (Exception e) { result.completeExceptionally(e); - fileCache.remove(request.getFilePath()); + fileCache.remove(requestList.get(0).getFilePath()); } } try { @@ -186,7 +171,11 @@ public IndexInput getIndexInput() throws IOException { @Override public long length() { - return request.getLength(); + long length = 0; + for (BlobFetchRequest request : requestList) { + length += request.getLength(); + } + return length; } @Override diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java index 2204124f1de4f..7e786381e88b7 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java @@ -32,6 +32,7 @@ import java.io.EOFException; import java.io.IOException; import java.nio.file.Path; +import java.util.List; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.any; @@ -106,7 +107,7 @@ public void testChunkedRepository() throws IOException { indexInput.seek(repositoryChunkSize); } // Verify the second chunk is requested (i.e. ".part1") - verify(transferManager).fetchBlob(argThat(request -> request.getBlobName().equals("File_Name.part1"))); + verify(transferManager).fetchBlob(argThat(requestList -> requestList.get(0).getBlobName().equals("File_Name.part1"))); } private void runAllTestsFor(int blockSizeShift) throws Exception { @@ -147,8 +148,8 @@ private OnDemandBlockSnapshotIndexInput createOnDemandBlockSnapshotIndexInput(in int blockSize = 1 << blockSizeShift; doAnswer(invocation -> { - BlobFetchRequest blobFetchRequest = invocation.getArgument(0); - return blobFetchRequest.getDirectory().openInput(blobFetchRequest.getFileName(), IOContext.READ); + List blobFetchRequestList = invocation.getArgument(0); + return blobFetchRequestList.get(0).getDirectory().openInput(blobFetchRequestList.get(0).getFileName(), IOContext.READ); }).when(transferManager).fetchBlob(any()); FSDirectory directory = null; diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java index d42e614302658..b1bfc6888d4af 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java @@ -163,18 +163,11 @@ public void testUsageExceedsCapacity() throws Exception { public void testDownloadFails() throws Exception { doThrow(new IOException("Expected test exception")).when(blobContainer).readBlob(eq("failure-blob"), anyLong(), anyLong()); - expectThrows( - IOException.class, - () -> transferManager.fetchBlob( - BlobFetchRequest.builder() - .blobName("failure-blob") - .position(0) - .fileName("file") - .directory(directory) - .length(EIGHT_MB) - .build() - ) + List blobFetchRequestList = new ArrayList<>(); + blobFetchRequestList.add( + BlobFetchRequest.builder().blobName("failure-blob").position(0).fileName("file").directory(directory).length(EIGHT_MB).build() ); + expectThrows(IOException.class, () -> transferManager.fetchBlob(blobFetchRequestList)); MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L)); MatcherAssert.assertThat(fileCache.usage().usage(), equalTo(0L)); } @@ -187,17 +180,19 @@ public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception latch.await(); return new ByteArrayInputStream(createData()); }).when(blobContainer).readBlob(eq("blocking-blob"), anyLong(), anyLong()); + List blobFetchRequestList = new ArrayList<>(); + blobFetchRequestList.add( + BlobFetchRequest.builder() + .blobName("blocking-blob") + .position(0) + .fileName("blocking-file") + .directory(directory) + .length(EIGHT_MB) + .build() + ); final Thread blockingThread = new Thread(() -> { try { - transferManager.fetchBlob( - BlobFetchRequest.builder() - .blobName("blocking-blob") - .position(0) - .fileName("blocking-file") - .directory(directory) - .length(EIGHT_MB) - .build() - ); + transferManager.fetchBlob(blobFetchRequestList); } catch (IOException e) { throw new RuntimeException(e); } @@ -216,9 +211,11 @@ public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception } private IndexInput fetchBlobWithName(String blobname) throws IOException { - return transferManager.fetchBlob( + List blobFetchRequestList = new ArrayList<>(); + blobFetchRequestList.add( BlobFetchRequest.builder().blobName("blob").position(0).fileName(blobname).directory(directory).length(EIGHT_MB).build() ); + return transferManager.fetchBlob(blobFetchRequestList); } private static void assertIndexInputIsFunctional(IndexInput indexInput) throws IOException { From b58f5d2ec592ae6ea84350e572dca3825b3fc7c6 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 19 Feb 2024 03:33:10 +0000 Subject: [PATCH 3/8] apply spotless check Signed-off-by: Rishikesh1159 --- .../index/store/remote/utils/TransferManager.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 69eb6ba07b58e..b93e3703538af 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -82,7 +82,11 @@ public IndexInput fetchBlob(List blobFetchRequestList) throws } @SuppressWarnings("removal") - private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobContainer blobContainer, List requestList) { + private static FileCachedIndexInput createIndexInput( + FileCache fileCache, + BlobContainer blobContainer, + List requestList + ) { // We need to do a privileged action here in order to fetch from remote // and write to the local file cache in case this is invoked as a side // effect of a plugin (such as a scripted search) that doesn't have the From c49d57c9024e40ef6c0c73606d46f7fe8fe6a1da Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 21 Feb 2024 17:29:26 +0000 Subject: [PATCH 4/8] Address comments of using a different data structure to fetch blob parts. Signed-off-by: Rishikesh1159 --- .../snapshots/SearchableSnapshotIT.java | 5 +- .../file/OnDemandBlockSnapshotIndexInput.java | 17 +--- .../store/remote/utils/BlobFetchRequest.java | 88 +++++++++---------- .../store/remote/utils/TransferManager.java | 73 +++++++-------- .../OnDemandBlockSnapshotIndexInputTests.java | 7 +- .../remote/utils/TransferManagerTests.java | 34 +++---- 6 files changed, 100 insertions(+), 124 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 90bb2b501764e..b22d24e4f9fa3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -469,9 +469,12 @@ private void takeSnapshot(Client client, String snapshotName, String repoName, S } private void createRepositoryWithSettings(Settings.Builder repositorySettings, String repoName) { + final Settings.Builder settings = Settings.builder(); + settings.put("location", randomRepoPath()).put("compress", randomBoolean()); + settings.put("chunk_size", 1000, ByteSizeUnit.BYTES); logger.info("--> Create a repository"); if (repositorySettings == null) { - createRepository(repoName, FsRepository.TYPE); + createRepository(repoName, FsRepository.TYPE, settings); } else { createRepository(repoName, FsRepository.TYPE, repositorySettings); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index 4cf537ab30037..dae8106e6ba38 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -148,7 +148,7 @@ protected IndexInput fetchBlock(int blockId) throws IOException { // Block may be present on multiple chunks of a file, so we need // to make multiple blobFetchRequest to fetch an entire block. // Each fetchBlobRequest can fetch only a single chunk of a file. - List blobFetchRequestList = new ArrayList<>(); + List blobParts = new ArrayList<>(); while (diff > 0) { long partStart = pos % partSize; long partEnd; @@ -158,22 +158,13 @@ protected IndexInput fetchBlock(int blockId) throws IOException { partEnd = (partStart + diff); } long fetchBytes = partEnd - partStart; - BlobFetchRequest.Builder builder = BlobFetchRequest.builder(); - builder.position(partStart) - .length(fetchBytes) - .blobName(fileInfo.partName(partNum)) - .directory(directory) - .fileName(blockFileName); - BlobFetchRequest req = builder.build(); - blobFetchRequestList.add(req); + blobParts.add(new BlobFetchRequest.BlobPart(fileInfo.partName(partNum), partStart, fetchBytes)); partNum++; pos = pos + fetchBytes; diff = (blockEnd - pos); } - if (blobFetchRequestList.isEmpty()) { - throw new IOException("block size cannot be zero"); - } - return transferManager.fetchBlob(blobFetchRequestList); + BlobFetchRequest.Builder builder = BlobFetchRequest.builder().blobParts(blobParts).directory(directory).fileName(blockFileName); + return transferManager.fetchBlob(builder.build()); } @Override diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java b/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java index d0508e9c6f4c7..45796244af42e 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java @@ -12,6 +12,7 @@ import org.apache.lucene.store.FSDirectory; import java.nio.file.Path; +import java.util.List; /** * The specification to fetch specific block from blob store @@ -20,37 +21,19 @@ */ public class BlobFetchRequest { - private final long position; - - private final long length; - - private final String blobName; - private final Path filePath; private final Directory directory; private final String fileName; + private final List blobParts; + private BlobFetchRequest(Builder builder) { - this.position = builder.position; - this.length = builder.length; - this.blobName = builder.blobName; this.fileName = builder.fileName; this.filePath = builder.directory.getDirectory().resolve(fileName); this.directory = builder.directory; - } - - public long getPosition() { - return position; - } - - public long getLength() { - return length; - } - - public String getBlobName() { - return blobName; + this.blobParts = builder.blobParts; } public Path getFilePath() { @@ -65,6 +48,10 @@ public String getFileName() { return fileName; } + public List blobParts() { + return blobParts; + } + public static Builder builder() { return new Builder(); } @@ -72,12 +59,8 @@ public static Builder builder() { @Override public String toString() { return "BlobFetchRequest{" - + "position=" - + position - + ", length=" - + length - + ", blobName='" - + blobName + + "blobParts=" + + blobParts + '\'' + ", filePath=" + filePath @@ -90,35 +73,45 @@ public String toString() { } /** - * Builder for BlobFetchRequest + * BlobPart represents a single chunk of a file */ - public static final class Builder { + public static class BlobPart { + private String blobName; private long position; private long length; - private String blobName; - private FSDirectory directory; - private String fileName; - - private Builder() {} - - public Builder position(long position) { - this.position = position; - return this; - } - public Builder length(long length) { + public BlobPart(String blobName, long position, long length) { + this.blobName = blobName; if (length <= 0) { - throw new IllegalArgumentException("Length for blob fetch request needs to be non-negative"); + throw new IllegalArgumentException("Length for blob part fetch request needs to be non-negative"); } this.length = length; - return this; + this.position = position; } - public Builder blobName(String blobName) { - this.blobName = blobName; - return this; + public String getBlobName() { + return blobName; } + public long getPosition() { + return position; + } + + public long getLength() { + return length; + } + } + + /** + * Builder for BlobFetchRequest + */ + public static final class Builder { + private List blobParts; + private FSDirectory directory; + private String fileName; + + private Builder() {} + public Builder directory(FSDirectory directory) { this.directory = directory; return this; @@ -129,6 +122,11 @@ public Builder fileName(String fileName) { return this; } + public Builder blobParts(List blobParts) { + this.blobParts = blobParts; + return this; + } + public BlobFetchRequest build() { return new BlobFetchRequest(this); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index b93e3703538af..01df121ccfcbf 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -17,7 +17,6 @@ import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCachedIndexInput; -import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -26,7 +25,6 @@ import java.nio.file.Path; import java.security.AccessController; import java.security.PrivilegedAction; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,19 +50,17 @@ public TransferManager(final BlobContainer blobContainer, final FileCache fileCa /** * Given a blobFetchRequestList, return it's corresponding IndexInput. - * @param blobFetchRequestList to fetch + * @param blobFetchRequest to fetch * @return future of IndexInput augmented with internal caching maintenance tasks */ - public IndexInput fetchBlob(List blobFetchRequestList) throws IOException { + public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException { - assert blobFetchRequestList.isEmpty() == false; - - final Path key = blobFetchRequestList.get(0).getFilePath(); + final Path key = blobFetchRequest.getFilePath(); final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { if (cachedIndexInput == null || cachedIndexInput.isClosed()) { // Doesn't exist or is closed, either way create a new one - return new DelayedCreationCachedIndexInput(fileCache, blobContainer, blobFetchRequestList); + return new DelayedCreationCachedIndexInput(fileCache, blobContainer, blobFetchRequest); } else { // already in the cache and ready to be used (open) return cachedIndexInput; @@ -82,41 +78,36 @@ public IndexInput fetchBlob(List blobFetchRequestList) throws } @SuppressWarnings("removal") - private static FileCachedIndexInput createIndexInput( - FileCache fileCache, - BlobContainer blobContainer, - List requestList - ) { + private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobContainer blobContainer, BlobFetchRequest request) { // We need to do a privileged action here in order to fetch from remote // and write to the local file cache in case this is invoked as a side // effect of a plugin (such as a scripted search) that doesn't have the // necessary permissions. - assert requestList.isEmpty() == false; return AccessController.doPrivileged((PrivilegedAction) () -> { try { - boolean needsAppend = false; - if (Files.exists(requestList.get(0).getFilePath()) == false) { - for (BlobFetchRequest request : requestList) { - try ( - InputStream snapshotFileInputStream = blobContainer.readBlob( - request.getBlobName(), - request.getPosition(), - request.getLength() - ); - OutputStream fileOutputStream = needsAppend - ? Files.newOutputStream(request.getFilePath(), APPEND) - : Files.newOutputStream(request.getFilePath()); - OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream) - ) { - snapshotFileInputStream.transferTo(localFileOutputStream); + if (Files.exists(request.getFilePath()) == false) { + try ( + // Create a new empty file in the specified path + OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath()); + + // All blob parts will be appended to file iteratively. + OutputStream localFileOutputStream = Files.newOutputStream(request.getFilePath(), APPEND) + ) { + for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) { + try ( + InputStream snapshotFileInputStream = blobContainer.readBlob( + blobPart.getBlobName(), + blobPart.getPosition(), + blobPart.getLength() + ); + ) { + snapshotFileInputStream.transferTo(localFileOutputStream); + } } - needsAppend = true; } } - final IndexInput luceneIndexInput = requestList.get(0) - .getDirectory() - .openInput(requestList.get(0).getFileName(), IOContext.READ); - return new FileCachedIndexInput(fileCache, requestList.get(0).getFilePath(), luceneIndexInput); + final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ); + return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -133,15 +124,15 @@ private static FileCachedIndexInput createIndexInput( private static class DelayedCreationCachedIndexInput implements CachedIndexInput { private final FileCache fileCache; private final BlobContainer blobContainer; - private final List requestList; + private final BlobFetchRequest request; private final CompletableFuture result = new CompletableFuture<>(); private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isClosed = new AtomicBoolean(false); - private DelayedCreationCachedIndexInput(FileCache fileCache, BlobContainer blobContainer, List requestList) { + private DelayedCreationCachedIndexInput(FileCache fileCache, BlobContainer blobContainer, BlobFetchRequest request) { this.fileCache = fileCache; this.blobContainer = blobContainer; - this.requestList = requestList; + this.request = request; } @Override @@ -152,10 +143,10 @@ public IndexInput getIndexInput() throws IOException { if (isStarted.getAndSet(true) == false) { // We're the first one here, need to download the block try { - result.complete(createIndexInput(fileCache, blobContainer, requestList)); + result.complete(createIndexInput(fileCache, blobContainer, request)); } catch (Exception e) { result.completeExceptionally(e); - fileCache.remove(requestList.get(0).getFilePath()); + fileCache.remove(request.getFilePath()); } } try { @@ -173,8 +164,8 @@ public IndexInput getIndexInput() throws IOException { @Override public long length() { long length = 0; - for (BlobFetchRequest request : requestList) { - length += request.getLength(); + for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) { + length += blobPart.getLength(); } return length; } diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java index 7e786381e88b7..0e177d977f460 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java @@ -32,7 +32,6 @@ import java.io.EOFException; import java.io.IOException; import java.nio.file.Path; -import java.util.List; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.any; @@ -107,7 +106,7 @@ public void testChunkedRepository() throws IOException { indexInput.seek(repositoryChunkSize); } // Verify the second chunk is requested (i.e. ".part1") - verify(transferManager).fetchBlob(argThat(requestList -> requestList.get(0).getBlobName().equals("File_Name.part1"))); + verify(transferManager).fetchBlob(argThat(request -> request.blobParts().get(0).getBlobName().equals("File_Name.part1"))); } private void runAllTestsFor(int blockSizeShift) throws Exception { @@ -148,8 +147,8 @@ private OnDemandBlockSnapshotIndexInput createOnDemandBlockSnapshotIndexInput(in int blockSize = 1 << blockSizeShift; doAnswer(invocation -> { - List blobFetchRequestList = invocation.getArgument(0); - return blobFetchRequestList.get(0).getDirectory().openInput(blobFetchRequestList.get(0).getFileName(), IOContext.READ); + BlobFetchRequest blobFetchRequest = invocation.getArgument(0); + return blobFetchRequest.getDirectory().openInput(blobFetchRequest.getFileName(), IOContext.READ); }).when(transferManager).fetchBlob(any()); FSDirectory directory = null; diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java index b1bfc6888d4af..7ae3944eb6944 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java @@ -163,11 +163,12 @@ public void testUsageExceedsCapacity() throws Exception { public void testDownloadFails() throws Exception { doThrow(new IOException("Expected test exception")).when(blobContainer).readBlob(eq("failure-blob"), anyLong(), anyLong()); - List blobFetchRequestList = new ArrayList<>(); - blobFetchRequestList.add( - BlobFetchRequest.builder().blobName("failure-blob").position(0).fileName("file").directory(directory).length(EIGHT_MB).build() + List blobParts = new ArrayList<>(); + blobParts.add(new BlobFetchRequest.BlobPart("failure-blob", 0, EIGHT_MB)); + expectThrows( + IOException.class, + () -> transferManager.fetchBlob(BlobFetchRequest.builder().fileName("file").directory(directory).blobParts(blobParts).build()) ); - expectThrows(IOException.class, () -> transferManager.fetchBlob(blobFetchRequestList)); MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L)); MatcherAssert.assertThat(fileCache.usage().usage(), equalTo(0L)); } @@ -180,19 +181,14 @@ public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception latch.await(); return new ByteArrayInputStream(createData()); }).when(blobContainer).readBlob(eq("blocking-blob"), anyLong(), anyLong()); - List blobFetchRequestList = new ArrayList<>(); - blobFetchRequestList.add( - BlobFetchRequest.builder() - .blobName("blocking-blob") - .position(0) - .fileName("blocking-file") - .directory(directory) - .length(EIGHT_MB) - .build() - ); + List blobParts = new ArrayList<>(); + blobParts.add(new BlobFetchRequest.BlobPart("blocking-blob", 0, EIGHT_MB)); + final Thread blockingThread = new Thread(() -> { try { - transferManager.fetchBlob(blobFetchRequestList); + transferManager.fetchBlob( + BlobFetchRequest.builder().fileName("blocking-file").directory(directory).blobParts(blobParts).build() + ); } catch (IOException e) { throw new RuntimeException(e); } @@ -211,11 +207,9 @@ public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception } private IndexInput fetchBlobWithName(String blobname) throws IOException { - List blobFetchRequestList = new ArrayList<>(); - blobFetchRequestList.add( - BlobFetchRequest.builder().blobName("blob").position(0).fileName(blobname).directory(directory).length(EIGHT_MB).build() - ); - return transferManager.fetchBlob(blobFetchRequestList); + List blobParts = new ArrayList<>(); + blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB)); + return transferManager.fetchBlob(BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build()); } private static void assertIndexInputIsFunctional(IndexInput indexInput) throws IOException { From d3ae7cb8bad19195a68acb9259ec11337cbf0786 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 21 Feb 2024 17:32:16 +0000 Subject: [PATCH 5/8] remove unnecessary code. Signed-off-by: Rishikesh1159 --- .../java/org/opensearch/snapshots/SearchableSnapshotIT.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index b22d24e4f9fa3..90bb2b501764e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -469,12 +469,9 @@ private void takeSnapshot(Client client, String snapshotName, String repoName, S } private void createRepositoryWithSettings(Settings.Builder repositorySettings, String repoName) { - final Settings.Builder settings = Settings.builder(); - settings.put("location", randomRepoPath()).put("compress", randomBoolean()); - settings.put("chunk_size", 1000, ByteSizeUnit.BYTES); logger.info("--> Create a repository"); if (repositorySettings == null) { - createRepository(repoName, FsRepository.TYPE, settings); + createRepository(repoName, FsRepository.TYPE); } else { createRepository(repoName, FsRepository.TYPE, repositorySettings); } From 780663992de799ef0ce9308c27f9f4ce4484b1fb Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 22 Feb 2024 00:22:37 +0000 Subject: [PATCH 6/8] Refactor outputstream usage. Signed-off-by: Rishikesh1159 --- .../index/store/remote/utils/TransferManager.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 01df121ccfcbf..8341b1dd2cf36 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -17,6 +17,7 @@ import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCachedIndexInput; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -29,8 +30,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; -import static java.nio.file.StandardOpenOption.APPEND; - /** * This acts as entry point to fetch {@link BlobFetchRequest} and return actual {@link IndexInput}. Utilizes the BlobContainer interface to * read snapshot files located within a repository. This basically adapts BlobContainer snapshots files into IndexInput @@ -87,11 +86,8 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo try { if (Files.exists(request.getFilePath()) == false) { try ( - // Create a new empty file in the specified path OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath()); - - // All blob parts will be appended to file iteratively. - OutputStream localFileOutputStream = Files.newOutputStream(request.getFilePath(), APPEND) + OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream) ) { for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) { try ( From ee541031007fb9f95e110c4e7c636a14950cb5ca Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 22 Feb 2024 19:28:17 +0000 Subject: [PATCH 7/8] refactor blobpart logic into a separate method and add unit tests. Signed-off-by: Rishikesh1159 --- .../file/OnDemandBlockSnapshotIndexInput.java | 20 +++++++++---- .../store/remote/utils/BlobFetchRequest.java | 7 +++++ .../store/remote/utils/TransferManager.java | 6 +--- .../OnDemandBlockSnapshotIndexInputTests.java | 30 +++++++++++++++++++ 4 files changed, 53 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index dae8106e6ba38..8097fd08da50a 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -138,6 +138,20 @@ protected IndexInput fetchBlock(int blockId) throws IOException { final long blockStart = getBlockStart(blockId); final long blockEnd = blockStart + getActualBlockSize(blockId); + // Block may be present on multiple chunks of a file, so we need + // to fetch each chunk/blob part separately to fetch an entire block. + BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder() + .blobParts(getBlobParts(blockStart, blockEnd)) + .directory(directory) + .fileName(blockFileName) + .build(); + return transferManager.fetchBlob(blobFetchRequest); + } + + /** + * Returns list of blob parts/chunks in a file for a given block. + */ + protected List getBlobParts(long blockStart, long blockEnd) { // If the snapshot file is chunked, we must account for this by // choosing the appropriate file part and updating the position // accordingly. @@ -145,9 +159,6 @@ protected IndexInput fetchBlock(int blockId) throws IOException { long pos = blockStart; long diff = (blockEnd - blockStart); - // Block may be present on multiple chunks of a file, so we need - // to make multiple blobFetchRequest to fetch an entire block. - // Each fetchBlobRequest can fetch only a single chunk of a file. List blobParts = new ArrayList<>(); while (diff > 0) { long partStart = pos % partSize; @@ -163,8 +174,7 @@ protected IndexInput fetchBlock(int blockId) throws IOException { pos = pos + fetchBytes; diff = (blockEnd - pos); } - BlobFetchRequest.Builder builder = BlobFetchRequest.builder().blobParts(blobParts).directory(directory).fileName(blockFileName); - return transferManager.fetchBlob(builder.build()); + return blobParts; } @Override diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java b/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java index 45796244af42e..f7e6545b5010e 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java @@ -29,11 +29,14 @@ public class BlobFetchRequest { private final List blobParts; + private final long blobLength; + private BlobFetchRequest(Builder builder) { this.fileName = builder.fileName; this.filePath = builder.directory.getDirectory().resolve(fileName); this.directory = builder.directory; this.blobParts = builder.blobParts; + this.blobLength = builder.blobParts.stream().mapToLong(o -> o.getLength()).sum(); } public Path getFilePath() { @@ -52,6 +55,10 @@ public List blobParts() { return blobParts; } + public long getBlobLength() { + return blobLength; + } + public static Builder builder() { return new Builder(); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 8341b1dd2cf36..98cad7bfadb09 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -159,11 +159,7 @@ public IndexInput getIndexInput() throws IOException { @Override public long length() { - long length = 0; - for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) { - length += blobPart.getLength(); - } - return length; + return request.getBlobLength(); } @Override diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java index 0e177d977f460..fe0bbaca56d90 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java @@ -115,6 +115,7 @@ private void runAllTestsFor(int blockSizeShift) throws Exception { TestGroup.testGetBlock(blockedSnapshotFile, blockSize, FILE_SIZE); TestGroup.testGetBlockOffset(blockedSnapshotFile, blockSize, FILE_SIZE); TestGroup.testGetBlockStart(blockedSnapshotFile, blockSize); + TestGroup.testGetBlobParts(blockedSnapshotFile); TestGroup.testCurrentBlockStart(blockedSnapshotFile, blockSize); TestGroup.testCurrentBlockPosition(blockedSnapshotFile, blockSize); TestGroup.testClone(blockedSnapshotFile, blockSize); @@ -252,6 +253,35 @@ public static void testGetBlockStart(OnDemandBlockSnapshotIndexInput blockedSnap assertEquals(blockSize * 2, blockedSnapshotFile.getBlockStart(2)); } + public static void testGetBlobParts(OnDemandBlockSnapshotIndexInput blockedSnapshotFile) { + // block id 0 + int blockId = 0; + long blockStart = blockedSnapshotFile.getBlockStart(blockId); + long blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId); + assertEquals( + (blockEnd - blockStart), + blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum() + ); + + // block 1 + blockId = 1; + blockStart = blockedSnapshotFile.getBlockStart(blockId); + blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId); + assertEquals( + (blockEnd - blockStart), + blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum() + ); + + // block 2 + blockId = 2; + blockStart = blockedSnapshotFile.getBlockStart(blockId); + blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId); + assertEquals( + (blockEnd - blockStart), + blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum() + ); + } + public static void testCurrentBlockStart(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize) throws IOException { // block 0 blockedSnapshotFile.seek(blockSize - 1); From f1a27f8e888889514e1981f82474768c1c345ae4 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 26 Feb 2024 22:37:04 +0000 Subject: [PATCH 8/8] Add new unit tests. Signed-off-by: Rishikesh1159 --- .../OnDemandBlockSnapshotIndexInputTests.java | 33 +++++++++++++++---- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java index fe0bbaca56d90..a135802c5f49c 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java @@ -78,11 +78,31 @@ public void test4MBBlock() throws Exception { runAllTestsFor(22); } - public void testChunkedRepository() throws IOException { - final long blockSize = new ByteSizeValue(1, ByteSizeUnit.KB).getBytes(); - final long repositoryChunkSize = new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(); - final long fileSize = new ByteSizeValue(3, ByteSizeUnit.KB).getBytes(); + public void testChunkedRepositoryWithBlockSizeGreaterThanChunkSize() throws IOException { + verifyChunkedRepository( + new ByteSizeValue(8, ByteSizeUnit.KB).getBytes(), // block Size + new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(), // repository chunk size + new ByteSizeValue(15, ByteSizeUnit.KB).getBytes() // file size + ); + } + + public void testChunkedRepositoryWithBlockSizeLessThanChunkSize() throws IOException { + verifyChunkedRepository( + new ByteSizeValue(1, ByteSizeUnit.KB).getBytes(), // block Size + new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(), // repository chunk size + new ByteSizeValue(3, ByteSizeUnit.KB).getBytes() // file size + ); + } + public void testChunkedRepositoryWithBlockSizeEqualToChunkSize() throws IOException { + verifyChunkedRepository( + new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(), // block Size + new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(), // repository chunk size + new ByteSizeValue(15, ByteSizeUnit.KB).getBytes() // file size + ); + } + + private void verifyChunkedRepository(long blockSize, long repositoryChunkSize, long fileSize) throws IOException { when(transferManager.fetchBlob(any())).thenReturn(new ByteArrayIndexInput("test", new byte[(int) blockSize])); try ( FSDirectory directory = new MMapDirectory(path, lockFactory); @@ -105,8 +125,9 @@ public void testChunkedRepository() throws IOException { // Seek to the position past the first repository chunk indexInput.seek(repositoryChunkSize); } - // Verify the second chunk is requested (i.e. ".part1") - verify(transferManager).fetchBlob(argThat(request -> request.blobParts().get(0).getBlobName().equals("File_Name.part1"))); + + // Verify all the chunks related to block are added to the fetchBlob request + verify(transferManager).fetchBlob(argThat(request -> request.getBlobLength() == blockSize)); } private void runAllTestsFor(int blockSizeShift) throws Exception {