Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.11] Use file length from metadata while downloading segments from remote store #10531

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -193,10 +192,14 @@ public IndexOutput createOutput(String name, IOContext context) {
*/
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return openInput(name, fileLength(name), context);
}

public IndexInput openInput(String name, long fileLength, IOContext context) throws IOException {
InputStream inputStream = null;
try {
inputStream = blobContainer.readBlob(name);
return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength(name));
return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength);
} catch (Exception e) {
// Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak.
if (inputStream != null) {
Expand Down Expand Up @@ -230,9 +233,9 @@ public void close() throws IOException {
@Override
public long fileLength(String name) throws IOException {
// ToDo: Instead of calling remote store each time, keep a cache with segment metadata
Map<String, BlobMetadata> metadata = blobContainer.listBlobsByPrefix(name);
if (metadata.containsKey(name)) {
return metadata.get(name).length();
List<BlobMetadata> metadata = blobContainer.listBlobsByPrefixInSortedOrder(name, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
if (metadata.size() == 1 && metadata.get(0).name().equals(name)) {
return metadata.get(0).length();
}
throw new NoSuchFileException(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,9 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
String remoteFilename = getExistingRemoteFilename(name);
long fileLength = fileLength(name);
if (remoteFilename != null) {
return remoteDataDirectory.openInput(remoteFilename, context);
return remoteDataDirectory.openInput(remoteFilename, fileLength, context);
} else {
throw new NoSuchFileException(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -41,11 +40,13 @@

import org.mockito.Mockito;

import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -204,13 +205,29 @@ public void testCreateOutput() {
public void testOpenInput() throws IOException {
InputStream mockInputStream = mock(InputStream.class);
when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream);
Map<String, BlobMetadata> fileInfo = new HashMap<>();
fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100));
when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo);

BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100);

when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata));

IndexInput indexInput = remoteDirectory.openInput("segment_1", IOContext.DEFAULT);
assertTrue(indexInput instanceof RemoteIndexInput);
assertEquals(100, indexInput.length());
verify(blobContainer).listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC);
}

public void testOpenInputWithLength() throws IOException {
InputStream mockInputStream = mock(InputStream.class);
when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream);

BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100);

when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata));

IndexInput indexInput = remoteDirectory.openInput("segment_1", 100, IOContext.DEFAULT);
assertTrue(indexInput instanceof RemoteIndexInput);
assertEquals(100, indexInput.length());
verify(blobContainer, times(0)).listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC);
}

public void testOpenInputIOException() throws IOException {
Expand All @@ -228,9 +245,8 @@ public void testOpenInputNoSuchFileException() throws IOException {
}

public void testFileLength() throws IOException {
Map<String, BlobMetadata> fileInfo = new HashMap<>();
fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100));
when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo);
BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100);
when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata));

assertEquals(100, remoteDirectory.fileLength("segment_1"));
}
Expand All @@ -246,13 +262,7 @@ public void testListFilesByPrefixInLexicographicOrder() throws IOException {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
latchedActionListener.onResponse(List.of(new PlainBlobMetadata("metadata_1", 1)));
return null;
}).when(blobContainer)
.listBlobsByPrefixInSortedOrder(
eq("metadata"),
eq(1),
eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC),
any(ActionListener.class)
);
}).when(blobContainer).listBlobsByPrefixInSortedOrder(eq("metadata"), eq(1), eq(LEXICOGRAPHIC), any(ActionListener.class));

assertEquals(List.of("metadata_1"), remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1));
}
Expand All @@ -262,13 +272,7 @@ public void testListFilesByPrefixInLexicographicOrderEmpty() throws IOException
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
latchedActionListener.onResponse(List.of());
return null;
}).when(blobContainer)
.listBlobsByPrefixInSortedOrder(
eq("metadata"),
eq(1),
eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC),
any(ActionListener.class)
);
}).when(blobContainer).listBlobsByPrefixInSortedOrder(eq("metadata"), eq(1), eq(LEXICOGRAPHIC), any(ActionListener.class));

assertEquals(List.of(), remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1));
}
Expand All @@ -278,13 +282,7 @@ public void testListFilesByPrefixInLexicographicOrderException() {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
latchedActionListener.onFailure(new IOException("Error"));
return null;
}).when(blobContainer)
.listBlobsByPrefixInSortedOrder(
eq("metadata"),
eq(1),
eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC),
any(ActionListener.class)
);
}).when(blobContainer).listBlobsByPrefixInSortedOrder(eq("metadata"), eq(1), eq(LEXICOGRAPHIC), any(ActionListener.class));

assertThrows(IOException.class, () -> remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.Mockito.any;
Expand Down Expand Up @@ -391,7 +392,7 @@ public void testOpenInput() throws IOException {
remoteSegmentStoreDirectory.init();

IndexInput indexInput = mock(IndexInput.class);
when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenReturn(indexInput);
when(remoteDataDirectory.openInput(startsWith("_0.si"), anyLong(), eq(IOContext.DEFAULT))).thenReturn(indexInput);

assertEquals(indexInput, remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT));
}
Expand All @@ -404,7 +405,7 @@ public void testOpenInputException() throws IOException {
populateMetadata();
remoteSegmentStoreDirectory.init();

when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error"));
when(remoteDataDirectory.openInput(startsWith("_0.si"), anyLong(), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error"));

assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT));
}
Expand Down