From 94e12c52ecb336fd55274336c060bf9cb5619661 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 12 Mar 2024 11:51:59 -0700 Subject: [PATCH 01/29] Introduce IndicesRequestCacheCleanupManager Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 291 ++++++++++++++++-- 1 file changed, 265 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index d22f131853a78..71a83c4a61a2f 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -71,6 +71,11 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.function.ToLongBiFunction; @@ -380,40 +385,274 @@ public int hashCode() { } } - /** - * Logic to clean up in-memory cache. - */ - synchronized void cleanCache() { - final Set currentKeysToClean = new HashSet<>(); - final Set currentFullClean = new HashSet<>(); - currentKeysToClean.clear(); - currentFullClean.clear(); - for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { - CleanupKey cleanupKey = iterator.next(); - iterator.remove(); - if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { - // null indicates full cleanup, as does a closed shard - currentFullClean.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); - } else { - currentKeysToClean.add(cleanupKey); + /* + * The IndicesRequestCacheCleanupManager manages the cleanup of stale keys in IndicesRequestCache. + * + * It also keeps track of the number of stale keys in the cache (staleKeysCount) and a staleness threshold, + * which is used to determine when the cache should be cleaned. + * + * If Staleness threshold is 0, we do not keep track of stale keys in the cache + * */ + class IndicesRequestCacheCleanupManager { + private final Set keysToClean; + private final ConcurrentMap> cleanupKeyToCountMap; + private final AtomicInteger staleKeysCount; + private final double stalenessThreshold; + private final Lock readLock; + private final Lock writeLock; + + IndicesRequestCacheCleanupManager(double stalenessThreshold) { + this.stalenessThreshold = stalenessThreshold; + this.keysToClean = ConcurrentCollections.newConcurrentSet(); + this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap(); + this.staleKeysCount = new AtomicInteger(0); + ReadWriteLock rwLock = new ReentrantReadWriteLock(); + this.readLock = rwLock.readLock(); + this.writeLock = rwLock.writeLock(); + } + + /** + * Enqueue cleanup key. + * + * @param cleanupKey the cleanup key + */ + void enqueueCleanupKey(CleanupKey cleanupKey) { + keysToClean.add(cleanupKey); + updateStaleKeysCount(cleanupKey); + } + + /** + * Updates the cleanupKeyToCountMap with the given CleanupKey. + * If the ShardId associated with the CleanupKey does not exist in the map, a new entry is created. + * The method increments the count of the CleanupKey in the map. + *

+ * Why use ShardID as the key ? + * CacheEntity mainly contains IndexShard, both of these classes do not override equals() and hashCode() methods. + * ShardID class properly overrides equals() and hashCode() methods. + * Therefore, to avoid modifying CacheEntity and IndexShard classes to override these methods, we use ShardID as the key. + * + * @param cleanupKey the CleanupKey to be updated in the map + */ + private void updateCleanupKeyToCountMap(CleanupKey cleanupKey) { + if (stalenessThreshold == 0.0) { + return; } + IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + if (indexShard == null) { + logger.warn("IndexShard is null for CleanupKey: {} while cleaning Indices Request Cache", cleanupKey.readerCacheKeyId); + return; + } + ShardId shardId = indexShard.shardId(); + + // If the key doesn't exist, it's added with a value of 1. + // If the key exists, its value is incremented by 1. + cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap()) + .merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); } - if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { - for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { - Key key = iterator.next(); - if (currentFullClean.contains(key.shardId)) { - iterator.remove(); + + /** + * Updates the count of stale keys in the cache. + * This method is called when a CleanupKey is added to the keysToClean set. + * + * It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap. + * If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount + * by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map. + * + * @param cleanupKey the CleanupKey that has been marked for cleanup + */ + private void updateStaleKeysCount(CleanupKey cleanupKey) { + if (stalenessThreshold == 0.0) { + return; + } + IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + if (indexShard == null) { + logger.warn("IndexShard is null for CleanupKey: {}", cleanupKey.readerCacheKeyId); + return; + } + ShardId shardId = indexShard.shardId(); + + // Using computeIfPresent to atomically operate on the countMap for a given shardId + cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> { + if (cleanupKey.readerCacheKeyId == null) { + // Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null + int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum(); + staleKeysCount.addAndGet(totalSum); + // Return null to automatically remove the mapping for shardId + return null; } else { - // If the flow comes here, then we should have a open shard available on node. - if (currentKeysToClean.contains( - new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId) - )) { + // Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap + countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> { + staleKeysCount.addAndGet(v); + // Return null to remove the key after updating staleKeysCount + return null; + }); + + // Check if countMap is empty after removal to decide if we need to remove the shardId entry + if (countMap.isEmpty()) { + return null; // Returning null removes the entry for shardId + } + } + return countMap; // Return the modified countMap to keep the mapping + }); + } + + /** + * Clean cache based on stalenessThreshold + */ + void cleanCache() { + cleanCache(stalenessThreshold); + } + + /** + * Force Clean cache without checking stalenessThreshold + */ + private void forceCleanCache() { + cleanCache(0); + } + + /** + * Cleans the cache based on the provided staleness threshold. + *

If the percentage of stale keys in the cache is less than this threshold,the cache cleanup process is skipped. + * @param threshold The staleness threshold as a double. + */ + private void cleanCache(double threshold) { + if (logger.isDebugEnabled()) { + logger.debug("Cleaning Indices Request Cache with threshold : " + threshold); + } + writeLock.lock(); + try { + if (canSkipCacheCleanup(threshold)) { + return; + } + // Contains CleanupKey objects with open shard but invalidated readerCacheKeyId. + final Set cleanupKeysFromOutdatedReaders = new HashSet<>(); + // Contains CleanupKey objects of a closed shard. + final Set cleanupKeysFromClosedShards = new HashSet<>(); + + processCleanupKeys(cleanupKeysFromOutdatedReaders, cleanupKeysFromClosedShards); + + if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) { + return; + } + + for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { + Key key = iterator.next(); + if (shouldRemoveKey(key, cleanupKeysFromOutdatedReaders, cleanupKeysFromClosedShards)) { iterator.remove(); } } + cache.refresh(); + } finally { + writeLock.unlock(); + } + } + + /** + * Processes the CleanupKeys in the keysToClean set and categorizes them into two sets: + * cleanupKeysFromOutdatedReaders and cleanupKeysFromClosedShards. + * + *

For each CleanupKey in keysToClean, if the readerCacheKeyId is null or the entity is not open, + * the shardId of the entity is added to cleanupKeysFromClosedShards. Otherwise, the CleanupKey is added + * to cleanupKeysFromOutdatedReaders. + * + * @param cleanupKeysFromOutdatedReaders A set to hold CleanupKeys with open shard but invalidated readerCacheKeyId. + * @param cleanupKeysFromClosedShards A set to hold CleanupKeys of a closed shard. + */ + private void processCleanupKeys(Set cleanupKeysFromOutdatedReaders, Set cleanupKeysFromClosedShards) { + writeLock.lock(); + try { + for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { + CleanupKey cleanupKey = iterator.next(); + iterator.remove(); + if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { + // null indicates full cleanup, as does a closed shard + cleanupKeysFromClosedShards.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); + } else { + cleanupKeysFromOutdatedReaders.add(cleanupKey); + } + } + } finally { + writeLock.unlock(); + } + } + + /** + * Determines whether a key should be removed from the cache. + * + *

This method checks if the key's shardId is present in the cleanupKeysFromClosedShards set, + * indicating that the shard has been closed and the key should be removed. If the shardId is not present, + * it checks if the key's readerCacheKeyId is present in the cleanupKeysFromOutdatedReaders set, + * indicating that the reader has been invalidated and the key should be removed. + * + * @param key The key to check for removal. + * @param cleanupKeysFromOutdatedReaders A set of CleanupKeys with open shard but invalidated readerCacheKeyId. + * @param cleanupKeysFromClosedShards A set of CleanupKeys of a closed shard. + * @return true if the key should be removed, false otherwise. + */ + private boolean shouldRemoveKey(Key key, Set cleanupKeysFromOutdatedReaders, Set cleanupKeysFromClosedShards) { + readLock.lock(); + try { + if (cleanupKeysFromClosedShards.contains(key.shardId)) { + return true; + } else { + Optional cacheEntity = cacheEntityLookup.apply(key.shardId); + if (cacheEntity.isPresent()) { + CleanupKey cleanupKey = new CleanupKey(cacheEntity.get(), key.readerCacheKeyId); + return cleanupKeysFromOutdatedReaders.contains(cleanupKey); + } + } + return false; + } finally { + readLock.unlock(); + } + } + + /** + * Determines whether the cache cleanup process can be skipped based on the staleness threshold. + * + *

If the percentage of stale keys is less than the provided staleness threshold returns true, + * indicating that the cache cleanup process can be skipped. + * + * @param cleanThresholdPercent The staleness threshold as a percentage. + * @return true if the cache cleanup process can be skipped, false otherwise. + */ + private boolean canSkipCacheCleanup(double cleanThresholdPercent) { + if (cleanThresholdPercent == 0.0) { + return false; + } + readLock.lock(); + try { + if (staleKeysInCachePercentage() < cleanThresholdPercent) { + if (logger.isDebugEnabled()) { + logger.debug( + "Skipping disk cache keys cleanup since the percentage of stale keys in disk cache is less than the threshold" + ); + } + return true; + } + } finally { + readLock.unlock(); + } + return false; + } + + /** + * Calculates and returns the percentage of stale keys in the cache. + * + * @return The percentage of stale keys in the cache as a double. Returns 0 if there are no keys in the cache or no stale keys. + */ + private double staleKeysInCachePercentage() { + readLock.lock(); + try { + long totalKeysInCache = count(); + if (totalKeysInCache == 0 || staleKeysCount.get() == 0) { + return 0; + } + return ((double) staleKeysCount.get() / totalKeysInCache); + } finally { + readLock.unlock(); } } - cache.refresh(); } /** From 78190da770e558bc1862ffd57d68b70413ace188 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 12 Mar 2024 11:56:21 -0700 Subject: [PATCH 02/29] using cleanup mgr to enqueue cleanups Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 71a83c4a61a2f..5e4a4f813f134 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -122,7 +122,6 @@ public final class IndicesRequestCache implements RemovalListener registeredClosedListeners = ConcurrentCollections.newConcurrentMap(); - private final Set keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; private final ICache cache; @@ -161,8 +160,8 @@ public void close() { } void clear(CacheEntity entity) { - keysToClean.add(new CleanupKey(entity, null)); - cleanCache(); + cacheCleanupManager.enqueueCleanupKey(new CleanupKey(entity, null)); + cacheCleanupManager.forceCleanCache(); } @Override @@ -361,7 +360,7 @@ private CleanupKey(CacheEntity entity, String readerCacheKeyId) { public void onClose(IndexReader.CacheKey cacheKey) { Boolean remove = registeredClosedListeners.remove(this); if (remove != null) { - keysToClean.add(this); + cacheCleanupManager.enqueueCleanupKey(this); } } From a4ed23ec1c8b8757c7aa02ba4a60b11d3d6aa5f5 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 12 Mar 2024 11:57:28 -0700 Subject: [PATCH 03/29] readability improvements Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 5e4a4f813f134..985ec0500d213 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -358,8 +358,10 @@ private CleanupKey(CacheEntity entity, String readerCacheKeyId) { @Override public void onClose(IndexReader.CacheKey cacheKey) { - Boolean remove = registeredClosedListeners.remove(this); - if (remove != null) { + // Remove the current CleanupKey from the registeredClosedListeners map + // If the key was present, enqueue it for cleanup + Boolean wasRegistered = registeredClosedListeners.remove(this); + if (wasRegistered != null) { cacheCleanupManager.enqueueCleanupKey(this); } } From dcf5dc13a5a17312fdc45b36a9eb56c9532005d9 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 12 Mar 2024 11:58:24 -0700 Subject: [PATCH 04/29] update updateCleanupKeyToCountMap on new cache entry Signed-off-by: Kiran Prakash --- .../main/java/org/opensearch/indices/IndicesRequestCache.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 985ec0500d213..e55bea784f61f 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -197,6 +197,7 @@ BytesReference getOrCompute( OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey); } } + cacheCleanupManager.updateCleanupKeyToCountMap(cleanupKey); } else { cacheEntity.onHit(); } From 7ecad50214befb0fdb4f29b2fd2ffee492779d07 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 12 Mar 2024 12:02:46 -0700 Subject: [PATCH 05/29] create IndicesRequestCacheCleanupManager & settings and validators Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index e55bea784f61f..38ebfa72ef3f2 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -38,6 +38,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; +import org.opensearch.OpenSearchParseException; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; @@ -51,6 +52,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.RatioValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; @@ -118,6 +120,17 @@ public final class IndicesRequestCache implements RemovalListener INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting( + "indices.requests.cache.cleanup.interval", + TimeValue.timeValueMinutes(1), + Property.NodeScope + ); + public static final Setting INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING = new Setting<>( + "indices.requests.cache.cleanup.staleness_threshold", + "0%", + IndicesRequestCache::validateStalenessSetting, + Property.NodeScope + ); private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); @@ -132,6 +145,7 @@ public final class IndicesRequestCache implements RemovalListener weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed(); + this.cacheCleanupManager = new IndicesRequestCacheCleanupManager(getStalenessThreshold(settings)); this.cacheEntityLookup = cacheEntityFunction; this.cache = cacheService.createCache( new CacheConfig.Builder().setSettings(settings) @@ -159,6 +173,11 @@ public void close() { cache.invalidateAll(); } + private double getStalenessThreshold(Settings settings) { + String threshold = INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.get(settings); + return RatioValue.parseRatioValue(threshold).getAsRatio(); + } + void clear(CacheEntity entity) { cacheCleanupManager.enqueueCleanupKey(new CleanupKey(entity, null)); cacheCleanupManager.forceCleanCache(); @@ -667,4 +686,26 @@ long count() { int numRegisteredCloseListeners() { // for testing return registeredClosedListeners.size(); } + + /** + * Validates the staleness setting for the cache cleanup threshold. + * + *

This method checks if the provided staleness threshold is a valid percentage or a valid double value. + * If the staleness threshold is not valid, it throws an OpenSearchParseException. + * + * @param staleThreshold The staleness threshold to validate. + * @return The validated staleness threshold. + * @throws OpenSearchParseException If the staleness threshold is not a valid percentage or double value. + * + *

package private for testing + */ + static String validateStalenessSetting(String staleThreshold) { + try { + RatioValue.parseRatioValue(staleThreshold); + } catch (OpenSearchParseException e) { + e.addSuppressed(e); + throw e; + } + return staleThreshold; + } } From 5d320257527196d853da532d61dd658b717eff2a Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 12 Mar 2024 12:03:47 -0700 Subject: [PATCH 06/29] Add IRC specific cache cleaner and remove from IndicesService Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 71 ++++++++++++++++++- .../opensearch/indices/IndicesService.java | 26 ++----- 2 files changed, 72 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 38ebfa72ef3f2..560502a5e826d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -48,6 +48,8 @@ import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.cache.store.config.CacheConfig; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -62,6 +64,7 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; +import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; @@ -96,7 +99,10 @@ * * @opensearch.internal */ -public final class IndicesRequestCache implements RemovalListener, Closeable { +public final class IndicesRequestCache extends AbstractLifecycleComponent + implements + RemovalListener, + Closeable { private static final Logger logger = LogManager.getLogger(IndicesRequestCache.class); @@ -139,13 +145,26 @@ public final class IndicesRequestCache implements RemovalListener cache; private final Function> cacheEntityLookup; - - IndicesRequestCache(Settings settings, Function> cacheEntityFunction, CacheService cacheService) { + // pkg-private for testing + final IndicesRequestCacheCleanupManager cacheCleanupManager; + private final IndicesRequestCacheCleaner cacheCleaner; + private final TimeValue cleanInterval; + private final ThreadPool threadpool; + + IndicesRequestCache( + Settings settings, + Function> cacheEntityFunction, + CacheService cacheService, + ThreadPool threadPool + ) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; + this.cleanInterval = INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING.get(settings); long sizeInBytes = size.getBytes(); ToLongBiFunction weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed(); this.cacheCleanupManager = new IndicesRequestCacheCleanupManager(getStalenessThreshold(settings)); + this.threadpool = threadPool; + this.cacheCleaner = new IndicesRequestCacheCleaner(this, this.threadpool, this.cleanInterval); this.cacheEntityLookup = cacheEntityFunction; this.cache = cacheService.createCache( new CacheConfig.Builder().setSettings(settings) @@ -168,6 +187,19 @@ public final class IndicesRequestCache implements RemovalListener Date: Tue, 12 Mar 2024 12:04:04 -0700 Subject: [PATCH 07/29] Unit Tests Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 321 +++++++++++++++++- 1 file changed, 307 insertions(+), 14 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index b9cbbb2c65162..8b903db485ca7 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -67,7 +67,9 @@ import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; +import org.opensearch.node.Node; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; @@ -75,17 +77,23 @@ import java.util.Optional; import java.util.UUID; +import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { + private ThreadPool getThreadPool() { + return new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "default tracer tests").build()); + } public void testBasicOperationsCache() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -128,7 +136,7 @@ public void testBasicOperationsCache() throws Exception { indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -137,16 +145,19 @@ public void testBasicOperationsCache() throws Exception { assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); IOUtils.close(reader, writer, dir, cache); + terminate(threadPool); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testBasicOperationsCacheWithFeatureFlag() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); CacheService cacheService = new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(), (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - cacheService + cacheService, + threadPool ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -189,7 +200,7 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -198,12 +209,14 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); IOUtils.close(reader, writer, dir, cache); + terminate(threadPool); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testCacheDifferentReaders() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { IndexService indexService = null; try { @@ -212,7 +225,7 @@ public void testCacheDifferentReaders() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -281,7 +294,7 @@ public void testCacheDifferentReaders() throws Exception { // Closing the cache doesn't change returned entities reader.close(); - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); assertTrue(loader.loadedFromCache); @@ -296,7 +309,7 @@ public void testCacheDifferentReaders() throws Exception { indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(secondEntity); } - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); assertTrue(loader.loadedFromCache); @@ -304,17 +317,290 @@ public void testCacheDifferentReaders() throws Exception { assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); assertEquals(0, cache.numRegisteredCloseListeners()); } + public void testCacheCleanupThresholdSettingValidator_Valid_Percentage() { + String s = IndicesRequestCache.validateStalenessSetting("50%"); + assertEquals("50%", s); + } + + public void testCacheCleanupThresholdSettingValidator_Valid_Double() { + String s = IndicesRequestCache.validateStalenessSetting("0.5"); + assertEquals("0.5", s); + } + + public void testCacheCleanupThresholdSettingValidator_Valid_DecimalPercentage() { + String s = IndicesRequestCache.validateStalenessSetting("0.5%"); + assertEquals("0.5%", s); + } + + public void testCacheCleanupThresholdSettingValidator_InValid_MB() { + assertThrows(IllegalArgumentException.class, () -> { IndicesRequestCache.validateStalenessSetting("50mb"); }); + } + + public void testCacheCleanupThresholdSettingValidator_Invalid_Percentage() { + assertThrows(IllegalArgumentException.class, () -> { IndicesRequestCache.validateStalenessSetting("500%"); }); + } + + public void testCacheCleanupBasedOnZeroThreshold() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0%").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + // 1 out of 2 keys ie 50% are now stale. + reader.close(); + // cache count should not be affected + assertEquals(2, cache.count()); + // clean cache with 0% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should remove the stale-key + assertEquals(1, cache.count()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + + public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + // 1 out of 2 keys ie 50% are now stale. + reader.close(); + // cache count should not be affected + assertEquals(2, cache.count()); + + // clean cache with 50% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should have taken effect + assertEquals(1, cache.count()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + + public void testCacheCleanupBasedOnStaleThreshold_StalenessGreaterThanThreshold() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + // 1 out of 2 keys ie 50% are now stale. + reader.close(); + // cache count should not be affected + assertEquals(2, cache.count()); + + // clean cache with 49% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should have taken effect with 49% threshold + assertEquals(1, cache.count()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + + public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + // 1 out of 2 keys ie 50% are now stale. + reader.close(); + // cache count should not be affected + assertEquals(2, cache.count()); + + // clean cache with 51% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should have been ignored + assertEquals(2, cache.count()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + public void testEviction() throws Exception { final ByteSizeValue size; { IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -337,12 +623,15 @@ public void testEviction() throws Exception { assertEquals("bar", value2.streamInput().readString()); size = indexShard.requestCache().stats().getMemorySize(); IOUtils.close(reader, secondReader, writer, dir, cache); + terminate(threadPool); } IndexShard indexShard = createIndex("test1").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -374,11 +663,13 @@ public void testEviction() throws Exception { assertEquals(2, cache.count()); assertEquals(1, indexShard.requestCache().stats().getEvictions()); IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); + terminate(threadPool); } public void testClearAllEntityIdentity() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { IndexService indexService = null; try { @@ -387,7 +678,7 @@ public void testClearAllEntityIdentity() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -422,7 +713,7 @@ public void testClearAllEntityIdentity() throws Exception { final long hitCount = requestCacheStats.getHitCount(); // clear all for the indexShard Idendity even though is't still open cache.clear(randomFrom(entity, secondEntity)); - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(1, cache.count()); // third has not been validated since it's a different identity value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); @@ -432,7 +723,7 @@ public void testClearAllEntityIdentity() throws Exception { assertEquals("baz", value3.streamInput().readString()); IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); - + terminate(threadPool); } public Iterable newDoc(int id, String value) { @@ -474,6 +765,7 @@ public BytesReference get() { public void testInvalidate() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { IndexService indexService = null; try { @@ -482,7 +774,7 @@ public void testInvalidate() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -539,7 +831,7 @@ public void testInvalidate() throws Exception { indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } - cache.cleanCache(); + cache.cacheCleanupManager.cleanCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -547,6 +839,7 @@ public void testInvalidate() throws Exception { assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); IOUtils.close(reader, writer, dir, cache); + terminate(threadPool); assertEquals(0, cache.numRegisteredCloseListeners()); } From 8b0ccb9e456b61433a8d818ca75f50deb736f983 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 12 Mar 2024 12:34:08 -0700 Subject: [PATCH 08/29] Update CHANGELOG.md Signed-off-by: Kiran Prakash --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5044e4c0e26b7..a56ea61554bfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -115,6 +115,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Metrics Framework] Adds support for asynchronous gauge metric type. ([#12642](https://github.com/opensearch-project/OpenSearch/issues/12642)) - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601)) - [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542)) +- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) @@ -159,7 +160,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix `terms` query on `float` field when `doc_values` are turned off by reverting back to `FloatPoint` from `FloatField` ([#12499](https://github.com/opensearch-project/OpenSearch/pull/12499)) - Fix get task API does not refresh resource stats ([#11531](https://github.com/opensearch-project/OpenSearch/pull/11531)) - onShardResult and onShardFailure are executed on one shard causes opensearch jvm crashed ([#12158](https://github.com/opensearch-project/OpenSearch/pull/12158)) -- Avoid overflow when sorting missing last on `epoch_millis` datetime field ([#12676](https://github.com/opensearch-project/OpenSearch/pull/12676)) ### Security From 6975e0cca1bf5b2a0a8932f03cb27d3a6331b347 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 13 Mar 2024 14:35:21 -0700 Subject: [PATCH 09/29] move cachecleaner inside mgr Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 118 ++++++++---------- 1 file changed, 52 insertions(+), 66 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 560502a5e826d..dc079140afddd 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -49,7 +49,6 @@ import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.lease.Releasable; -import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -99,10 +98,7 @@ * * @opensearch.internal */ -public final class IndicesRequestCache extends AbstractLifecycleComponent - implements - RemovalListener, - Closeable { +public final class IndicesRequestCache implements RemovalListener, Closeable { private static final Logger logger = LogManager.getLogger(IndicesRequestCache.class); @@ -147,9 +143,6 @@ public final class IndicesRequestCache extends AbstractLifecycleComponent private final Function> cacheEntityLookup; // pkg-private for testing final IndicesRequestCacheCleanupManager cacheCleanupManager; - private final IndicesRequestCacheCleaner cacheCleaner; - private final TimeValue cleanInterval; - private final ThreadPool threadpool; IndicesRequestCache( Settings settings, @@ -159,12 +152,13 @@ public final class IndicesRequestCache extends AbstractLifecycleComponent ) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; - this.cleanInterval = INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING.get(settings); long sizeInBytes = size.getBytes(); ToLongBiFunction weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed(); - this.cacheCleanupManager = new IndicesRequestCacheCleanupManager(getStalenessThreshold(settings)); - this.threadpool = threadPool; - this.cacheCleaner = new IndicesRequestCacheCleaner(this, this.threadpool, this.cleanInterval); + this.cacheCleanupManager = new IndicesRequestCacheCleanupManager( + threadPool, + INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING.get(settings), + getStalenessThreshold(settings) + ); this.cacheEntityLookup = cacheEntityFunction; this.cache = cacheService.createCache( new CacheConfig.Builder().setSettings(settings) @@ -187,22 +181,10 @@ public final class IndicesRequestCache extends AbstractLifecycleComponent ); } - @Override - protected void doStart() { - threadpool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME); - } - - @Override - protected void doStop() { - cacheCleaner.close(); - } - - @Override - protected void doClose() throws IOException {} - @Override public void close() { cache.invalidateAll(); + cacheCleanupManager.close(); } private double getStalenessThreshold(Settings settings) { @@ -271,39 +253,6 @@ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReade cache.invalidate(new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId)); } - private final class IndicesRequestCacheCleaner implements Runnable, Releasable { - - private final IndicesRequestCache indicesRequestCache; - private final ThreadPool threadPool; - private final TimeValue interval; - - IndicesRequestCacheCleaner(IndicesRequestCache indicesRequestCache, ThreadPool threadPool, TimeValue interval) { - this.indicesRequestCache = indicesRequestCache; - this.threadPool = threadPool; - this.interval = interval; - } - - private final AtomicBoolean closed = new AtomicBoolean(false); - - @Override - public void run() { - try { - this.indicesRequestCache.cacheCleanupManager.cleanCache(); - } catch (Exception e) { - logger.warn("Exception during periodic indices request cache cleanup:", e); - } - // Reschedule itself to run again if not closed - if (closed.get() == false) { - threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this); - } - } - - @Override - public void close() { - closed.compareAndSet(false, true); - } - } - /** * Loader for the request cache * @@ -479,15 +428,16 @@ public int hashCode() { * * If Staleness threshold is 0, we do not keep track of stale keys in the cache * */ - class IndicesRequestCacheCleanupManager { + class IndicesRequestCacheCleanupManager implements Closeable { private final Set keysToClean; private final ConcurrentMap> cleanupKeyToCountMap; private final AtomicInteger staleKeysCount; private final double stalenessThreshold; private final Lock readLock; private final Lock writeLock; + private final IndicesRequestCacheCleaner cacheCleaner; - IndicesRequestCacheCleanupManager(double stalenessThreshold) { + IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) { this.stalenessThreshold = stalenessThreshold; this.keysToClean = ConcurrentCollections.newConcurrentSet(); this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap(); @@ -495,6 +445,8 @@ class IndicesRequestCacheCleanupManager { ReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); + this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval); + threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME); } /** @@ -681,13 +633,9 @@ private boolean shouldRemoveKey(Key key, Set cleanupKeysFromOutdated if (cleanupKeysFromClosedShards.contains(key.shardId)) { return true; } else { - Optional cacheEntity = cacheEntityLookup.apply(key.shardId); - if (cacheEntity.isPresent()) { - CleanupKey cleanupKey = new CleanupKey(cacheEntity.get(), key.readerCacheKeyId); - return cleanupKeysFromOutdatedReaders.contains(cleanupKey); - } + CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId); + return cleanupKeysFromOutdatedReaders.contains(cleanupKey); } - return false; } finally { readLock.unlock(); } @@ -739,6 +687,44 @@ private double staleKeysInCachePercentage() { readLock.unlock(); } } + + @Override + public void close() { + this.cacheCleaner.close(); + } + + private final class IndicesRequestCacheCleaner implements Runnable, Releasable { + + private final IndicesRequestCacheCleanupManager cacheCleanupManager; + private final ThreadPool threadPool; + private final TimeValue interval; + + IndicesRequestCacheCleaner(IndicesRequestCacheCleanupManager cacheCleanupManager, ThreadPool threadPool, TimeValue interval) { + this.cacheCleanupManager = cacheCleanupManager; + this.threadPool = threadPool; + this.interval = interval; + } + + private final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public void run() { + try { + this.cacheCleanupManager.cleanCache(); + } catch (Exception e) { + logger.warn("Exception during periodic indices request cache cleanup:", e); + } + // Reschedule itself to run again if not closed + if (closed.get() == false) { + threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this); + } + } + + @Override + public void close() { + closed.compareAndSet(false, true); + } + } } /** From 2deb00114b5e8a02ba4a27e9d12959f4f3136b2c Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 13 Mar 2024 14:36:47 -0700 Subject: [PATCH 10/29] remove processCleanupKeys Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 40 +++++-------------- 1 file changed, 10 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index dc079140afddd..1ad82b13576a5 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -567,7 +567,16 @@ private void cleanCache(double threshold) { // Contains CleanupKey objects of a closed shard. final Set cleanupKeysFromClosedShards = new HashSet<>(); - processCleanupKeys(cleanupKeysFromOutdatedReaders, cleanupKeysFromClosedShards); + for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { + CleanupKey cleanupKey = iterator.next(); + iterator.remove(); + if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { + // null indicates full cleanup, as does a closed shard + cleanupKeysFromClosedShards.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); + } else { + cleanupKeysFromOutdatedReaders.add(cleanupKey); + } + } if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) { return; @@ -585,35 +594,6 @@ private void cleanCache(double threshold) { } } - /** - * Processes the CleanupKeys in the keysToClean set and categorizes them into two sets: - * cleanupKeysFromOutdatedReaders and cleanupKeysFromClosedShards. - * - *

For each CleanupKey in keysToClean, if the readerCacheKeyId is null or the entity is not open, - * the shardId of the entity is added to cleanupKeysFromClosedShards. Otherwise, the CleanupKey is added - * to cleanupKeysFromOutdatedReaders. - * - * @param cleanupKeysFromOutdatedReaders A set to hold CleanupKeys with open shard but invalidated readerCacheKeyId. - * @param cleanupKeysFromClosedShards A set to hold CleanupKeys of a closed shard. - */ - private void processCleanupKeys(Set cleanupKeysFromOutdatedReaders, Set cleanupKeysFromClosedShards) { - writeLock.lock(); - try { - for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { - CleanupKey cleanupKey = iterator.next(); - iterator.remove(); - if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { - // null indicates full cleanup, as does a closed shard - cleanupKeysFromClosedShards.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); - } else { - cleanupKeysFromOutdatedReaders.add(cleanupKey); - } - } - } finally { - writeLock.unlock(); - } - } - /** * Determines whether a key should be removed from the cache. * From 7b81cb94e325a42d259bcdabd9b84230fc1f7754 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 13 Mar 2024 14:37:00 -0700 Subject: [PATCH 11/29] minor cleanups Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 1ad82b13576a5..d94eaf294fe11 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -222,7 +222,7 @@ BytesReference getOrCompute( BytesReference value = cache.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { cacheEntity.onMiss(); - // see if its the first time we see this reader, and make sure to register a cleanup key + // see if it's the first time we see this reader, and make sure to register a cleanup key CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId); if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); @@ -639,7 +639,8 @@ private boolean canSkipCacheCleanup(double cleanThresholdPercent) { if (staleKeysInCachePercentage() < cleanThresholdPercent) { if (logger.isDebugEnabled()) { logger.debug( - "Skipping disk cache keys cleanup since the percentage of stale keys in disk cache is less than the threshold" + "Skipping cache cleanup since the percentage of stale keys is less than the threshold : " + + stalenessThreshold ); } return true; From 840f5c36dd1b5adbbf5119544ff614ab24ca17ca Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 13 Mar 2024 14:37:26 -0700 Subject: [PATCH 12/29] add updateCleanupKeyToCountMapOnCacheEviction Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index d94eaf294fe11..13ac27104adb4 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -201,7 +201,11 @@ void clear(CacheEntity entity) { public void onRemoval(RemovalNotification notification) { // In case this event happens for an old shard, we can safely ignore this as we don't keep track for old // shards as part of request cache. - cacheEntityLookup.apply(notification.getKey().shardId).ifPresent(entity -> entity.onRemoval(notification)); + Key key = notification.getKey(); + cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(notification)); + cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction( + new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId) + ); } BytesReference getOrCompute( @@ -230,7 +234,7 @@ BytesReference getOrCompute( OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey); } } - cacheCleanupManager.updateCleanupKeyToCountMap(cleanupKey); + cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey); } else { cacheEntity.onHit(); } @@ -471,7 +475,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) { * * @param cleanupKey the CleanupKey to be updated in the map */ - private void updateCleanupKeyToCountMap(CleanupKey cleanupKey) { + private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { if (stalenessThreshold == 0.0) { return; } @@ -488,6 +492,29 @@ private void updateCleanupKeyToCountMap(CleanupKey cleanupKey) { .merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); } + private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { + if (stalenessThreshold == 0.0) { + return; + } + IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + if (indexShard == null) { + logger.warn("IndexShard is null for CleanupKey: {} while cleaning Indices Request Cache", cleanupKey.readerCacheKeyId); + return; + } + ShardId shardId = indexShard.shardId(); + + writeLock.lock(); + try { + // If the key doesn't exist, ignore + ConcurrentMap keyCountMap = cleanupKeyToCountMap.get(shardId); + if (keyCountMap != null) { + keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, value) -> value - 1); + } + } finally { + writeLock.unlock(); + } + } + /** * Updates the count of stale keys in the cache. * This method is called when a CleanupKey is added to the keysToClean set. From 1de2f6676c0119e9e2087b6fc0e190baa7cffe20 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 13 Mar 2024 14:45:18 -0700 Subject: [PATCH 13/29] remove locks and make all methods synchronized Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 131 +++++++----------- 1 file changed, 49 insertions(+), 82 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 13ac27104adb4..f0ca913e904fb 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -77,9 +77,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.function.ToLongBiFunction; @@ -437,8 +434,6 @@ class IndicesRequestCacheCleanupManager implements Closeable { private final ConcurrentMap> cleanupKeyToCountMap; private final AtomicInteger staleKeysCount; private final double stalenessThreshold; - private final Lock readLock; - private final Lock writeLock; private final IndicesRequestCacheCleaner cacheCleaner; IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) { @@ -446,9 +441,6 @@ class IndicesRequestCacheCleanupManager implements Closeable { this.keysToClean = ConcurrentCollections.newConcurrentSet(); this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap(); this.staleKeysCount = new AtomicInteger(0); - ReadWriteLock rwLock = new ReentrantReadWriteLock(); - this.readLock = rwLock.readLock(); - this.writeLock = rwLock.writeLock(); this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval); threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME); } @@ -492,7 +484,7 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { .merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); } - private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { + private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { if (stalenessThreshold == 0.0) { return; } @@ -503,15 +495,10 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { } ShardId shardId = indexShard.shardId(); - writeLock.lock(); - try { - // If the key doesn't exist, ignore - ConcurrentMap keyCountMap = cleanupKeyToCountMap.get(shardId); - if (keyCountMap != null) { - keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, value) -> value - 1); - } - } finally { - writeLock.unlock(); + // If the key doesn't exist, ignore + ConcurrentMap keyCountMap = cleanupKeyToCountMap.get(shardId); + if (keyCountMap != null) { + keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, value) -> value - 1); } } @@ -584,41 +571,36 @@ private void cleanCache(double threshold) { if (logger.isDebugEnabled()) { logger.debug("Cleaning Indices Request Cache with threshold : " + threshold); } - writeLock.lock(); - try { - if (canSkipCacheCleanup(threshold)) { - return; - } - // Contains CleanupKey objects with open shard but invalidated readerCacheKeyId. - final Set cleanupKeysFromOutdatedReaders = new HashSet<>(); - // Contains CleanupKey objects of a closed shard. - final Set cleanupKeysFromClosedShards = new HashSet<>(); - - for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { - CleanupKey cleanupKey = iterator.next(); - iterator.remove(); - if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { - // null indicates full cleanup, as does a closed shard - cleanupKeysFromClosedShards.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); - } else { - cleanupKeysFromOutdatedReaders.add(cleanupKey); - } + if (canSkipCacheCleanup(threshold)) { + return; + } + // Contains CleanupKey objects with open shard but invalidated readerCacheKeyId. + final Set cleanupKeysFromOutdatedReaders = new HashSet<>(); + // Contains CleanupKey objects of a closed shard. + final Set cleanupKeysFromClosedShards = new HashSet<>(); + + for (Iterator iterator = keysToClean.iterator(); iterator.hasNext(); ) { + CleanupKey cleanupKey = iterator.next(); + iterator.remove(); + if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { + // null indicates full cleanup, as does a closed shard + cleanupKeysFromClosedShards.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); + } else { + cleanupKeysFromOutdatedReaders.add(cleanupKey); } + } - if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) { - return; - } + if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) { + return; + } - for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { - Key key = iterator.next(); - if (shouldRemoveKey(key, cleanupKeysFromOutdatedReaders, cleanupKeysFromClosedShards)) { - iterator.remove(); - } + for (Iterator iterator = cache.keys().iterator(); iterator.hasNext(); ) { + Key key = iterator.next(); + if (shouldRemoveKey(key, cleanupKeysFromOutdatedReaders, cleanupKeysFromClosedShards)) { + iterator.remove(); } - cache.refresh(); - } finally { - writeLock.unlock(); } + cache.refresh(); } /** @@ -634,17 +616,12 @@ private void cleanCache(double threshold) { * @param cleanupKeysFromClosedShards A set of CleanupKeys of a closed shard. * @return true if the key should be removed, false otherwise. */ - private boolean shouldRemoveKey(Key key, Set cleanupKeysFromOutdatedReaders, Set cleanupKeysFromClosedShards) { - readLock.lock(); - try { - if (cleanupKeysFromClosedShards.contains(key.shardId)) { - return true; - } else { - CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId); - return cleanupKeysFromOutdatedReaders.contains(cleanupKey); - } - } finally { - readLock.unlock(); + private synchronized boolean shouldRemoveKey(Key key, Set cleanupKeysFromOutdatedReaders, Set cleanupKeysFromClosedShards) { + if (cleanupKeysFromClosedShards.contains(key.shardId)) { + return true; + } else { + CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId); + return cleanupKeysFromOutdatedReaders.contains(cleanupKey); } } @@ -657,23 +634,18 @@ private boolean shouldRemoveKey(Key key, Set cleanupKeysFromOutdated * @param cleanThresholdPercent The staleness threshold as a percentage. * @return true if the cache cleanup process can be skipped, false otherwise. */ - private boolean canSkipCacheCleanup(double cleanThresholdPercent) { + private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { if (cleanThresholdPercent == 0.0) { return false; } - readLock.lock(); - try { - if (staleKeysInCachePercentage() < cleanThresholdPercent) { - if (logger.isDebugEnabled()) { - logger.debug( - "Skipping cache cleanup since the percentage of stale keys is less than the threshold : " - + stalenessThreshold - ); - } - return true; + if (staleKeysInCachePercentage() < cleanThresholdPercent) { + if (logger.isDebugEnabled()) { + logger.debug( + "Skipping cache cleanup since the percentage of stale keys is less than the threshold : " + + stalenessThreshold + ); } - } finally { - readLock.unlock(); + return true; } return false; } @@ -683,17 +655,12 @@ private boolean canSkipCacheCleanup(double cleanThresholdPercent) { * * @return The percentage of stale keys in the cache as a double. Returns 0 if there are no keys in the cache or no stale keys. */ - private double staleKeysInCachePercentage() { - readLock.lock(); - try { - long totalKeysInCache = count(); - if (totalKeysInCache == 0 || staleKeysCount.get() == 0) { - return 0; - } - return ((double) staleKeysCount.get() / totalKeysInCache); - } finally { - readLock.unlock(); + private synchronized double staleKeysInCachePercentage() { + long totalKeysInCache = count(); + if (totalKeysInCache == 0 || staleKeysCount.get() == 0) { + return 0; } + return ((double) staleKeysCount.get() / totalKeysInCache); } @Override From d5f75728292d5a62672135571b6c88c1ca311b21 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 13 Mar 2024 16:27:26 -0700 Subject: [PATCH 14/29] spotless Signed-off-by: Kiran Prakash --- .../org/opensearch/indices/IndicesRequestCache.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index f0ca913e904fb..b93c137b1aaad 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -579,7 +579,7 @@ private void cleanCache(double threshold) { // Contains CleanupKey objects of a closed shard. final Set cleanupKeysFromClosedShards = new HashSet<>(); - for (Iterator iterator = keysToClean.iterator(); iterator.hasNext(); ) { + for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { @@ -594,7 +594,7 @@ private void cleanCache(double threshold) { return; } - for (Iterator iterator = cache.keys().iterator(); iterator.hasNext(); ) { + for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); if (shouldRemoveKey(key, cleanupKeysFromOutdatedReaders, cleanupKeysFromClosedShards)) { iterator.remove(); @@ -616,7 +616,11 @@ private void cleanCache(double threshold) { * @param cleanupKeysFromClosedShards A set of CleanupKeys of a closed shard. * @return true if the key should be removed, false otherwise. */ - private synchronized boolean shouldRemoveKey(Key key, Set cleanupKeysFromOutdatedReaders, Set cleanupKeysFromClosedShards) { + private synchronized boolean shouldRemoveKey( + Key key, + Set cleanupKeysFromOutdatedReaders, + Set cleanupKeysFromClosedShards + ) { if (cleanupKeysFromClosedShards.contains(key.shardId)) { return true; } else { @@ -641,8 +645,7 @@ private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { if (staleKeysInCachePercentage() < cleanThresholdPercent) { if (logger.isDebugEnabled()) { logger.debug( - "Skipping cache cleanup since the percentage of stale keys is less than the threshold : " - + stalenessThreshold + "Skipping cache cleanup since the percentage of stale keys is less than the threshold : " + stalenessThreshold ); } return true; From 41ba5be5173eb91531d2c1c9aa176cd9bf41cc26 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 13 Mar 2024 17:41:18 -0700 Subject: [PATCH 15/29] updateCleanupKeyToCountMapOnCacheEviction Signed-off-by: Kiran Prakash --- .../org/opensearch/indices/IndicesRequestCache.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index b93c137b1aaad..2368133fa6dae 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -494,11 +494,19 @@ private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey c return; } ShardId shardId = indexShard.shardId(); - // If the key doesn't exist, ignore ConcurrentMap keyCountMap = cleanupKeyToCountMap.get(shardId); if (keyCountMap != null) { - keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, value) -> value - 1); + keyCountMap.compute(cleanupKey.readerCacheKeyId, (key, currentValue) -> { + // If the key is not present, no action is needed, return null. + if (currentValue == null) return null; + // Calculate the new value. + int newValue = currentValue - 1; + // decrement the stale key count + staleKeysCount.decrementAndGet(); + // Remove the key if the new value is zero by returning null; otherwise, update with the new value. + return newValue == 0 ? null : newValue; + }); } } From 82a6b521a06dcbdbf442894b8dfb802cfdc21546 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 13 Mar 2024 18:12:01 -0700 Subject: [PATCH 16/29] Testing Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 4 + .../indices/IndicesRequestCacheTests.java | 149 ++++++++++++++++++ 2 files changed, 153 insertions(+) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 2368133fa6dae..2557eaee8d77d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -556,6 +556,10 @@ private void updateStaleKeysCount(CleanupKey cleanupKey) { }); } + AtomicInteger getStaleKeysCountForTesting() { + return staleKeysCount; + } + /** * Clean cache based on stalenessThreshold */ diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 8b903db485ca7..4fe446c88e1e7 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -46,6 +46,8 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; import org.opensearch.common.cache.module.CacheModule; import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -76,6 +78,7 @@ import java.util.Arrays; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING; import static org.mockito.Mockito.mock; @@ -467,6 +470,152 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() th terminate(threadPool); } + public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + reader.close(); + AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, staleKeysCount.get()); + // cache count should not be affected + assertEquals(2, cache.count()); + + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = + (OpenSearchDirectoryReader.DelegatingCacheHelper) secondReader.getReaderCacheHelper(); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + IndicesRequestCache.Key key = new IndicesRequestCache.Key( + ((IndexShard) secondEntity.getCacheIdentity()).shardId(), + termBytes, + readerCacheKeyId + ); + + cache.onRemoval(new RemovalNotification(key, termBytes, RemovalReason.EVICTED)); + staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting(); + // eviction of previous stale key from the cache should decrement staleKeysCount in iRC + assertEquals(0, staleKeysCount.get()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + + public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStaleCount() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + ThreadPool threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } + writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); + DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + + // Get 2 entries into the cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(entity, loader, secondReader, termBytes); + + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(secondReader, 0); + cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + reader.close(); + AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, staleKeysCount.get()); + // cache count should not be affected + assertEquals(2, cache.count()); + + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + IndicesRequestCache.Key key = new IndicesRequestCache.Key( + ((IndexShard) secondEntity.getCacheIdentity()).shardId(), + termBytes, + readerCacheKeyId + ); + + cache.onRemoval(new RemovalNotification(key, termBytes, RemovalReason.EVICTED)); + staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting(); + // eviction of NON-stale key from the cache should NOT decrement staleKeysCount in iRC + assertEquals(1, staleKeysCount.get()); + + IOUtils.close(secondReader, writer, dir, cache); + terminate(threadPool); + } + public void testCacheCleanupBasedOnStaleThreshold_StalenessGreaterThanThreshold() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); From f2ba23f9e7dea8f5619f6b0917d37441b6f949bb Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Mar 2024 17:35:41 -0700 Subject: [PATCH 17/29] add Reschedule back to indices service Signed-off-by: Kiran Prakash --- .../src/main/java/org/opensearch/indices/IndicesService.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 65e68eb60a052..40c10e3a2fe96 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -1611,6 +1611,10 @@ public void run() { TimeValue.nsecToMSec(System.nanoTime() - startTimeNS) ); } + // Reschedule itself to run again if not closed + if (closed.get() == false) { + threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this); + } } @Override From adff9e12e5ce416b20bcf043965fb44b9f87fef6 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Mar 2024 17:36:21 -0700 Subject: [PATCH 18/29] rename updateStaleKeysCount to incrementStaleKeysCount Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 2557eaee8d77d..ccb107b7b222a 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -452,7 +452,7 @@ class IndicesRequestCacheCleanupManager implements Closeable { */ void enqueueCleanupKey(CleanupKey cleanupKey) { keysToClean.add(cleanupKey); - updateStaleKeysCount(cleanupKey); + incrementStaleKeysCount(cleanupKey); } /** @@ -520,8 +520,7 @@ private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey c * * @param cleanupKey the CleanupKey that has been marked for cleanup */ - private void updateStaleKeysCount(CleanupKey cleanupKey) { - if (stalenessThreshold == 0.0) { + private void incrementStaleKeysCount(CleanupKey cleanupKey) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); From e545ba7ce0554ff4f03ce1160a51ea6837cfd29e Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Mar 2024 17:37:17 -0700 Subject: [PATCH 19/29] rename getStaleKeysCountForTesting to getStaleKeysCount Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 3 ++- .../org/opensearch/indices/IndicesRequestCacheTests.java | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index ccb107b7b222a..40986c43f68cb 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -555,7 +555,8 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) { }); } - AtomicInteger getStaleKeysCountForTesting() { + // package private for testing + AtomicInteger getStaleKeysCount() { return staleKeysCount; } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 4fe446c88e1e7..594b9aac971b7 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -519,7 +519,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( // Close the reader, to be enqueued for cleanup reader.close(); - AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting(); + AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); // 1 out of 2 keys ie 50% are now stale. assertEquals(1, staleKeysCount.get()); // cache count should not be affected @@ -535,7 +535,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( ); cache.onRemoval(new RemovalNotification(key, termBytes, RemovalReason.EVICTED)); - staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting(); + staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); // eviction of previous stale key from the cache should decrement staleKeysCount in iRC assertEquals(0, staleKeysCount.get()); @@ -592,7 +592,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStal // Close the reader, to be enqueued for cleanup reader.close(); - AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting(); + AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); // 1 out of 2 keys ie 50% are now stale. assertEquals(1, staleKeysCount.get()); // cache count should not be affected @@ -608,7 +608,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStal ); cache.onRemoval(new RemovalNotification(key, termBytes, RemovalReason.EVICTED)); - staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting(); + staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); // eviction of NON-stale key from the cache should NOT decrement staleKeysCount in iRC assertEquals(1, staleKeysCount.get()); From 3dce4bd38dcd585ae17d5186dba6eb25ab441a8d Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Mar 2024 17:37:38 -0700 Subject: [PATCH 20/29] rename threshold to stalenessThreshold Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 40986c43f68cb..dc842e093b360 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -577,13 +577,13 @@ private void forceCleanCache() { /** * Cleans the cache based on the provided staleness threshold. *

If the percentage of stale keys in the cache is less than this threshold,the cache cleanup process is skipped. - * @param threshold The staleness threshold as a double. + * @param stalenessThreshold The staleness threshold as a double. */ - private void cleanCache(double threshold) { + private void cleanCache(double stalenessThreshold) { if (logger.isDebugEnabled()) { - logger.debug("Cleaning Indices Request Cache with threshold : " + threshold); + logger.debug("Cleaning Indices Request Cache with threshold : " + stalenessThreshold); } - if (canSkipCacheCleanup(threshold)) { + if (canSkipCacheCleanup(stalenessThreshold)) { return; } // Contains CleanupKey objects with open shard but invalidated readerCacheKeyId. From d05bd10c3696fe31d485d4d63c6495ab43e6b01a Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Mar 2024 17:38:14 -0700 Subject: [PATCH 21/29] check for cleanupKey.entity == null Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index dc842e093b360..6d6a1c11a05c1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -468,7 +468,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) { * @param cleanupKey the CleanupKey to be updated in the map */ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { - if (stalenessThreshold == 0.0) { + if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -485,7 +485,7 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { } private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { - if (stalenessThreshold == 0.0) { + if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -521,6 +521,7 @@ private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey c * @param cleanupKey the CleanupKey that has been marked for cleanup */ private void incrementStaleKeysCount(CleanupKey cleanupKey) { + if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); From 50b624074a66ef072c32ad6a3863adaa51c9e796 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Mar 2024 17:38:56 -0700 Subject: [PATCH 22/29] use computeIfPresent with keycountmap Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 6d6a1c11a05c1..fb1ae362eae4f 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -497,13 +497,10 @@ private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey c // If the key doesn't exist, ignore ConcurrentMap keyCountMap = cleanupKeyToCountMap.get(shardId); if (keyCountMap != null) { - keyCountMap.compute(cleanupKey.readerCacheKeyId, (key, currentValue) -> { - // If the key is not present, no action is needed, return null. - if (currentValue == null) return null; - // Calculate the new value. - int newValue = currentValue - 1; + keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> { // decrement the stale key count staleKeysCount.decrementAndGet(); + int newValue = currentValue - 1; // Remove the key if the new value is zero by returning null; otherwise, update with the new value. return newValue == 0 ? null : newValue; }); From ab019421fc0f9c20aa0b44467dc3335435bdceb3 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Mar 2024 17:39:37 -0700 Subject: [PATCH 23/29] log both staleKeysInCache & Staleness in debug logs Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index fb1ae362eae4f..63e4de934988f 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -652,10 +652,14 @@ private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { if (cleanThresholdPercent == 0.0) { return false; } - if (staleKeysInCachePercentage() < cleanThresholdPercent) { + double staleKeysInCachePercentage = staleKeysInCachePercentage(); + if (staleKeysInCachePercentage < cleanThresholdPercent) { if (logger.isDebugEnabled()) { logger.debug( - "Skipping cache cleanup since the percentage of stale keys is less than the threshold : " + stalenessThreshold + "Skipping Indices Request cache cleanup since the percentage of stale keys : " + + staleKeysInCachePercentage + + " is less than the threshold : " + + stalenessThreshold ); } return true; From fdb921ba1e7e53076b68c2057825f85e1958d191 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Fri, 15 Mar 2024 09:39:28 -0700 Subject: [PATCH 24/29] Use HashMap instead of ConcurrentMap Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 63e4de934988f..87eacf6f109ad 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -69,6 +69,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Objects; @@ -431,7 +432,7 @@ public int hashCode() { * */ class IndicesRequestCacheCleanupManager implements Closeable { private final Set keysToClean; - private final ConcurrentMap> cleanupKeyToCountMap; + private final ConcurrentMap> cleanupKeyToCountMap; private final AtomicInteger staleKeysCount; private final double stalenessThreshold; private final IndicesRequestCacheCleaner cacheCleaner; @@ -480,8 +481,7 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { // If the key doesn't exist, it's added with a value of 1. // If the key exists, its value is incremented by 1. - cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap()) - .merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); + cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); } private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { @@ -495,7 +495,7 @@ private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey c } ShardId shardId = indexShard.shardId(); // If the key doesn't exist, ignore - ConcurrentMap keyCountMap = cleanupKeyToCountMap.get(shardId); + HashMap keyCountMap = cleanupKeyToCountMap.get(shardId); if (keyCountMap != null) { keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> { // decrement the stale key count From cb2cb3c74c541698d2276f09fa6a70294e286948 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Fri, 15 Mar 2024 11:37:12 -0700 Subject: [PATCH 25/29] Address b/w compatibility Signed-off-by: Kiran Prakash --- .../main/java/org/opensearch/indices/IndicesRequestCache.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 87eacf6f109ad..9c9549a2deeb9 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -81,6 +81,8 @@ import java.util.function.Function; import java.util.function.ToLongBiFunction; +import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING; + /** * The indices request cache allows to cache a shard level request stage responses, helping with improving * similar requests that are potentially expensive (because of aggs for example). The cache is fully coherent @@ -122,7 +124,7 @@ public final class IndicesRequestCache implements RemovalListener INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting( "indices.requests.cache.cleanup.interval", - TimeValue.timeValueMinutes(1), + INDICES_CACHE_CLEAN_INTERVAL_SETTING, Property.NodeScope ); public static final Setting INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING = new Setting<>( From a4fb34c59e1d6785486f020bd777276d34906c4e Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Fri, 15 Mar 2024 12:24:55 -0700 Subject: [PATCH 26/29] remove synchronized for updateCleanupKeyToCountMapOnCacheEviction Signed-off-by: Kiran Prakash --- .../org/opensearch/indices/IndicesRequestCache.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 9c9549a2deeb9..842751af8bc52 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -486,7 +486,7 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); } - private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { + private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { return; } @@ -496,9 +496,8 @@ private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey c return; } ShardId shardId = indexShard.shardId(); - // If the key doesn't exist, ignore - HashMap keyCountMap = cleanupKeyToCountMap.get(shardId); - if (keyCountMap != null) { + + cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> { keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> { // decrement the stale key count staleKeysCount.decrementAndGet(); @@ -506,7 +505,8 @@ private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey c // Remove the key if the new value is zero by returning null; otherwise, update with the new value. return newValue == 0 ? null : newValue; }); - } + return keyCountMap; + }); } /** From 15f8fc1bceb5773e71e3a0126b8fec2cbe391f04 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Fri, 15 Mar 2024 16:41:17 -0700 Subject: [PATCH 27/29] make cleanCache synchronized Signed-off-by: Kiran Prakash --- .../main/java/org/opensearch/indices/IndicesRequestCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 842751af8bc52..f5f1a7346ed46 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -579,7 +579,7 @@ private void forceCleanCache() { *

If the percentage of stale keys in the cache is less than this threshold,the cache cleanup process is skipped. * @param stalenessThreshold The staleness threshold as a double. */ - private void cleanCache(double stalenessThreshold) { + private synchronized void cleanCache(double stalenessThreshold) { if (logger.isDebugEnabled()) { logger.debug("Cleaning Indices Request Cache with threshold : " + stalenessThreshold); } From 1a67f007bd1baaafd6a9f1ea63ceed39ec41a516 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Fri, 15 Mar 2024 16:42:01 -0700 Subject: [PATCH 28/29] remove shouldRemoveKey Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 33 ++++--------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index f5f1a7346ed46..62f1c5b3a834b 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -608,39 +608,18 @@ private synchronized void cleanCache(double stalenessThreshold) { for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); - if (shouldRemoveKey(key, cleanupKeysFromOutdatedReaders, cleanupKeysFromClosedShards)) { + if (cleanupKeysFromClosedShards.contains(key.shardId)) { iterator.remove(); + } else { + CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId); + if(cleanupKeysFromOutdatedReaders.contains(cleanupKey)) { + iterator.remove(); + } } } cache.refresh(); } - /** - * Determines whether a key should be removed from the cache. - * - *

This method checks if the key's shardId is present in the cleanupKeysFromClosedShards set, - * indicating that the shard has been closed and the key should be removed. If the shardId is not present, - * it checks if the key's readerCacheKeyId is present in the cleanupKeysFromOutdatedReaders set, - * indicating that the reader has been invalidated and the key should be removed. - * - * @param key The key to check for removal. - * @param cleanupKeysFromOutdatedReaders A set of CleanupKeys with open shard but invalidated readerCacheKeyId. - * @param cleanupKeysFromClosedShards A set of CleanupKeys of a closed shard. - * @return true if the key should be removed, false otherwise. - */ - private synchronized boolean shouldRemoveKey( - Key key, - Set cleanupKeysFromOutdatedReaders, - Set cleanupKeysFromClosedShards - ) { - if (cleanupKeysFromClosedShards.contains(key.shardId)) { - return true; - } else { - CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId); - return cleanupKeysFromOutdatedReaders.contains(cleanupKey); - } - } - /** * Determines whether the cache cleanup process can be skipped based on the staleness threshold. * From dbbdff042bdf1dc29f8d4de43e492171cfd85197 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Fri, 15 Mar 2024 16:43:32 -0700 Subject: [PATCH 29/29] spotlessApply Signed-off-by: Kiran Prakash --- .../main/java/org/opensearch/indices/IndicesRequestCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 62f1c5b3a834b..11a226caab8cf 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -612,7 +612,7 @@ private synchronized void cleanCache(double stalenessThreshold) { iterator.remove(); } else { CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId); - if(cleanupKeysFromOutdatedReaders.contains(cleanupKey)) { + if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) { iterator.remove(); } }