[Searchable Snapshot] Fix bug of Searchable Snapshot Dependency on repository chunk_size #12277

Merged
@@ -85,10 +85,10 @@ protected Settings.Builder randomRepositorySettings() {
return settings;
}

private Settings.Builder chunkedRepositorySettings() {
private Settings.Builder chunkedRepositorySettings(long chunkSize) {
final Settings.Builder settings = Settings.builder();
settings.put("location", randomRepoPath()).put("compress", randomBoolean());
settings.put("chunk_size", 2 << 23, ByteSizeUnit.BYTES);
settings.put("chunk_size", chunkSize, ByteSizeUnit.BYTES);
return settings;
}

@@ -194,18 +194,44 @@ public void testSnapshottingSearchableSnapshots() throws Exception {
}

/**
* Tests a chunked repository scenario for searchable snapshots by creating an index,
* Tests a default 8mib chunked repository scenario for searchable snapshots by creating an index,
* taking a snapshot, restoring it as a searchable snapshot index.
*/
public void testCreateSearchableSnapshotWithChunks() throws Exception {
public void testCreateSearchableSnapshotWithDefaultChunks() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final String repoName = "test-repo";
final String snapshotName = "test-snap";
final Client client = client();

Settings.Builder repositorySettings = chunkedRepositorySettings();
Settings.Builder repositorySettings = chunkedRepositorySettings(2 << 23);

internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);
createRepositoryWithSettings(repositorySettings, repoName);
takeSnapshot(client, snapshotName, repoName, indexName);

deleteIndicesAndEnsureGreen(client, indexName);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertRemoteSnapshotIndexSettings(client, restoredIndexName);

assertDocCount(restoredIndexName, 1000L);
}

/**
* Tests a small 1000 bytes chunked repository scenario for searchable snapshots by creating an index,
* taking a snapshot, restoring it as a searchable snapshot index.
*/
public void testCreateSearchableSnapshotWithSmallChunks() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final String repoName = "test-repo";
final String snapshotName = "test-snap";
final Client client = client();

Settings.Builder repositorySettings = chunkedRepositorySettings(1000);

internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);
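Note on the new small-chunk test: with a repository chunk_size of 1000 bytes, every block the searchable snapshot reads spans many file parts, which is exactly the path the fix below exercises. A rough back-of-the-envelope sketch, assuming the roughly 8 MiB block size suggested by the test names above (the block size itself is not set in this diff):

// Rough sketch, not part of the PR: estimates how many repository parts one block
// read must touch. The block size is an assumption; the 1000-byte chunk_size comes
// from chunkedRepositorySettings(1000) in the test above.
public class ChunkSpanSketch {
    public static void main(String[] args) {
        final long blockSizeBytes = 8L * 1024 * 1024; // assumed block size
        final long chunkSizeBytes = 1000L;            // repository chunk_size in the new test
        final long partsSpanned = (blockSizeBytes + chunkSizeBytes - 1) / chunkSizeBytes;
        System.out.println("parts per block ~ " + partsSpanned); // ~8389 parts per block read
    }
}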
@@ -15,6 +15,8 @@
import org.opensearch.index.store.remote.utils.TransferManager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files.
@@ -139,20 +141,30 @@
// If the snapshot file is chunked, we must account for this by
// choosing the appropriate file part and updating the position
// accordingly.
final int part = (int) (blockStart / partSize);
final long partStart = part * partSize;

final long position = blockStart - partStart;
final long length = blockEnd - blockStart;

BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder()
.position(position)
.length(length)
.blobName(fileInfo.partName(part))
.directory(directory)
.fileName(blockFileName)
.build();
return transferManager.fetchBlob(blobFetchRequest);
int partNum = (int) (blockStart / partSize);
long pos = blockStart;
long diff = (blockEnd - blockStart);

// Block may be present on multiple chunks of a file, so we need
// to make multiple blobFetchRequest to fetch an entire block.
// Each fetchBlobRequest can fetch only a single chunk of a file.
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
while (diff > 0) {
long partStart = pos % partSize;
long partEnd;
if ((partStart + diff) > partSize) {
partEnd = partSize;

} else {
partEnd = (partStart + diff);
}
long fetchBytes = partEnd - partStart;
blobParts.add(new BlobFetchRequest.BlobPart(fileInfo.partName(partNum), partStart, fetchBytes));
partNum++;
pos = pos + fetchBytes;
diff = (blockEnd - pos);
}
BlobFetchRequest.Builder builder = BlobFetchRequest.builder().blobParts(blobParts).directory(directory).fileName(blockFileName);
return transferManager.fetchBlob(builder.build());
}

@Override
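The loop added above in OnDemandBlockSnapshotIndexInput is the heart of the fix: instead of assuming a block lies inside a single file part, it walks the requested byte range and emits one BlobPart per repository chunk the range crosses. A minimal standalone sketch of the same arithmetic, with a worked example (names are illustrative, not the PR's API):

import java.util.ArrayList;
import java.util.List;

// Sketch only: mirrors the part-splitting loop above using plain longs.
final class BlockToPartsSketch {
    /** Each entry is {partNumber, offsetWithinPart, bytesToRead}. */
    static List<long[]> split(long blockStart, long blockEnd, long partSize) {
        final List<long[]> parts = new ArrayList<>();
        int partNum = (int) (blockStart / partSize); // first part containing the block
        long pos = blockStart;
        while (pos < blockEnd) {
            long offsetInPart = pos % partSize;
            // Read to the end of this part or to the end of the block, whichever comes first.
            long bytes = Math.min(partSize - offsetInPart, blockEnd - pos);
            parts.add(new long[] { partNum, offsetInPart, bytes });
            partNum++;
            pos += bytes;
        }
        return parts;
    }

    public static void main(String[] args) {
        // A 16-byte block [10, 26) over 12-byte parts splits into three pieces:
        // part 0 @ offset 10 for 2 bytes, part 1 @ offset 0 for 12 bytes, part 2 @ offset 0 for 2 bytes.
        for (long[] p : split(10, 26, 12)) {
            System.out.println("part=" + p[0] + " offset=" + p[1] + " bytes=" + p[2]);
        }
    }
}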
@@ -12,6 +12,7 @@
import org.apache.lucene.store.FSDirectory;

import java.nio.file.Path;
import java.util.List;

/**
* The specification to fetch specific block from blob store
@@ -20,37 +21,19 @@
*/
public class BlobFetchRequest {

private final long position;

private final long length;

private final String blobName;

private final Path filePath;

private final Directory directory;

private final String fileName;

private final List<BlobPart> blobParts;

private BlobFetchRequest(Builder builder) {
this.position = builder.position;
this.length = builder.length;
this.blobName = builder.blobName;
this.fileName = builder.fileName;
this.filePath = builder.directory.getDirectory().resolve(fileName);
this.directory = builder.directory;
}

public long getPosition() {
return position;
}

public long getLength() {
return length;
}

public String getBlobName() {
return blobName;
this.blobParts = builder.blobParts;
}

public Path getFilePath() {
@@ -65,19 +48,19 @@
return fileName;
}

public List<BlobPart> blobParts() {
return blobParts;
}

public static Builder builder() {
return new Builder();
}

@Override
public String toString() {
return "BlobFetchRequest{"
+ "position="
+ position
+ ", length="
+ length
+ ", blobName='"
+ blobName
+ "blobParts="
+ blobParts
+ '\''
+ ", filePath="
+ filePath
@@ -90,35 +73,45 @@
}

/**
* Builder for BlobFetchRequest
* BlobPart represents a single chunk of a file
*/
public static final class Builder {
public static class BlobPart {
private String blobName;
private long position;
private long length;
private String blobName;
private FSDirectory directory;
private String fileName;

private Builder() {}

public Builder position(long position) {
this.position = position;
return this;
}

public Builder length(long length) {
public BlobPart(String blobName, long position, long length) {
this.blobName = blobName;
if (length <= 0) {
throw new IllegalArgumentException("Length for blob fetch request needs to be non-negative");
throw new IllegalArgumentException("Length for blob part fetch request needs to be non-negative");

}
this.length = length;
return this;
this.position = position;
}

public Builder blobName(String blobName) {
this.blobName = blobName;
return this;
public String getBlobName() {
return blobName;
}

public long getPosition() {
return position;
}

public long getLength() {
return length;
}
}

/**
* Builder for BlobFetchRequest
*/
public static final class Builder {
private List<BlobPart> blobParts;
private FSDirectory directory;
private String fileName;

private Builder() {}

public Builder directory(FSDirectory directory) {
this.directory = directory;
return this;
@@ -129,6 +122,11 @@
return this;
}

public Builder blobParts(List<BlobPart> blobParts) {
this.blobParts = blobParts;
return this;
}

public BlobFetchRequest build() {
return new BlobFetchRequest(this);
}
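With this change a caller builds one BlobFetchRequest that carries a list of BlobParts instead of a single blobName/position/length triple. A hedged usage sketch (blob names, offsets, and the local file name are placeholders, not values taken from this PR):

import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.store.FSDirectory;
import org.opensearch.index.store.remote.utils.BlobFetchRequest;

// Illustrative only: one logical block that straddles two repository chunks
// becomes a single request with two BlobParts.
final class BlobFetchRequestUsageSketch {
    static BlobFetchRequest twoPartRequest(FSDirectory localCacheDirectory) {
        List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
        blobParts.add(new BlobFetchRequest.BlobPart("my-file.part0", 512, 1536)); // tail of part 0
        blobParts.add(new BlobFetchRequest.BlobPart("my-file.part1", 0, 2048));   // head of part 1
        return BlobFetchRequest.builder()
            .blobParts(blobParts)
            .directory(localCacheDirectory)
            .fileName("my-file-block-0") // local file the fetched bytes are written into
            .build();
    }
}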
@@ -48,11 +48,12 @@ public TransferManager(final BlobContainer blobContainer, final FileCache fileCa
}

/**
* Given a blobFetchRequest, return its corresponding IndexInput.
* Given a blobFetchRequestList, return its corresponding IndexInput.
* @param blobFetchRequest to fetch
* @return future of IndexInput augmented with internal caching maintenance tasks
*/
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {

final Path key = blobFetchRequest.getFilePath();

final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
@@ -85,15 +86,20 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo
try {
if (Files.exists(request.getFilePath()) == false) {
try (
InputStream snapshotFileInputStream = blobContainer.readBlob(
request.getBlobName(),
request.getPosition(),
request.getLength()
);
OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
try (
InputStream snapshotFileInputStream = blobContainer.readBlob(
blobPart.getBlobName(),
blobPart.getPosition(),
blobPart.getLength()
);
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
}
}
}
}
final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
@@ -153,7 +159,11 @@ public IndexInput getIndexInput() throws IOException {

@Override
public long length() {
return request.getLength();
long length = 0;
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
length += blobPart.getLength();
}
return length;
}

@Override
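In TransferManager, createIndexInput now iterates the request's BlobParts and streams each one, in order, into the same local cache file, so the parts are concatenated into a single on-disk block; length() on the cached entry sums the part lengths to match. A minimal sketch of that concatenation pattern using plain java.io streams (an assumed helper, not the PR's code):

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch only: appends several remote chunk streams to one local file, mirroring the
// loop over request.blobParts() above. Returns the total bytes written, which should
// equal the sum of the BlobPart lengths.
final class ConcatenatePartsSketch {
    static long writeParts(Path localBlockFile, List<InputStream> partStreams) throws IOException {
        long total = 0;
        try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(localBlockFile))) {
            for (InputStream in : partStreams) {
                try (in) {
                    total += in.transferTo(out); // each part is appended in request order
                }
            }
        }
        return total;
    }
}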
@@ -106,7 +106,7 @@ public void testChunkedRepository() throws IOException {
indexInput.seek(repositoryChunkSize);
}
// Verify the second chunk is requested (i.e. ".part1")
verify(transferManager).fetchBlob(argThat(request -> request.getBlobName().equals("File_Name.part1")));
verify(transferManager).fetchBlob(argThat(request -> request.blobParts().get(0).getBlobName().equals("File_Name.part1")));
}

private void runAllTestsFor(int blockSizeShift) throws Exception {
@@ -163,17 +163,11 @@ public void testUsageExceedsCapacity() throws Exception {

public void testDownloadFails() throws Exception {
doThrow(new IOException("Expected test exception")).when(blobContainer).readBlob(eq("failure-blob"), anyLong(), anyLong());
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
blobParts.add(new BlobFetchRequest.BlobPart("failure-blob", 0, EIGHT_MB));
expectThrows(
IOException.class,
() -> transferManager.fetchBlob(
BlobFetchRequest.builder()
.blobName("failure-blob")
.position(0)
.fileName("file")
.directory(directory)
.length(EIGHT_MB)
.build()
)
() -> transferManager.fetchBlob(BlobFetchRequest.builder().fileName("file").directory(directory).blobParts(blobParts).build())
);
MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L));
MatcherAssert.assertThat(fileCache.usage().usage(), equalTo(0L));
@@ -187,16 +181,13 @@ public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception
latch.await();
return new ByteArrayInputStream(createData());
}).when(blobContainer).readBlob(eq("blocking-blob"), anyLong(), anyLong());
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
blobParts.add(new BlobFetchRequest.BlobPart("blocking-blob", 0, EIGHT_MB));

final Thread blockingThread = new Thread(() -> {
try {
transferManager.fetchBlob(
BlobFetchRequest.builder()
.blobName("blocking-blob")
.position(0)
.fileName("blocking-file")
.directory(directory)
.length(EIGHT_MB)
.build()
BlobFetchRequest.builder().fileName("blocking-file").directory(directory).blobParts(blobParts).build()
);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -216,9 +207,9 @@ }
}

private IndexInput fetchBlobWithName(String blobname) throws IOException {
return transferManager.fetchBlob(
BlobFetchRequest.builder().blobName("blob").position(0).fileName(blobname).directory(directory).length(EIGHT_MB).build()
);
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
return transferManager.fetchBlob(BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build());
}

private static void assertIndexInputIsFunctional(IndexInput indexInput) throws IOException {
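The transfer manager tests above all build their requests from a blobParts list containing a single part. A hypothetical extra helper (not in this PR) showing how the same API could exercise a fetch split across two parts, assuming the test class's existing transferManager, directory, and EIGHT_MB fields:

// Hypothetical test helper, not part of this PR: fetches one block split across two parts.
private IndexInput fetchBlobSplitAcrossTwoParts(String blobPrefix) throws IOException {
    List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
    blobParts.add(new BlobFetchRequest.BlobPart(blobPrefix + ".part0", EIGHT_MB / 2, EIGHT_MB / 2));
    blobParts.add(new BlobFetchRequest.BlobPart(blobPrefix + ".part1", 0, EIGHT_MB / 2));
    return transferManager.fetchBlob(
        BlobFetchRequest.builder().fileName(blobPrefix + "-block").directory(directory).blobParts(blobParts).build()
    );
}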