Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Mitigation for remote snapshot filecache overflow #15760

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release-notes/opensearch.release-notes-2.17.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,4 @@
- Fix unchecked cast in dynamic action map getter ([#15394](https://github.com/opensearch-project/OpenSearch/pull/15394))
- Fix null values indexed as "null" strings in flat_object field ([#14069](https://github.com/opensearch-project/OpenSearch/pull/14069))
- Fix terms query on wildcard field returns nothing ([#15607](https://github.com/opensearch-project/OpenSearch/pull/15607))
- Fix remote snapshot file_cache exceeding capacity ([#15077](https://github.com/opensearch-project/OpenSearch/pull/15077))
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
@SuppressWarnings("removal")
private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) {
try {
// This local file cache is ref counted and may not strictly enforce configured capacity.
// If we find available capacity is exceeded, deny further BlobFetchRequests.
if (fileCache.capacity() < fileCache.usage().usage()) {
fileCache.prune();
throw new IOException(
"Local file cache capacity ("
+ fileCache.capacity()
+ ") exceeded ("
+ fileCache.usage().usage()
+ ") - BlobFetchRequest failed: "
+ request.getFilePath()
);
}
if (Files.exists(request.getFilePath()) == false) {
logger.trace("Fetching from Remote in createIndexInput of Transfer Manager");
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testConcurrentAccess() throws Exception {
}
}

public void testFetchBlobWithConcurrentCacheEvictions() throws Exception {
public void testFetchBlobWithConcurrentCacheEvictions() {
// Submit 256 tasks to an executor with 16 threads that will each randomly
// request one of eight blobs. Given that the cache can only hold two
// blobs this will lead to a huge amount of contention and thrashing.
Expand All @@ -114,41 +114,34 @@ public void testFetchBlobWithConcurrentCacheEvictions() throws Exception {
try (IndexInput indexInput = fetchBlobWithName(blobname)) {
assertIndexInputIsFunctional(indexInput);
}
} catch (IOException ignored) { // fetchBlobWithName may fail due to fixed capacity
} catch (Exception e) {
throw new AssertionError(e);
}
}));
}
// Wait for all threads to complete
for (Future<?> future : futures) {
future.get(10, TimeUnit.SECONDS);
try {
for (Future<?> future : futures) {
future.get(10, TimeUnit.SECONDS);
}
} catch (java.util.concurrent.ExecutionException ignored) { // Index input may be null
} catch (Exception e) {
throw new AssertionError(e);
}

} finally {
assertTrue(terminate(testRunner));
}
MatcherAssert.assertThat("Expected many evictions to happen", fileCache.stats().evictionCount(), greaterThan(0L));
}

public void testUsageExceedsCapacity() throws Exception {
// Fetch resources that exceed the configured capacity of the cache and assert that the
// returned IndexInputs are still functional.
try (IndexInput i1 = fetchBlobWithName("1"); IndexInput i2 = fetchBlobWithName("2"); IndexInput i3 = fetchBlobWithName("3")) {
assertIndexInputIsFunctional(i1);
assertIndexInputIsFunctional(i2);
assertIndexInputIsFunctional(i3);
MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo((long) EIGHT_MB * 3));
MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB * 3));
}
MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L));
MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB * 3));
// Fetch another resource which will trigger an eviction
try (IndexInput i1 = fetchBlobWithName("1")) {
assertIndexInputIsFunctional(i1);
MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo((long) EIGHT_MB));
MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB));
}
MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L));
MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB));
public void testOverflowDisabled() throws Exception {
initializeTransferManager();
IndexInput i1 = fetchBlobWithName("1");
IndexInput i2 = fetchBlobWithName("2");

assertThrows(IOException.class, () -> { IndexInput i3 = fetchBlobWithName("3"); });
}

public void testDownloadFails() throws Exception {
Expand Down
Loading