diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index d22f131853a78..11a226caab8cf 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -38,6 +38,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; +import org.opensearch.OpenSearchParseException; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; @@ -47,10 +48,12 @@ import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.cache.store.config.CacheConfig; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.RatioValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; @@ -60,20 +63,26 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; +import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.ToLongBiFunction; +import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING; + /** * The indices request cache allows to cache a shard level request stage responses, helping with improving * similar requests that are potentially expensive (because of aggs for example). 
The cache is fully coherent @@ -113,21 +122,43 @@ public final class IndicesRequestCache implements RemovalListener INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting( + "indices.requests.cache.cleanup.interval", + INDICES_CACHE_CLEAN_INTERVAL_SETTING, + Property.NodeScope + ); + public static final Setting INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING = new Setting<>( + "indices.requests.cache.cleanup.staleness_threshold", + "0%", + IndicesRequestCache::validateStalenessSetting, + Property.NodeScope + ); private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); private final ConcurrentMap registeredClosedListeners = ConcurrentCollections.newConcurrentMap(); - private final Set keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; private final ICache cache; private final Function> cacheEntityLookup; - - IndicesRequestCache(Settings settings, Function> cacheEntityFunction, CacheService cacheService) { + // pkg-private for testing + final IndicesRequestCacheCleanupManager cacheCleanupManager; + + IndicesRequestCache( + Settings settings, + Function> cacheEntityFunction, + CacheService cacheService, + ThreadPool threadPool + ) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); ToLongBiFunction weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed(); + this.cacheCleanupManager = new IndicesRequestCacheCleanupManager( + threadPool, + INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING.get(settings), + getStalenessThreshold(settings) + ); this.cacheEntityLookup = cacheEntityFunction; this.cache = cacheService.createCache( new CacheConfig.Builder().setSettings(settings) @@ -153,18 +184,28 @@ public final class IndicesRequestCache implements RemovalListener notification) { // In case this event happens for an old shard, we can safely ignore this as we don't keep track for old // shards as part of request cache. 
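To make the two new node settings above concrete, here is a minimal, illustrative sketch (not part of the change itself) of how a node could enable the threshold-based cleanup. The setting keys are the ones registered above; the interval and threshold values are arbitrary examples.

```java
import org.opensearch.common.settings.Settings;

// Illustrative only: the keys are the two settings registered above; the values are examples.
public class RequestCacheCleanupSettingsExample {
    public static void main(String[] args) {
        Settings nodeSettings = Settings.builder()
            // How often the dedicated request-cache cleaner runs; when unset it falls back
            // to the existing INDICES_CACHE_CLEAN_INTERVAL_SETTING default.
            .put("indices.requests.cache.cleanup.interval", "1m")
            // Fraction of stale keys that must accumulate before a cleanup pass does any work;
            // the default "0%" keeps the behaviour of cleaning on every run.
            .put("indices.requests.cache.cleanup.staleness_threshold", "10%")
            .build();
        System.out.println(nodeSettings);
    }
}
```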
- cacheEntityLookup.apply(notification.getKey().shardId).ifPresent(entity -> entity.onRemoval(notification)); + Key key = notification.getKey(); + cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(notification)); + cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction( + new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId) + ); } BytesReference getOrCompute( @@ -185,7 +226,7 @@ BytesReference getOrCompute( BytesReference value = cache.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { cacheEntity.onMiss(); - // see if its the first time we see this reader, and make sure to register a cleanup key + // see if it's the first time we see this reader, and make sure to register a cleanup key CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId); if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); @@ -193,6 +234,7 @@ BytesReference getOrCompute( OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey); } } + cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey); } else { cacheEntity.onHit(); } @@ -354,9 +396,11 @@ private CleanupKey(CacheEntity entity, String readerCacheKeyId) { @Override public void onClose(IndexReader.CacheKey cacheKey) { - Boolean remove = registeredClosedListeners.remove(this); - if (remove != null) { - keysToClean.add(this); + // Remove the current CleanupKey from the registeredClosedListeners map + // If the key was present, enqueue it for cleanup + Boolean wasRegistered = registeredClosedListeners.remove(this); + if (wasRegistered != null) { + cacheCleanupManager.enqueueCleanupKey(this); } } @@ -380,40 +424,280 @@ public int hashCode() { } } - /** - * Logic to clean up in-memory cache. - */ - synchronized void cleanCache() { - final Set currentKeysToClean = new HashSet<>(); - final Set currentFullClean = new HashSet<>(); - currentKeysToClean.clear(); - currentFullClean.clear(); - for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { - CleanupKey cleanupKey = iterator.next(); - iterator.remove(); - if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { - // null indicates full cleanup, as does a closed shard - currentFullClean.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); - } else { - currentKeysToClean.add(cleanupKey); + /* + * The IndicesRequestCacheCleanupManager manages the cleanup of stale keys in IndicesRequestCache. + * + * It also keeps track of the number of stale keys in the cache (staleKeysCount) and a staleness threshold, + * which is used to determine when the cache should be cleaned. 
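The per-shard, per-reader accounting performed by updateCleanupKeyToCountMapOnCacheInsertion and updateCleanupKeyToCountMapOnCacheEviction can be hard to read inline, so here is a self-contained toy version of the same computeIfAbsent/merge and computeIfPresent idioms. It uses plain String identifiers in place of ShardId and readerCacheKeyId and is only a sketch of the bookkeeping, not the production class.

```java
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy illustration (not the production class) of the nested counting map kept by the
// cleanup manager: shard -> readerCacheKeyId -> number of live cache entries.
public class KeyCountMapSketch {
    private final ConcurrentMap<String, HashMap<String, Integer>> countMap = new ConcurrentHashMap<>();

    void onInsert(String shardId, String readerCacheKeyId) {
        // Create the per-shard map on demand and bump the per-reader count by one.
        countMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(readerCacheKeyId, 1, Integer::sum);
    }

    void onEvict(String shardId, String readerCacheKeyId) {
        // Decrement the per-reader count, dropping the entry once it reaches zero.
        countMap.computeIfPresent(shardId, (shard, readers) -> {
            readers.computeIfPresent(readerCacheKeyId, (key, current) -> current == 1 ? null : current - 1);
            return readers;
        });
    }

    public static void main(String[] args) {
        KeyCountMapSketch sketch = new KeyCountMapSketch();
        sketch.onInsert("shard-0", "reader-a");
        sketch.onInsert("shard-0", "reader-a");
        sketch.onEvict("shard-0", "reader-a");
        System.out.println(sketch.countMap); // prints {shard-0={reader-a=1}}
    }
}
```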
+ * + * If Staleness threshold is 0, we do not keep track of stale keys in the cache + * */ + class IndicesRequestCacheCleanupManager implements Closeable { + private final Set keysToClean; + private final ConcurrentMap> cleanupKeyToCountMap; + private final AtomicInteger staleKeysCount; + private final double stalenessThreshold; + private final IndicesRequestCacheCleaner cacheCleaner; + + IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) { + this.stalenessThreshold = stalenessThreshold; + this.keysToClean = ConcurrentCollections.newConcurrentSet(); + this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap(); + this.staleKeysCount = new AtomicInteger(0); + this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval); + threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME); + } + + /** + * Enqueue cleanup key. + * + * @param cleanupKey the cleanup key + */ + void enqueueCleanupKey(CleanupKey cleanupKey) { + keysToClean.add(cleanupKey); + incrementStaleKeysCount(cleanupKey); + } + + /** + * Updates the cleanupKeyToCountMap with the given CleanupKey. + * If the ShardId associated with the CleanupKey does not exist in the map, a new entry is created. + * The method increments the count of the CleanupKey in the map. + *
<p>
+ * Why use ShardID as the key ? + * CacheEntity mainly contains IndexShard, both of these classes do not override equals() and hashCode() methods. + * ShardID class properly overrides equals() and hashCode() methods. + * Therefore, to avoid modifying CacheEntity and IndexShard classes to override these methods, we use ShardID as the key. + * + * @param cleanupKey the CleanupKey to be updated in the map + */ + private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { + if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { + return; + } + IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + if (indexShard == null) { + logger.warn("IndexShard is null for CleanupKey: {} while cleaning Indices Request Cache", cleanupKey.readerCacheKeyId); + return; } + ShardId shardId = indexShard.shardId(); + + // If the key doesn't exist, it's added with a value of 1. + // If the key exists, its value is incremented by 1. + cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); + } + + private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { + if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { + return; + } + IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + if (indexShard == null) { + logger.warn("IndexShard is null for CleanupKey: {} while cleaning Indices Request Cache", cleanupKey.readerCacheKeyId); + return; + } + ShardId shardId = indexShard.shardId(); + + cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> { + keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> { + // decrement the stale key count + staleKeysCount.decrementAndGet(); + int newValue = currentValue - 1; + // Remove the key if the new value is zero by returning null; otherwise, update with the new value. + return newValue == 0 ? null : newValue; + }); + return keyCountMap; + }); + } + + /** + * Updates the count of stale keys in the cache. + * This method is called when a CleanupKey is added to the keysToClean set. + * + * It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap. + * If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount + * by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map. 
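For the threshold check itself, the following standalone sketch mirrors the decision that canSkipCacheCleanup and staleKeysInCachePercentage implement further down: a pass is skipped only when the stale fraction is strictly below a non-zero threshold. The method name and values here are purely illustrative; the 1-stale-out-of-2-keys case matches the tests added in this change.

```java
// Standalone sketch (illustrative names and values) of the skip decision described above.
public class StalenessCheckSketch {
    static boolean canSkipCleanup(int staleKeys, int totalKeys, double threshold) {
        if (threshold == 0.0) {
            return false;                 // threshold 0 (the default) means "always clean"
        }
        if (totalKeys == 0 || staleKeys == 0) {
            return true;                  // no stale keys -> 0% staleness, below any non-zero threshold
        }
        double staleFraction = (double) staleKeys / totalKeys;
        return staleFraction < threshold; // strictly less than, so staleness equal to the threshold still cleans
    }

    public static void main(String[] args) {
        // 1 stale key out of 2 cached keys = 0.5 staleness, the situation the new tests set up.
        System.out.println(canSkipCleanup(1, 2, 0.49)); // false -> cleanup runs
        System.out.println(canSkipCleanup(1, 2, 0.50)); // false -> equal to threshold, still cleans
        System.out.println(canSkipCleanup(1, 2, 0.51)); // true  -> cleanup skipped
    }
}
```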
+ * + * @param cleanupKey the CleanupKey that has been marked for cleanup + */ + private void incrementStaleKeysCount(CleanupKey cleanupKey) { + if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { + return; + } + IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + if (indexShard == null) { + logger.warn("IndexShard is null for CleanupKey: {}", cleanupKey.readerCacheKeyId); + return; + } + ShardId shardId = indexShard.shardId(); + + // Using computeIfPresent to atomically operate on the countMap for a given shardId + cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> { + if (cleanupKey.readerCacheKeyId == null) { + // Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null + int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum(); + staleKeysCount.addAndGet(totalSum); + // Return null to automatically remove the mapping for shardId + return null; + } else { + // Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap + countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> { + staleKeysCount.addAndGet(v); + // Return null to remove the key after updating staleKeysCount + return null; + }); + + // Check if countMap is empty after removal to decide if we need to remove the shardId entry + if (countMap.isEmpty()) { + return null; // Returning null removes the entry for shardId + } + } + return countMap; // Return the modified countMap to keep the mapping + }); + } + + // package private for testing + AtomicInteger getStaleKeysCount() { + return staleKeysCount; + } + + /** + * Clean cache based on stalenessThreshold + */ + void cleanCache() { + cleanCache(stalenessThreshold); + } + + /** + * Force Clean cache without checking stalenessThreshold + */ + private void forceCleanCache() { + cleanCache(0); } - if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { + + /** + * Cleans the cache based on the provided staleness threshold. + *
<p>
If the percentage of stale keys in the cache is less than this threshold,the cache cleanup process is skipped. + * @param stalenessThreshold The staleness threshold as a double. + */ + private synchronized void cleanCache(double stalenessThreshold) { + if (logger.isDebugEnabled()) { + logger.debug("Cleaning Indices Request Cache with threshold : " + stalenessThreshold); + } + if (canSkipCacheCleanup(stalenessThreshold)) { + return; + } + // Contains CleanupKey objects with open shard but invalidated readerCacheKeyId. + final Set cleanupKeysFromOutdatedReaders = new HashSet<>(); + // Contains CleanupKey objects of a closed shard. + final Set cleanupKeysFromClosedShards = new HashSet<>(); + + for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { + CleanupKey cleanupKey = iterator.next(); + iterator.remove(); + if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { + // null indicates full cleanup, as does a closed shard + cleanupKeysFromClosedShards.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); + } else { + cleanupKeysFromOutdatedReaders.add(cleanupKey); + } + } + + if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) { + return; + } + for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); - if (currentFullClean.contains(key.shardId)) { + if (cleanupKeysFromClosedShards.contains(key.shardId)) { iterator.remove(); } else { - // If the flow comes here, then we should have a open shard available on node. - if (currentKeysToClean.contains( - new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId) - )) { + CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId); + if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) { iterator.remove(); } } } + cache.refresh(); + } + + /** + * Determines whether the cache cleanup process can be skipped based on the staleness threshold. + * + *
<p>
If the percentage of stale keys is less than the provided staleness threshold returns true, + * indicating that the cache cleanup process can be skipped. + * + * @param cleanThresholdPercent The staleness threshold as a percentage. + * @return true if the cache cleanup process can be skipped, false otherwise. + */ + private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { + if (cleanThresholdPercent == 0.0) { + return false; + } + double staleKeysInCachePercentage = staleKeysInCachePercentage(); + if (staleKeysInCachePercentage < cleanThresholdPercent) { + if (logger.isDebugEnabled()) { + logger.debug( + "Skipping Indices Request cache cleanup since the percentage of stale keys : " + + staleKeysInCachePercentage + + " is less than the threshold : " + + stalenessThreshold + ); + } + return true; + } + return false; + } + + /** + * Calculates and returns the percentage of stale keys in the cache. + * + * @return The percentage of stale keys in the cache as a double. Returns 0 if there are no keys in the cache or no stale keys. + */ + private synchronized double staleKeysInCachePercentage() { + long totalKeysInCache = count(); + if (totalKeysInCache == 0 || staleKeysCount.get() == 0) { + return 0; + } + return ((double) staleKeysCount.get() / totalKeysInCache); + } + + @Override + public void close() { + this.cacheCleaner.close(); + } + + private final class IndicesRequestCacheCleaner implements Runnable, Releasable { + + private final IndicesRequestCacheCleanupManager cacheCleanupManager; + private final ThreadPool threadPool; + private final TimeValue interval; + + IndicesRequestCacheCleaner(IndicesRequestCacheCleanupManager cacheCleanupManager, ThreadPool threadPool, TimeValue interval) { + this.cacheCleanupManager = cacheCleanupManager; + this.threadPool = threadPool; + this.interval = interval; + } + + private final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public void run() { + try { + this.cacheCleanupManager.cleanCache(); + } catch (Exception e) { + logger.warn("Exception during periodic indices request cache cleanup:", e); + } + // Reschedule itself to run again if not closed + if (closed.get() == false) { + threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this); + } + } + + @Override + public void close() { + closed.compareAndSet(false, true); + } } - cache.refresh(); } /** @@ -426,4 +710,26 @@ long count() { int numRegisteredCloseListeners() { // for testing return registeredClosedListeners.size(); } + + /** + * Validates the staleness setting for the cache cleanup threshold. + * + *
<p>
This method checks if the provided staleness threshold is a valid percentage or a valid double value. + * If the staleness threshold is not valid, it throws an OpenSearchParseException. + * + * @param staleThreshold The staleness threshold to validate. + * @return The validated staleness threshold. + * @throws OpenSearchParseException If the staleness threshold is not a valid percentage or double value. + * + *
<p>
package private for testing + */ + static String validateStalenessSetting(String staleThreshold) { + try { + RatioValue.parseRatioValue(staleThreshold); + } catch (OpenSearchParseException e) { + e.addSuppressed(e); + throw e; + } + return staleThreshold; + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 4372f21f8de0d..ce1e3193c7ee1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -403,7 +403,7 @@ public IndicesService( return Optional.empty(); } return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), cacheService); + }), cacheService, threadPool); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; @@ -432,7 +432,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon } }); this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings); - this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval); + this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, logger, threadPool, this.cleanInterval); this.metaStateService = metaStateService; this.engineFactoryProviders = engineFactoryProviders; @@ -1576,17 +1576,9 @@ private static final class CacheCleaner implements Runnable, Releasable { private final ThreadPool threadPool; private final TimeValue interval; private final AtomicBoolean closed = new AtomicBoolean(false); - private final IndicesRequestCache requestCache; - - CacheCleaner( - IndicesFieldDataCache cache, - IndicesRequestCache requestCache, - Logger logger, - ThreadPool threadPool, - TimeValue interval - ) { + + CacheCleaner(IndicesFieldDataCache cache, Logger logger, ThreadPool threadPool, TimeValue interval) { this.cache = cache; - this.requestCache = requestCache; this.logger = logger; this.threadPool = threadPool; this.interval = interval; @@ -1609,12 +1601,6 @@ public void run() { TimeValue.nsecToMSec(System.nanoTime() - startTimeNS) ); } - - try { - this.requestCache.cleanCache(); - } catch (Exception e) { - logger.warn("Exception during periodic request cache cleanup:", e); - } // Reschedule itself to run again if not closed if (closed.get() == false) { threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this); diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index b9cbbb2c65162..594b9aac971b7 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -46,6 +46,8 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; import org.opensearch.common.cache.module.CacheModule; import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -67,25 +69,34 @@ import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; +import org.opensearch.node.Node; import 
org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { + private ThreadPool getThreadPool() { + return new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "default tracer tests").build()); + } public void testBasicOperationsCache() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -128,7 +139,7 @@ public void testBasicOperationsCache() throws Exception { indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -137,16 +148,19 @@ public void testBasicOperationsCache() throws Exception { assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); IOUtils.close(reader, writer, dir, cache); + terminate(threadPool); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testBasicOperationsCacheWithFeatureFlag() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); CacheService cacheService = new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(), (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - cacheService + cacheService, + threadPool ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -189,7 +203,7 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -198,12 +212,14 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); IOUtils.close(reader, writer, dir, cache); + terminate(threadPool); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testCacheDifferentReaders() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool 
threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { IndexService indexService = null; try { @@ -212,7 +228,7 @@ public void testCacheDifferentReaders() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -281,7 +297,7 @@ public void testCacheDifferentReaders() throws Exception { // Closing the cache doesn't change returned entities reader.close(); - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); assertTrue(loader.loadedFromCache); @@ -296,7 +312,7 @@ public void testCacheDifferentReaders() throws Exception { indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(secondEntity); } - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); assertTrue(loader.loadedFromCache); @@ -304,17 +320,436 @@ public void testCacheDifferentReaders() throws Exception { assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); assertEquals(0, cache.numRegisteredCloseListeners()); } + public void testCacheCleanupThresholdSettingValidator_Valid_Percentage() { + String s = IndicesRequestCache.validateStalenessSetting("50%"); + assertEquals("50%", s); + } + + public void testCacheCleanupThresholdSettingValidator_Valid_Double() { + String s = IndicesRequestCache.validateStalenessSetting("0.5"); + assertEquals("0.5", s); + } + + public void testCacheCleanupThresholdSettingValidator_Valid_DecimalPercentage() { + String s = IndicesRequestCache.validateStalenessSetting("0.5%"); + assertEquals("0.5%", s); + } + + public void testCacheCleanupThresholdSettingValidator_InValid_MB() { + assertThrows(IllegalArgumentException.class, () -> { IndicesRequestCache.validateStalenessSetting("50mb"); }); + } + + public void testCacheCleanupThresholdSettingValidator_Invalid_Percentage() { + assertThrows(IllegalArgumentException.class, () -> { IndicesRequestCache.validateStalenessSetting("500%"); }); + } + + public void testCacheCleanupBasedOnZeroThreshold() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0%").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, 
"foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + // 1 out of 2 keys ie 50% are now stale. + reader.close(); + // cache count should not be affected + assertEquals(2, cache.count()); + // clean cache with 0% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should remove the stale-key + assertEquals(1, cache.count()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + + public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new 
IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + // 1 out of 2 keys ie 50% are now stale. + reader.close(); + // cache count should not be affected + assertEquals(2, cache.count()); + + // clean cache with 50% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should have taken effect + assertEquals(1, cache.count()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + + public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + 
assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + reader.close(); + AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, staleKeysCount.get()); + // cache count should not be affected + assertEquals(2, cache.count()); + + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = + (OpenSearchDirectoryReader.DelegatingCacheHelper) secondReader.getReaderCacheHelper(); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + IndicesRequestCache.Key key = new IndicesRequestCache.Key( + ((IndexShard) secondEntity.getCacheIdentity()).shardId(), + termBytes, + readerCacheKeyId + ); + + cache.onRemoval(new RemovalNotification(key, termBytes, RemovalReason.EVICTED)); + staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); + // eviction of previous stale key from the cache should decrement staleKeysCount in iRC + assertEquals(0, staleKeysCount.get()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + + public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStaleCount() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the 
reader, to be enqueued for cleanup + reader.close(); + AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, staleKeysCount.get()); + // cache count should not be affected + assertEquals(2, cache.count()); + + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + IndicesRequestCache.Key key = new IndicesRequestCache.Key( + ((IndexShard) secondEntity.getCacheIdentity()).shardId(), + termBytes, + readerCacheKeyId + ); + + cache.onRemoval(new RemovalNotification(key, termBytes, RemovalReason.EVICTED)); + staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); + // eviction of NON-stale key from the cache should NOT decrement staleKeysCount in iRC + assertEquals(1, staleKeysCount.get()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + + public void testCacheCleanupBasedOnStaleThreshold_StalenessGreaterThanThreshold() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + // 1 out of 2 keys ie 50% 
are now stale. + reader.close(); + // cache count should not be affected + assertEquals(2, cache.count()); + + // clean cache with 49% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should have taken effect with 49% threshold + assertEquals(1, cache.count()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + + public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + // 1 out of 2 keys ie 50% are now stale. 
+ reader.close(); + // cache count should not be affected + assertEquals(2, cache.count()); + + // clean cache with 51% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should have been ignored + assertEquals(2, cache.count()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + public void testEviction() throws Exception { final ByteSizeValue size; { IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -337,12 +772,15 @@ public void testEviction() throws Exception { assertEquals("bar", value2.streamInput().readString()); size = indexShard.requestCache().stats().getMemorySize(); IOUtils.close(reader, secondReader, writer, dir, cache); + terminate(threadPool); } IndexShard indexShard = createIndex("test1").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -374,11 +812,13 @@ public void testEviction() throws Exception { assertEquals(2, cache.count()); assertEquals(1, indexShard.requestCache().stats().getEvictions()); IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); + terminate(threadPool); } public void testClearAllEntityIdentity() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { IndexService indexService = null; try { @@ -387,7 +827,7 @@ public void testClearAllEntityIdentity() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -422,7 +862,7 @@ public void testClearAllEntityIdentity() throws Exception { final long hitCount = requestCacheStats.getHitCount(); // clear all for the indexShard Idendity even though is't still open cache.clear(randomFrom(entity, secondEntity)); - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(1, cache.count()); // third has not been validated since it's a different identity value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); @@ -432,7 +872,7 @@ public void testClearAllEntityIdentity() throws Exception { assertEquals("baz", value3.streamInput().readString()); IOUtils.close(reader, secondReader, 
thirdReader, writer, dir, cache); - + terminate(threadPool); } public Iterable newDoc(int id, String value) { @@ -474,6 +914,7 @@ public BytesReference get() { public void testInvalidate() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { IndexService indexService = null; try { @@ -482,7 +923,7 @@ public void testInvalidate() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -539,7 +980,7 @@ public void testInvalidate() throws Exception { indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -547,6 +988,7 @@ public void testInvalidate() throws Exception { assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); IOUtils.close(reader, writer, dir, cache); + terminate(threadPool); assertEquals(0, cache.numRegisteredCloseListeners()); }
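Since the staleness threshold is validated through RatioValue.parseRatioValue, the accepted string forms are the ones the validator tests above exercise: a percentage ("50%", "0.5%") or a plain ratio ("0.5"), while non-numeric values and percentages above 100 are rejected. The sketch below only assumes that parseRatioValue throws OpenSearchParseException for bad input, as the validator's catch block implies; everything else is illustrative.

```java
import org.opensearch.OpenSearchParseException;
import org.opensearch.common.unit.RatioValue;

public class StalenessThresholdFormats {
    public static void main(String[] args) {
        // The first three forms mirror the "valid" validator tests; the last two mirror the invalid ones.
        for (String candidate : new String[] { "50%", "0.5", "0.5%", "50mb", "500%" }) {
            try {
                RatioValue.parseRatioValue(candidate);
                System.out.println(candidate + " -> accepted");
            } catch (OpenSearchParseException e) {
                // "50mb" is not numeric and "500%" is outside the percentage range, so both are rejected.
                System.out.println(candidate + " -> rejected: " + e.getMessage());
            }
        }
    }
}
```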