Skip to content

Commit

Permalink
testFetchBlobWithConcurrentCacheEvictions
Browse files Browse the repository at this point in the history
Signed-off-by: Finn Carroll <[email protected]>
  • Loading branch information
finnegancarroll committed Sep 3, 2024
1 parent 7101937 commit 42aa9db
Showing 1 changed file with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public abstract class TransferManagerTestCase extends OpenSearchTestCase {
Expand Down Expand Up @@ -98,6 +99,43 @@ public void testConcurrentAccess() throws Exception {
}
}

public void testFetchBlobWithConcurrentCacheEvictions() {
// Submit 256 tasks to an executor with 16 threads that will each randomly
// request one of eight blobs. Given that the cache can only hold two
// blobs this will lead to a huge amount of contention and thrashing.
final ExecutorService testRunner = Executors.newFixedThreadPool(16);
try {
final List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 256; i++) {
// request an index input and immediately close it
final String blobname = "blob-" + randomIntBetween(0, 7);
futures.add(testRunner.submit(() -> {
try {
try (IndexInput indexInput = fetchBlobWithName(blobname)) {
assertIndexInputIsFunctional(indexInput);
}
} catch (IOException ignored) { // fetchBlobWithName may fail due to fixed capacity
} catch (Exception e) {
throw new AssertionError(e);
}
}));
}
// Wait for all threads to complete
try {
for (Future<?> future : futures) {
future.get(10, TimeUnit.SECONDS);
}
} catch (java.util.concurrent.ExecutionException ignored) { // Index input may be null
} catch (Exception e) {
throw new AssertionError(e);
}

} finally {
assertTrue(terminate(testRunner));
}
MatcherAssert.assertThat("Expected many evictions to happen", fileCache.stats().evictionCount(), greaterThan(0L));
}

public void testOverflowDisabled() throws Exception {
initializeTransferManager();
IndexInput i1 = fetchBlobWithName("1");
Expand Down

0 comments on commit 42aa9db

Please sign in to comment.