Skip to content

Commit

Permalink
refactor blobpart logic into a separate method and add unit tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Feb 22, 2024
1 parent 7806639 commit ee54103
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,27 @@ protected IndexInput fetchBlock(int blockId) throws IOException {
final long blockStart = getBlockStart(blockId);
final long blockEnd = blockStart + getActualBlockSize(blockId);

// Block may be present on multiple chunks of a file, so we need
// to fetch each chunk/blob part separately to fetch an entire block.
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder()
.blobParts(getBlobParts(blockStart, blockEnd))
.directory(directory)
.fileName(blockFileName)
.build();
return transferManager.fetchBlob(blobFetchRequest);
}

/**
* Returns list of blob parts/chunks in a file for a given block.
*/
protected List<BlobFetchRequest.BlobPart> getBlobParts(long blockStart, long blockEnd) {
// If the snapshot file is chunked, we must account for this by
// choosing the appropriate file part and updating the position
// accordingly.
int partNum = (int) (blockStart / partSize);
long pos = blockStart;
long diff = (blockEnd - blockStart);

// Block may be present on multiple chunks of a file, so we need
// to make multiple blobFetchRequest to fetch an entire block.
// Each fetchBlobRequest can fetch only a single chunk of a file.
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
while (diff > 0) {
long partStart = pos % partSize;
Expand All @@ -163,8 +174,7 @@ protected IndexInput fetchBlock(int blockId) throws IOException {
pos = pos + fetchBytes;
diff = (blockEnd - pos);
}
BlobFetchRequest.Builder builder = BlobFetchRequest.builder().blobParts(blobParts).directory(directory).fileName(blockFileName);
return transferManager.fetchBlob(builder.build());
return blobParts;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ public class BlobFetchRequest {

private final List<BlobPart> blobParts;

private final long blobLength;

private BlobFetchRequest(Builder builder) {
this.fileName = builder.fileName;
this.filePath = builder.directory.getDirectory().resolve(fileName);
this.directory = builder.directory;
this.blobParts = builder.blobParts;
this.blobLength = builder.blobParts.stream().mapToLong(o -> o.getLength()).sum();
}

public Path getFilePath() {
Expand All @@ -52,6 +55,10 @@ public List<BlobPart> blobParts() {
return blobParts;
}

public long getBlobLength() {
return blobLength;
}

public static Builder builder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,7 @@ public IndexInput getIndexInput() throws IOException {

@Override
public long length() {
long length = 0;
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
length += blobPart.getLength();
}
return length;
return request.getBlobLength();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ private void runAllTestsFor(int blockSizeShift) throws Exception {
TestGroup.testGetBlock(blockedSnapshotFile, blockSize, FILE_SIZE);
TestGroup.testGetBlockOffset(blockedSnapshotFile, blockSize, FILE_SIZE);
TestGroup.testGetBlockStart(blockedSnapshotFile, blockSize);
TestGroup.testGetBlobParts(blockedSnapshotFile);
TestGroup.testCurrentBlockStart(blockedSnapshotFile, blockSize);
TestGroup.testCurrentBlockPosition(blockedSnapshotFile, blockSize);
TestGroup.testClone(blockedSnapshotFile, blockSize);
Expand Down Expand Up @@ -252,6 +253,35 @@ public static void testGetBlockStart(OnDemandBlockSnapshotIndexInput blockedSnap
assertEquals(blockSize * 2, blockedSnapshotFile.getBlockStart(2));
}

public static void testGetBlobParts(OnDemandBlockSnapshotIndexInput blockedSnapshotFile) {
// block id 0
int blockId = 0;
long blockStart = blockedSnapshotFile.getBlockStart(blockId);
long blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId);
assertEquals(
(blockEnd - blockStart),
blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum()
);

// block 1
blockId = 1;
blockStart = blockedSnapshotFile.getBlockStart(blockId);
blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId);
assertEquals(
(blockEnd - blockStart),
blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum()
);

// block 2
blockId = 2;
blockStart = blockedSnapshotFile.getBlockStart(blockId);
blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId);
assertEquals(
(blockEnd - blockStart),
blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum()
);
}

public static void testCurrentBlockStart(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize) throws IOException {
// block 0
blockedSnapshotFile.seek(blockSize - 1);
Expand Down

0 comments on commit ee54103

Please sign in to comment.