diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 91890cd25c2b1..b6d6913a9f8d4 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -242,6 +242,11 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader if (pair != null) { try (ReleasableLock ignore = writeLock.acquire()) { onHeapCache.put(pair.v1(), pair.v2()); + } catch (Exception e) { + // TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal + // listeners/stats. Needs better exception handling at underlying layers.For now swallowing + // exception. + logger.warn("Exception occurred while putting item onto heap cache", e); } } else { if (ex != null) { @@ -251,8 +256,8 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader completableFutureMap.remove(key); // Remove key from map as not needed anymore. return null; }; + V value = null; if (future == null) { - V value = null; future = completableFutureMap.get(key); future.handle(handler); try { @@ -268,15 +273,12 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader } else { future.complete(new Tuple<>(key, value)); } - } - V value; - try { - value = future.get().v2(); - if (future.isCompletedExceptionally()) { - throw new IllegalStateException("Future completed exceptionally but no error thrown"); + } else { + try { + value = future.get().v2(); + } catch (InterruptedException ex) { + throw new IllegalStateException(ex); } - } catch (InterruptedException ex) { - throw new IllegalStateException(ex); } return value; } @@ -387,12 +389,22 @@ void handleRemovalFromHeapTier(RemovalNotification, V> notification ICacheKey key = notification.getKey(); boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()); boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier - if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) { + boolean exceptionOccurredOnDiskCachePut = false; + boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue()); + if (canCacheOnDisk) { try (ReleasableLock ignore = writeLock.acquire()) { diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats + } catch (Exception ex) { + // TODO: Catch specific exceptions. Needs better exception handling. We are just swallowing exception + // in this case as it shouldn't cause upstream request to fail. + logger.warn("Exception occurred while putting item to disk cache", ex); + exceptionOccurredOnDiskCachePut = true; } - updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue()); - } else { + if (!exceptionOccurredOnDiskCachePut) { + updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue()); + } + } + if (!canCacheOnDisk || exceptionOccurredOnDiskCachePut) { // If the value is not going to the disk cache, send this notification to the TSC's removal listener // as the value is leaving the TSC entirely removalListener.onRemoval(notification); diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index a63aae4e096c2..98a4cc8bd3924 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -60,6 +60,10 @@ import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK; import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TieredSpilloverCacheTests extends OpenSearchTestCase { static final List dimensionNames = List.of("dim1", "dim2", "dim3"); @@ -866,10 +870,9 @@ public void testComputIfAbsentConcurrentlyWithMultipleKeys() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(iterations * numberOfKeys); // To wait for all threads to finish. List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); - - for (int i = 0; i < iterations; i++) { - for (int j = 0; j < numberOfKeys; j++) { - int finalJ = j; + for (int j = 0; j < numberOfKeys; j++) { + int finalJ = j; + for (int i = 0; i < iterations; i++) { executorService.submit(() -> { try { LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { @@ -890,8 +893,9 @@ public String load(ICacheKey key) { tieredSpilloverCache.computeIfAbsent(iCacheKeyList.get(finalJ), loadAwareCacheLoader); } catch (Exception e) { throw new RuntimeException(e); + } finally { + countDownLatch.countDown(); } - countDownLatch.countDown(); }); } } @@ -913,6 +917,23 @@ public String load(ICacheKey key) { } public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(ICacheKey key) { + throw new RuntimeException("Testing"); + } + }; + verifyComputeIfAbsentThrowsException(RuntimeException.class, loadAwareCacheLoader, "Testing"); + } + + public void testComputeIfAbsentWithOnHeapCacheThrowingExceptionOnPut() throws Exception { int onHeapCacheSize = randomIntBetween(100, 300); int diskCacheSize = randomIntBetween(200, 400); int keyValueSize = 50; @@ -926,67 +947,60 @@ public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception onHeapCacheSize * keyValueSize + "b" ) .build(); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + ICache mockOnHeapCache = mock(ICache.class); + when(onHeapCacheFactory.create(any(), any(), any())).thenReturn(mockOnHeapCache); + doThrow(new RuntimeException("Testing")).when(mockOnHeapCache).put(any(), any()); + CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) + .setKeyType(String.class) + .setWeigher((k, v) -> keyValueSize) + .setSettings(settings) + .setDimensionNames(dimensionNames) + .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setSettings( + Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") + .put(settings) + .build() + ) + .setClusterSettings(clusterSettings) + .build(); + ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(0, diskCacheSize, false); - TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( - keyValueSize, - diskCacheSize, - removalListener, - settings, - 0 - ); - - int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); - ICacheKey key = getICacheKey(UUID.randomUUID().toString()); - String value = UUID.randomUUID().toString(); - AtomicInteger exceptionCount = new AtomicInteger(); - - Thread[] threads = new Thread[numberOfSameKeys]; - Phaser phaser = new Phaser(numberOfSameKeys + 1); - CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. - - List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); - - for (int i = 0; i < numberOfSameKeys; i++) { - threads[i] = new Thread(() -> { - try { - LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { - boolean isLoaded = false; - - @Override - public boolean isLoaded() { - return isLoaded; - } + TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder().setCacheType( + CacheType.INDICES_REQUEST_CACHE + ) + .setRemovalListener(removalListener) + .setOnHeapCacheFactory(onHeapCacheFactory) + .setDiskCacheFactory(mockDiskCacheFactory) + .setCacheConfig(cacheConfig) + .build(); + String value = ""; + try { + value = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } - @Override - public String load(ICacheKey key) { - throw new RuntimeException("Testing"); - } - }; - loadAwareCacheLoaderList.add(loadAwareCacheLoader); - phaser.arriveAndAwaitAdvance(); - tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); - } catch (Exception e) { - exceptionCount.incrementAndGet(); - assertEquals(ExecutionException.class, e.getClass()); - assertEquals(RuntimeException.class, e.getCause().getClass()); - assertEquals("Testing", e.getCause().getMessage()); - } finally { - countDownLatch.countDown(); + @Override + public String load(ICacheKey key) { + return "test"; } }); - threads[i].start(); - } - phaser.arriveAndAwaitAdvance(); - countDownLatch.await(); // Wait for rest of tasks to be cancelled. - - // Verify exception count was equal to number of requests - assertEquals(numberOfSameKeys, exceptionCount.get()); + } catch (Exception ex) {} + assertEquals("test", value); assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); } - public void testComputeIfAbsentConcurrentlyWithLoaderReturningNull() throws Exception { - int onHeapCacheSize = randomIntBetween(100, 300); - int diskCacheSize = randomIntBetween(200, 400); + public void testComputeIfAbsentWithDiskCacheThrowingExceptionOnPut() throws Exception { + int onHeapCacheSize = 0; int keyValueSize = 50; MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); @@ -998,62 +1012,75 @@ public void testComputeIfAbsentConcurrentlyWithLoaderReturningNull() throws Exce onHeapCacheSize * keyValueSize + "b" ) .build(); + ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); + CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) + .setKeyType(String.class) + .setWeigher((k, v) -> keyValueSize) + .setSettings(settings) + .setDimensionNames(dimensionNames) + .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setSettings( + Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") + .put(settings) + .build() + ) + .setClusterSettings(clusterSettings) + .build(); + ICache.Factory mockDiskCacheFactory = mock(MockDiskCache.MockDiskCacheFactory.class); + ICache mockDiskCache = mock(ICache.class); + when(mockDiskCacheFactory.create(any(), any(), any())).thenReturn(mockDiskCache); + doThrow(new RuntimeException("Test")).when(mockDiskCache).put(any(), any()); - TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( - keyValueSize, - diskCacheSize, - removalListener, - settings, - 0 - ); - - int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); - ICacheKey key = getICacheKey(UUID.randomUUID().toString()); - String value = UUID.randomUUID().toString(); - AtomicInteger exceptionCount = new AtomicInteger(); - - Thread[] threads = new Thread[numberOfSameKeys]; - Phaser phaser = new Phaser(numberOfSameKeys + 1); - CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. + TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder().setCacheType( + CacheType.INDICES_REQUEST_CACHE + ) + .setRemovalListener(removalListener) + .setOnHeapCacheFactory(onHeapCacheFactory) + .setDiskCacheFactory(mockDiskCacheFactory) + .setCacheConfig(cacheConfig) + .build(); + String value = ""; + value = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } - List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + @Override + public String load(ICacheKey key) { + return "test"; + } + }); + ImmutableCacheStats diskStats = getStatsSnapshotForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK); - for (int i = 0; i < numberOfSameKeys; i++) { - threads[i] = new Thread(() -> { - try { - LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { - boolean isLoaded = false; + assertEquals(0, diskStats.getSizeInBytes()); + assertEquals(1, removalListener.evictionsMetric.count()); + assertEquals("test", value); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + } - @Override - public boolean isLoaded() { - return isLoaded; - } + public void testComputeIfAbsentConcurrentlyWithLoaderReturningNull() throws Exception { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded = false; - @Override - public String load(ICacheKey key) { - return null; - } - }; - loadAwareCacheLoaderList.add(loadAwareCacheLoader); - phaser.arriveAndAwaitAdvance(); - tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); - } catch (Exception e) { - exceptionCount.incrementAndGet(); - assertEquals(ExecutionException.class, e.getClass()); - assertEquals(NullPointerException.class, e.getCause().getClass()); - assertEquals("Loader returned a null value", e.getCause().getMessage()); - } finally { - countDownLatch.countDown(); - } - }); - threads[i].start(); - } - phaser.arriveAndAwaitAdvance(); - countDownLatch.await(); // Wait for rest of tasks to be cancelled. + @Override + public boolean isLoaded() { + return isLoaded; + } - // Verify exception count was equal to number of requests - assertEquals(numberOfSameKeys, exceptionCount.get()); - assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + @Override + public String load(ICacheKey key) { + return null; + } + }; + verifyComputeIfAbsentThrowsException(NullPointerException.class, loadAwareCacheLoader, "Loader returned a " + "null value"); } public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exception { @@ -1731,6 +1758,66 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache t return snapshot; } + private void verifyComputeIfAbsentThrowsException( + Class expectedException, + LoadAwareCacheLoader loader, + String expectedExceptionMessage + ) throws InterruptedException { + int onHeapCacheSize = randomIntBetween(100, 300); + int diskCacheSize = randomIntBetween(200, 400); + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + Settings settings = Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(); + + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + settings, + 0 + ); + + int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + String value = UUID.randomUUID().toString(); + AtomicInteger exceptionCount = new AtomicInteger(); + + Thread[] threads = new Thread[numberOfSameKeys]; + Phaser phaser = new Phaser(numberOfSameKeys + 1); + CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. + + for (int i = 0; i < numberOfSameKeys; i++) { + threads[i] = new Thread(() -> { + try { + phaser.arriveAndAwaitAdvance(); + tieredSpilloverCache.computeIfAbsent(key, loader); + } catch (Exception e) { + exceptionCount.incrementAndGet(); + assertEquals(ExecutionException.class, e.getClass()); + assertEquals(expectedException, e.getCause().getClass()); + assertEquals(expectedExceptionMessage, e.getCause().getMessage()); + } finally { + countDownLatch.countDown(); + } + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); // Wait for rest of tasks to be cancelled. + + // Verify exception count was equal to number of requests + assertEquals(numberOfSameKeys, exceptionCount.get()); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + } + private ImmutableCacheStats getTotalStatsSnapshot(TieredSpilloverCache tsc) throws IOException { ImmutableCacheStatsHolder cacheStats = tsc.stats(new String[0]); return cacheStats.getStatsForDimensionValues(List.of());